下面列出了怎么用io.reactivex.subjects.SingleSubject的API类实例代码及写法,或者点击链接到github查看源代码。
private void processRead(SingleSubject<Pair<UUID, byte[]>> scopedSubject, UUID svc, UUID chr) {
synchronized (syncRoot) {
PeripheralError error = subscribeChecks();
if (error != null) {
scopedSubject.onError(error);
return;
}
BluetoothGattCharacteristic characteristic = getCharacteristic(svc, chr);
if (characteristic == null) {
scopedSubject.onError(new PeripheralError(MISSING_CHARACTERISTIC));
return;
}
readSubject = scopedSubject;
currentOperation = readSubject;
if (bluetoothGatt != null && !bluetoothGatt.readCharacteristic(characteristic)) {
readSubject.onError(new PeripheralError(READ_CHARACTERISTIC_FAILED, ERROR_STATUS_CALL_FAILED));
}
}
}
private void processUnregisterNotification(
SingleSubject<UUID> scopedSubject, UUID svc, UUID chr) {
synchronized (syncRoot) {
PeripheralError error = subscribeChecks();
if (error != null) {
scopedSubject.onError(error);
return;
}
BluetoothGattCharacteristic characteristic = getCharacteristic(svc, chr);
if (characteristic == null) {
scopedSubject.onError(new PeripheralError(MISSING_CHARACTERISTIC));
return;
}
registerNotificationSubject = scopedSubject;
currentOperation = registerNotificationSubject;
if (bluetoothGatt != null) {
error = setCharacteristicNotification(bluetoothGatt, characteristic, false);
if (error != null) {
registerNotificationSubject.onError(new PeripheralError(UNREGISTER_NOTIFICATION_FAILED, error));
}
}
}
}
@TargetApi(21)
private void processRequestMtu(SingleSubject<Integer> scopedSubject, int mtu) {
synchronized (syncRoot) {
PeripheralError error = subscribeChecks();
if (error != null) {
scopedSubject.onError(error);
return;
}
requestMtuSubject = scopedSubject;
currentOperation = requestMtuSubject;
if (bluetoothGatt != null && !bluetoothGatt.requestMtu(mtu)) {
requestMtuSubject.onError(new PeripheralError(REQUEST_MTU_FAILED, ERROR_STATUS_CALL_FAILED));
}
}
}
private void processReadRssi(SingleSubject<Integer> scopedSubject) {
synchronized (syncRoot) {
PeripheralError error = subscribeChecks();
if (error != null) {
scopedSubject.onError(error);
return;
}
readRssiSubject = scopedSubject;
currentOperation = readRssiSubject;
if (bluetoothGatt != null && !bluetoothGatt.readRemoteRssi()) {
readRssiSubject.onError(new PeripheralError(READ_RSSI_FAILED, ERROR_STATUS_CALL_FAILED));
}
}
}
public void start() {
VertxOptions options = new VertxOptions();
options.setMaxEventLoopExecuteTime(Long.MAX_VALUE);
SingleSubject waitSubject = SingleSubject.create();
Handler<AsyncResult<String>> completionHandler = event -> {
if (event.succeeded()) {
waitSubject.onSuccess(event.result());
} else {
event.cause().printStackTrace();
System.exit(0);
}
};
vertx = Vertx.vertx(options);
vertx.deployVerticle(vehicle, completionHandler);
waitSubject.blockingGet();
}
@Override
public Single<CharactersResponse> loadCharacter(String query,
String privateKey,
String publicKey,
long timestamp) {
if (characterSubscription == null || characterSubscription.isDisposed()) {
characterSubject = SingleSubject.create();
// generate hash using timestamp and API keys
String hash = HashGenerator.generate(timestamp, privateKey, publicKey);
characterSubscription = api.getCharacters(query, publicKey, hash, timestamp)
.subscribeOn(scheduler.backgroundThread())
.subscribe(characterSubject::onSuccess);
}
return characterSubject.hide();
}
@Override
public Single<byte[]> read(UUID svc, UUID chr) {
final SingleSubject<Pair<UUID, byte[]>> scopedSubject = SingleSubject.create();
return scopedSubject
.filter(chrPair -> chrPair.first.equals(chr))
.flatMapSingle(chrPair -> Single.just(chrPair.second))
.doOnSubscribe(disposable -> processRead(scopedSubject, svc, chr))
.doFinally(() -> clearReadSubject(scopedSubject));
}
@Override
public Completable write(UUID svc, UUID chr, byte[] data) {
final SingleSubject<UUID> scopedSubject = SingleSubject.create();
return scopedSubject
.filter(writeChr -> writeChr.equals(chr))
.ignoreElement()
.doOnSubscribe(disposable -> processWrite(scopedSubject, svc, chr, data))
.doFinally(() -> clearWriteSubject(scopedSubject));
}
@Override
public Completable registerNotification(UUID svc, UUID chr, @Nullable Preprocessor preprocessor) {
final SingleSubject<UUID> scopedSubject = SingleSubject.create();
return scopedSubject
.filter(regChr -> regChr.equals(chr))
.ignoreElement()
.doOnSubscribe(
disposable -> processRegisterNotification(scopedSubject, svc, chr, preprocessor))
.doFinally(() -> clearRegisterNotificationSubject(scopedSubject));
}
@Override
public Completable unregisterNotification(UUID svc, UUID chr) {
final SingleSubject<UUID> scopedSubject = SingleSubject.create();
return scopedSubject
.filter(regChr -> regChr.equals(chr))
.ignoreElement()
.doOnSubscribe(disposable -> processUnregisterNotification(scopedSubject, svc, chr))
.doOnComplete(() -> preprocessorMap.remove(chr))
.doFinally(() -> clearRegisterNotificationSubject(scopedSubject));
}
@TargetApi(21)
@Override
public Single<Integer> requestMtu(int mtu) {
if (Build.VERSION.SDK_INT < Build.VERSION_CODES.LOLLIPOP) {
return Single.error(new PeripheralError(PeripheralError.Code.MINIMUM_SDK_UNSUPPORTED));
}
final SingleSubject<Integer> scopedSubject = SingleSubject.create();
return scopedSubject
.doOnSubscribe(disposable -> processRequestMtu(scopedSubject, mtu))
.doFinally(() -> clearRequestMtuSubject(scopedSubject));
}
@Override
public Single<Integer> readRssi() {
SingleSubject<Integer> scopedSubject = SingleSubject.create();
return scopedSubject
.doOnSubscribe(disposable -> processReadRssi(scopedSubject))
.doFinally(() -> clearReadRssiSubject(scopedSubject));
}
private void clearReadSubject(final SingleSubject<Pair<UUID, byte[]>> scopedSubject) {
synchronized (syncRoot) {
if (readSubject == scopedSubject) {
readSubject = null;
}
}
}
private void processWrite(SingleSubject<UUID> scopedSubject, UUID svc, UUID chr, byte[] data) {
synchronized (syncRoot) {
PeripheralError error = subscribeChecks();
if (error != null) {
scopedSubject.onError(error);
return;
}
BluetoothGattCharacteristic characteristic = getCharacteristic(svc, chr);
if (characteristic == null) {
scopedSubject.onError(new PeripheralError(MISSING_CHARACTERISTIC));
return;
}
writeSubject = scopedSubject;
currentOperation = writeSubject;
if ((characteristic.getProperties() & BluetoothGattCharacteristic.PROPERTY_WRITE_NO_RESPONSE)
== BluetoothGattCharacteristic.PROPERTY_WRITE_NO_RESPONSE) {
characteristic.setWriteType(BluetoothGattCharacteristic.WRITE_TYPE_NO_RESPONSE);
} else {
characteristic.setWriteType(BluetoothGattCharacteristic.WRITE_TYPE_DEFAULT);
}
if (!characteristic.setValue(data)) {
writeSubject.onError(new PeripheralError(CHARACTERISTIC_SET_VALUE_FAILED));
}
if (bluetoothGatt != null && !bluetoothGatt.writeCharacteristic(characteristic)) {
writeSubject.onError(new PeripheralError(WRITE_CHARACTERISTIC_FAILED, ERROR_STATUS_CALL_FAILED));
}
}
}
private void clearWriteSubject(final SingleSubject<UUID> scopedSubject) {
synchronized (syncRoot) {
if (writeSubject == scopedSubject) {
writeSubject = null;
}
}
}
private void processRegisterNotification(
SingleSubject<UUID> scopedSubject, UUID svc, UUID chr, @Nullable Preprocessor preprocessor) {
synchronized (syncRoot) {
PeripheralError error = subscribeChecks();
if (error != null) {
scopedSubject.onError(error);
return;
}
BluetoothGattCharacteristic characteristic = getCharacteristic(svc, chr);
if (characteristic == null) {
scopedSubject.onError(new PeripheralError(MISSING_CHARACTERISTIC));
return;
}
registerNotificationSubject = scopedSubject;
currentOperation = registerNotificationSubject;
if (bluetoothGatt != null) {
error = setCharacteristicNotification(bluetoothGatt, characteristic, true);
if (error != null) {
registerNotificationSubject.onError(new PeripheralError(REGISTER_NOTIFICATION_FAILED, error));
} else {
preprocessorMap.put(chr, preprocessor);
}
}
}
}
private void clearRegisterNotificationSubject(SingleSubject<UUID> scopedSubject) {
synchronized (syncRoot) {
if (registerNotificationSubject == scopedSubject) {
registerNotificationSubject = null;
}
}
}
private void clearRequestMtuSubject(SingleSubject<Integer> scopedSubject) {
synchronized (syncRoot) {
if (requestMtuSubject == scopedSubject) {
requestMtuSubject = null;
}
}
}
private void clearReadRssiSubject(SingleSubject<Integer> scopedSubject) {
synchronized (syncRoot) {
if (readRssiSubject == scopedSubject) {
readRssiSubject = null;
}
}
}
private Mono getBody(VertxRequestContext vertxRequestContext) {
HttpServerRequest serverRequest = vertxRequestContext.getServerRequest();
SingleSubject<VertxRequestContext> bodySubject = SingleSubject.create();
Handler<Buffer> bodyHandler = (event) -> {
vertxRequestContext.setRequestBody(event.toString().getBytes(StandardCharsets.UTF_8));
bodySubject.onSuccess(vertxRequestContext);
};
serverRequest.bodyHandler(bodyHandler);
return Mono.from(bodySubject.toFlowable());
}
@Override
public void stop() {
SingleSubject waitSubject = SingleSubject.create();
Handler<AsyncResult<Void>> completionHandler = event -> waitSubject.onSuccess("test");
vertx.close(completionHandler);
waitSubject.blockingGet();
vehicle.testModule.clear();
vertx = null;
vehicle = null;
port = -1;
}
public Single<Entity> getEntity(String guid) {
SingleSubject<Entity> e = map.get(guid);
if (e == null) {
e = SingleSubject.create();
SingleSubject<Entity> f = map.putIfAbsent(guid, e);
if (f == null) {
longLoadFromDatabase(guid).subscribe(e);
} else {
e = f;
}
}
return e;
}
public void stop() {
SingleSubject waitSubject = SingleSubject.create();
Handler<AsyncResult<Void>> completionHandler = event -> waitSubject.onSuccess("test");
vertx.close(completionHandler);
waitSubject.blockingGet();
}
@Test
public void race() throws Exception {
Worker w = Schedulers.newThread().createWorker();
try {
for (int i = 0; i < 1000; i++) {
Integer[] value = { 0, 0 };
TestObserver<Integer> to = new TestObserver<Integer>() {
@Override
public void onSuccess(Integer v) {
value[1] = value[0];
super.onSuccess(v);
}
};
SingleSubject<Integer> subj = SingleSubject.create();
subj.observeOn(Schedulers.single())
.onTerminateDetach()
.subscribe(to);
AtomicInteger wip = new AtomicInteger(2);
CountDownLatch cdl = new CountDownLatch(2);
w.schedule(() -> {
if (wip.decrementAndGet() != 0) {
while (wip.get() != 0);
}
subj.onSuccess(1);
cdl.countDown();
});
Schedulers.single().scheduleDirect(() -> {
if (wip.decrementAndGet() != 0) {
while (wip.get() != 0);
}
to.cancel();
value[0] = null;
cdl.countDown();
});
cdl.await();
Assert.assertNotNull(value[1]);
}
} finally {
w.dispose();
}
}