下面列出了io.reactivex.Single#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Single<BrowserRecord> makeNewFile(String name, int type)
{
return Single.create(emitter -> CreateNewFile.createObservable(
getActivity().getApplicationContext(),
getLocation(),
name,
type,
false
).compose(bindToLifecycle()).
subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(rec -> {
addRecordToList(rec);
if(!emitter.isDisposed())
emitter.onSuccess(rec);
},
err -> Logger.showAndLog(getActivity(), err)));
}
@Override
protected Single<Bitmap> build(String amount) {
return Single.create(emitter -> {
String username = preferenceUtils.retrieveUsername();
String qrText = username + "," + amount;
Map<EncodeHintType, String> hints = new HashMap<>();
hints.put(EncodeHintType.CHARACTER_SET, "UTF-8");
QRCode qrCode = Encoder.encode(qrText, ErrorCorrectionLevel.H, hints);
final ByteMatrix byteMatrix = qrCode.getMatrix();
final int width = byteMatrix.getWidth();
final int height = byteMatrix.getHeight();
final Bitmap bitmap = Bitmap.createBitmap(width, height, Bitmap.Config.ARGB_8888);
for (int y = 0; y < height; y++) {
for (int x = 0; x < width; x++) {
byte val = byteMatrix.get(x, y);
bitmap.setPixel(x, y, val == 1 ? Color.BLACK : Color.WHITE);
}
}
emitter.onSuccess(Bitmap.createScaledBitmap(bitmap, SIZE, SIZE, false));
});
}
/**
* Executes single request
*
* @param url URL for request
* @return Response
*/
public Single<Response> singleRequest(String url) {
Request request = new Request.Builder()
.url(url)
.build();
return Single.create(e -> {
Response response = mHttpClient.newCall(request).execute();
e.onSuccess(response);
});
}
public static Single<List<File>> getSDTxtFile(){
//外部存储卡路径
String rootPath = Environment.getExternalStorageDirectory().getPath();
return Single.create(new SingleOnSubscribe<List<File>>() {
@Override
public void subscribe(SingleEmitter<List<File>> e) throws Exception {
List<File> files = getTxtFiles(rootPath,0);
e.onSuccess(files);
}
});
}
/**
* The returned Single emits one Query result as a List.
*/
public static <T> Single<List<T>> single(final Query<T> query) {
return Single.create(emitter -> {
query.subscribe().single().observer(data -> {
if (!emitter.isDisposed()) {
emitter.onSuccess(data);
}
});
// no need to cancel, single never subscribes
});
}
private Single<World> updateWorld(World world) {
return Single.create(sink -> pgClients.getOne().preparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2", Tuple.of(world.randomNumber, world.id), ar -> {
if (ar.failed()) {
sink.onError(ar.cause());
} else {
sink.onSuccess(world);
}
}));
}
@Override @NonNull public Single<List<T>> observeRemove(
@NonNull final PredicateFunc<T> predicateFunc) {
assertNotNull(predicateFunc, "predicateFunc");
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists()) {
emitter.onSuccess(Collections.<T>emptyList());
return;
}
List<T> originalList = converter.read(file, type);
if (originalList == null) originalList = Collections.emptyList();
int indexOfItemToRemove = -1;
for (int i = 0; i < originalList.size(); i++) {
if (predicateFunc.test(originalList.get(i))) {
indexOfItemToRemove = i;
break;
}
}
List<T> modifiedList = new ArrayList<T>(originalList);
if (indexOfItemToRemove != -1) {
modifiedList.remove(indexOfItemToRemove);
converterWrite(modifiedList, converter, type, file);
}
emitter.onSuccess(modifiedList);
updateSubject.onNext(modifiedList);
}
});
}
});
}
@Override
public Single<DeleteByQueryResponse> deleteByQuery(List<String> indices, DeleteByQueryOptions options) {
return Single.create(handler -> {
elasticSearchService.deleteByQuery(indices, options, response -> {
if (response.succeeded()) {
handler.onSuccess(response.result());
} else {
handler.onError(response.cause());
}
});
});
}
public Single<PublishToken> create(final String topic,
final MqttMessage msg) {
return Single.create(emitter -> {
try {
this.client.publish(topic, msg.getPayload(), msg.getQos(),
msg.isRetained(), null,
new PublishActionListener(emitter));
} catch (final MqttException exception) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.log(Level.SEVERE, exception.getMessage(), exception);
}
emitter.onError(exception);
}
});
}
@Override
public Single<MultiSearchResponse> multiSearch(final List<MultiSearchQueryOptions> multiSearchQueryOptions, MultiSearchOptions options) {
return Single.create(handler -> {
elasticSearchService.multiSearch(multiSearchQueryOptions, options, response -> {
if (response.succeeded()) {
handler.onSuccess(response.result());
} else {
handler.onError(response.cause());
}
});
});
}
@Override
public Single<World> getWorld(int id) {
return Single.create(sink ->
pgClients.getOne().preparedQuery("SELECT * FROM world WHERE id = $1", Tuple.of(id), ar -> {
if (ar.failed()) {
sink.onError(ar.cause());
} else {
final Row row = ar.result().iterator().next();
World world = new World(row.getInteger(0), row.getInteger(1));
sink.onSuccess(world);
}
}));
}
public static Single<CachedPathInfo> create(Location loc)
{
return Single.create(emitter -> {
CachedPathInfo cachedPathInfo = new CachedPathInfoBase();
cachedPathInfo.init(loc.getCurrentPath());
emitter.onSuccess(cachedPathInfo);
});
}
@Override
protected Single<Responses.Account> build(String accountId) {
return Single.create(emitter -> {
long currentTime = System.currentTimeMillis();
Keypair adminKeys = crypto.convertFromExisting(PUB_KEY, PRIV_KEY);
// GetAccount
UnsignedQuery query = modelQueryBuilder
.createdTime(BigInteger.valueOf(currentTime))
.queryCounter(BigInteger.valueOf(QUERY_COUNTER))
.creatorAccountId(CREATOR)
.getAccount(accountId + "@" + DOMAIN_ID)
.build();
// sign transaction and get its binary representation (Blob)
protoQueryHelper = new ModelProtoQuery(query);
ByteVector queryBlob = protoQueryHelper.signAndAddSignature(adminKeys).finish().blob();
byte bquery[] = toByteArray(queryBlob);
Queries.Query protoQuery = null;
try {
protoQuery = Queries.Query.parseFrom(bquery);
} catch (InvalidProtocolBufferException e) {
emitter.onError(e);
}
QueryServiceGrpc.QueryServiceBlockingStub queryStub = QueryServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(CONNECTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Responses.QueryResponse queryResponse = queryStub.find(protoQuery);
emitter.onSuccess(queryResponse.getAccountResponse().getAccount());
});
}
@Override
protected Single<String> build(Void v) {
return Single.create(emitter -> {
long currentTime = System.currentTimeMillis();
Keypair userKeys = preferenceUtils.retrieveKeys();
String username = preferenceUtils.retrieveUsername();
UnsignedQuery accountDetails = modelQueryBuilder.creatorAccountId(username + "@" + DOMAIN_ID)
.queryCounter(BigInteger.valueOf(QUERY_COUNTER))
.createdTime(BigInteger.valueOf(currentTime))
.getAccountDetail(username + "@" + DOMAIN_ID)
.build();
protoQueryHelper = new ModelProtoQuery(accountDetails);
ByteVector queryBlob = protoQueryHelper.signAndAddSignature(userKeys).finish().blob();
byte bquery[] = toByteArray(queryBlob);
Queries.Query protoQuery = null;
try {
protoQuery = Queries.Query.parseFrom(bquery);
} catch (InvalidProtocolBufferException e) {
emitter.onError(e);
}
QueryServiceGrpc.QueryServiceBlockingStub queryStub = QueryServiceGrpc.newBlockingStub(channel);
Responses.QueryResponse queryResponse = queryStub.find(protoQuery);
JsonElement jsonElement = new Gson().fromJson(queryResponse.getAccountDetailResponse().getDetail(), JsonObject.class).get(username + "@" + DOMAIN_ID);
;
String detail = jsonElement != null ? jsonElement.getAsJsonObject().get(Constants.ACCOUNT_DETAILS).getAsString() : "";
emitter.onSuccess(detail);
});
}
@Override
public Single<Tokens> getTokens() {
return Single.create(emitter -> emitter.onSuccess(mSyncSessionClient.getTokens()));
}
/**
* Emits BluetoothGatt and completes after connection is established.
*
* @return BluetoothGatt after connection reaches {@link com.polidea.rxandroidble2.RxBleConnection.RxBleConnectionState#CONNECTED}
* state.
* @throws com.polidea.rxandroidble2.exceptions.BleDisconnectedException if connection was disconnected/failed before
* it was established.
*/
@NonNull
private Single<BluetoothGatt> getConnectedBluetoothGatt() {
// start connecting the BluetoothGatt
// note: Due to different Android BLE stack implementations it is not certain whether `connectGatt()` or `BluetoothGattCallback`
// will emit BluetoothGatt first
return Single.create(new SingleOnSubscribe<BluetoothGatt>() {
@Override
public void subscribe(final SingleEmitter<BluetoothGatt> emitter) {
final DisposableSingleObserver<BluetoothGatt> disposableGattObserver = getBluetoothGattAndChangeStatusToConnected()
// when the connected state will be emitted bluetoothGattProvider should contain valid Gatt
.delaySubscription(
rxBleGattCallback
.getOnConnectionStateChange()
.filter(new Predicate<RxBleConnection.RxBleConnectionState>() {
@Override
public boolean test(RxBleConnection.RxBleConnectionState rxBleConnectionState) {
return rxBleConnectionState == CONNECTED;
}
})
)
// disconnect may happen even if the connection was not established yet
.mergeWith(rxBleGattCallback.<BluetoothGatt>observeDisconnect().firstOrError())
.firstOrError()
.subscribeWith(disposableSingleObserverFromEmitter(emitter));
emitter.setDisposable(disposableGattObserver);
connectionStateChangedAction.onConnectionStateChange(CONNECTING);
/*
* Apparently the connection may be established fast enough to introduce a race condition so the subscription
* must be established first before starting the connection.
* https://github.com/Polidea/RxAndroidBle/issues/178
* */
final BluetoothGatt bluetoothGatt = connectionCompat
.connectGatt(bluetoothDevice, autoConnect, rxBleGattCallback.getBluetoothGattCallback());
/*
* Update BluetoothGatt when connection is initiated. It is not certain
* if this or RxBleGattCallback.onConnectionStateChange will be first.
* */
bluetoothGattProvider.updateBluetoothGatt(bluetoothGatt);
}
});
}
public Single<SuBinary> build() {
return Single.create(emitter -> {
Type type = Type.NONE;
String path = null;
String version = null;
String extra = null;
final List<String> rawResult = new ArrayList<>();
Cmd.Result versionResult = trySession(Cmd.builder("su --version"));
if (versionResult.getExitCode() != Cmd.ExitCode.OK && versionResult.getExitCode() != Cmd.ExitCode.EXCEPTION) {
versionResult = Cmd.builder("su --V", "su -version", "su -v", "su -V").timeout(5000).execute(session);
}
rawResult.addAll(versionResult.getOutput());
// Did we hear a faint response?
if (versionResult.getOutput().size() > 0 || versionResult.getExitCode() == Cmd.ExitCode.OK) {
type = Type.UNKNOWN;
}
// Who's there?
for (String line : versionResult.merge()) {
for (Map.Entry<Pattern, Type> entry : PATTERNMAP.entrySet()) {
Matcher matcher = entry.getKey().matcher(line);
if (matcher.matches()) {
type = entry.getValue();
if (matcher.groupCount() == 1) {
version = matcher.group(1);
} else if (matcher.groupCount() == 2) {
version = matcher.group(1);
extra = matcher.group(2);
}
break;
}
}
}
if (type != Type.NONE) {
Cmd.Result pathResult = trySession(Cmd.builder("command -v su"));
if (pathResult.getExitCode() == Cmd.ExitCode.OK) {
if (pathResult.getOutput().size() == 1) {
path = pathResult.getOutput().get(0);
} else {
Timber.tag(TAG).w("Unexpected su binary path: %s", pathResult.getOutput());
}
}
}
emitter.onSuccess(new SuBinary(type, path, version, extra, rawResult));
});
}
@Override @NonNull public Single<List<T>> observeAddOrReplace(@NonNull final T value,
@NonNull final PredicateFunc<T> predicateFunc) {
assertNotNull(value, "value");
assertNotNull(predicateFunc, "predicateFunc");
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists() && !file.createNewFile()) {
throw new IOException("Could not create store.");
}
List<T> originalList = converter.read(file, type);
if (originalList == null) originalList = Collections.emptyList();
int indexOfItemToReplace = -1;
for (int i = 0; i < originalList.size(); i++) {
if (predicateFunc.test(originalList.get(i))) {
indexOfItemToReplace = i;
break;
}
}
int modifiedListSize = indexOfItemToReplace == -1 ? originalList.size() + 1 :
originalList.size();
List<T> modifiedList = new ArrayList<T>(modifiedListSize);
modifiedList.addAll(originalList);
if (indexOfItemToReplace == -1) {
modifiedList.add(value);
} else {
modifiedList.remove(indexOfItemToReplace);
modifiedList.add(indexOfItemToReplace, value);
}
converterWrite(modifiedList, converter, type, file);
emitter.onSuccess(modifiedList);
updateSubject.onNext(modifiedList);
}
});
}
});
}
@Override
public Single<Boolean> isLoggedIn() {
return Single.create(emitter -> emitter.onSuccess(mSyncSessionClient.isAuthenticated()));
}
@Override
public Single<Tokens> getTokens() {
return Single.create(emitter -> emitter.onSuccess(mSyncSessionClientImpl.getTokens()));
}