下面列出了io.reactivex.Single#flatMap ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void storeInDatabase(JsonObject operation) {
// 1. need to retrieve a connection
// 2. execute the insertion statement
// 3. close the connection
// Step 1 get the connection
Single<SQLConnection> connectionRetrieved = jdbc.rxGetConnection();
// Step 2, when the connection is retrieved (this may have failed), do the insertion (upon success)
Single<UpdateResult> update = connectionRetrieved
.flatMap(connection ->
connection.rxUpdateWithParams(INSERT_STATEMENT, new JsonArray().add(operation.encode()))
// Step 3, when the insertion is done, close the connection.
.doAfterTerminate(connection::close));
update.subscribe(result -> {
// Ok
}, err -> {
System.err.println("Failed to insert operation in database: " + err);
});
}
private void storeInDatabase(JsonObject operation) {
// 1. need to retrieve a connection
// 2. execute the insertion statement
// 3. close the connection
// Step 1 get the connection
Single<SQLConnection> connectionRetrieved = jdbc.rxGetConnection();
// Step 2, when the connection is retrieved (this may have failed), do the insertion (upon success)
Single<UpdateResult> update = connectionRetrieved
.flatMap(connection ->
connection.rxUpdateWithParams(INSERT_STATEMENT, new JsonArray().add(operation.encode()))
// Step 3, when the insertion is done, close the connection.
.doAfterTerminate(connection::close));
update.subscribe(result -> {
// Ok
}, err -> {
System.err.println("Failed to insert operation in database: " + err);
});
}
@Override
public SingleSource<T> apply(@NonNull Single<B> upstream) {
Single<Buffer> unwrapped = upstream.map(unwrap::apply);
Single<T> unmarshalled = unwrapped.flatMap(buffer -> {
try {
T obj;
if (mapper != null) {
JsonParser parser = mapper.getFactory().createParser(buffer.getBytes());
obj = nonNull(mappedType) ? mapper.readValue(parser, mappedType) :
mapper.readValue(parser, mappedTypeRef);
} else {
obj = getT(buffer, mappedType, mappedTypeRef);
}
return Single.just(obj);
} catch (Exception e) {
return Single.error(e);
}
});
return unmarshalled;
}
private Single<JsonObject> sendToSnapshot(Single<JsonObject> data) {
return data.flatMap(json -> webClient
.post(4000, "localhost", "")
.expect(ResponsePredicate.SC_SUCCESS)
.rxSendJsonObject(json)
.flatMap(resp -> Single.just(json)));
}
@Override
public Single<TransactionWasProcessed> sendTransaction(Single<Transaction<? extends TransactionAsset>> transactionSource) {
return transactionSource
.flatMap(transaction -> Single.fromPublisher(
api.processTransaction(new ProcessTransaction(transaction))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
));
}
@Override
@NonNull
@CheckReturnValue
public SingleSource<T> apply(@NonNull Single<DataSnapshot> upstream) {
return upstream.flatMap(new Function<DataSnapshot, SingleSource<? extends T>>() {
@Override
public SingleSource<? extends T> apply(@NonNull DataSnapshot dataSnapshot) {
if (dataSnapshot.exists()) {
return Single.just(dataSnapshot.getValue(clazz));
} else {
return Single.error(new NoSuchElementException());
}
}
});
}
@Override
@NonNull
@CheckReturnValue
public SingleSource<T> apply(@NonNull Single<DataSnapshot> upstream) {
return upstream.flatMap(new Function<DataSnapshot, SingleSource<? extends T>>() {
@Override
public SingleSource<? extends T> apply(@NonNull DataSnapshot dataSnapshot) {
if (dataSnapshot.exists()) {
return Single.just(dataSnapshot.getValue(typeIndicator));
} else {
return Single.error(new NoSuchElementException());
}
}
});
}
@Override
public void actionPerformed(ActionEvent e) {
if(e.getSource() == cancelButton) {
setVisible(false);
return;
}
if(e.getSource() == acceptOtherTermsBox) {
if(acceptOtherTermsBox.isSelected()) {
yourAmountOtherSlider.setValue(otherAmountOtherSlider.getValue());;
yourAmountYouSlider.setValue(otherAmountYouSlider.getValue());
}
yourAmountOtherSlider.setEnabled(!acceptOtherTermsBox.isSelected());
yourAmountYouSlider.setEnabled(!acceptOtherTermsBox.isSelected());
}
if(e.getSource() == okButton || e.getSource() == pinField) {
String error = null;
Component errorComp = null;
Globals g = Globals.getInstance();
if(error == null) {
// check if something changed
// if(priceValue.longValue() == contract.getRate() &&
// (accountDetails.getText().length()==0 ||
// accountDetails.getText().equals(contract.getMarketAccount()))
// )
// error = tr("offer_no_changes");
}
if(error == null && !acceptBox.isSelected()) {
error = tr("dlg_accept_first");
errorComp = acceptBox;
acceptBox.requestFocus();
}
if(error == null && !g.checkPIN(pinField.getPassword())) {
error = tr("dlg_invalid_pin");
pinField.requestFocus();
}
if(error!=null) {
Toast.makeText((JFrame) this.getOwner(), error, Toast.Style.ERROR).display(errorComp != null ? errorComp : okButton);
return;
}
// all set, lets place the dispute update
try {
setCursor(Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR));
long amountToCreator = amount * (isCreator ? yourAmountYouSlider.getValue() : yourAmountOtherSlider.getValue()) / 100;
long amountToTaker = amount - amountToCreator;
// we are sending the dispute message with our amounts
byte[] message = BT.callMethodMessage(contract.getMethod("dispute"), amountToCreator, amountToTaker);
BurstValue amountToSend = BurstValue.fromPlanck(contract.getActivationFee());
Single<byte[]> utx = g.getNS().generateTransactionWithMessage(contract.getAddress(), g.getPubKey(),
amountToSend, suggestedFee,
Constants.BURST_DEADLINE, message);
Single<TransactionBroadcast> tx = utx.flatMap(unsignedTransactionBytes -> {
byte[] signedTransactionBytes = g.signTransaction(pinField.getPassword(), unsignedTransactionBytes);
return g.getNS().broadcastTransaction(signedTransactionBytes);
});
TransactionBroadcast tb = tx.blockingGet();
tb.getTransactionId();
setVisible(false);
Toast.makeText((JFrame) this.getOwner(),
tr("send_tx_broadcast", tb.getTransactionId().toString()), Toast.Style.SUCCESS).display();
}
catch (Exception ex) {
ex.printStackTrace();
Toast.makeText((JFrame) this.getOwner(), ex.getCause().getMessage(), Toast.Style.ERROR).display(okButton);
}
setCursor(Cursor.getDefaultCursor());
}
}
public static <T> Single<Flowable<T>> sequence(final Publisher<? extends Single<T>> fts) {
Single<Single<Flowable<T>>> res = Flowable.fromPublisher(fts).<Single<Flowable<T>>>reduce(Single.just(Flowable.empty()), (acc, next) -> acc.zipWith(next, (a, b) -> Flowable.concat(a, Flowable.just(b))));
return res.flatMap(i->i);
}
/**
* Perform a For Comprehension over a Single, accepting 3 generating functions.
* This results in a four level nested internal iteration over the provided Singles.
*
* <pre>
* {@code
*
* import static cyclops.companion.reactor.Singles.forEach4;
*
forEach4(Single.just(1),
a-> Single.just(a+1),
(a,b) -> Single.<Integer>just(a+b),
(a,b,c) -> Single.<Integer>just(a+b+c),
Tuple::tuple)
*
* }
* </pre>
*
* @param value1 top level Single
* @param value2 Nested Single
* @param value3 Nested Single
* @param value4 Nested Single
* @param yieldingFunction Generates a result per combination
* @return Single with a combined value generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> Single<R> forEach4(Single<? extends T1> value1,
Function<? super T1, ? extends Single<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Single<R2>> value3,
Function3<? super T1, ? super R1, ? super R2, ? extends Single<R3>> value4,
Function4<? super T1, ? super R1, ? super R2, ? super R3, ? extends R> yieldingFunction) {
Single<? extends R> res = value1.flatMap(in -> {
Single<R1> a = value2.apply(in);
return a.flatMap(ina -> {
Single<R2> b = value3.apply(in, ina);
return b.flatMap(inb -> {
Single<R3> c = value4.apply(in, ina, inb);
return c.map(in2 -> yieldingFunction.apply(in, ina, inb, in2));
});
});
});
return narrow(res);
}
/**
* Perform a For Comprehension over a Single, accepting 2 generating functions.
* This results in a three level nested internal iteration over the provided Singles.
*
* <pre>
* {@code
*
* import static cyclops.companion.reactor.Singles.forEach3;
*
forEach3(Single.just(1),
a-> Single.just(a+1),
(a,b) -> Single.<Integer>just(a+b),
Tuple::tuple)
*
* }
* </pre>
*
* @param value1 top level Single
* @param value2 Nested Single
* @param value3 Nested Single
* @param yieldingFunction Generates a result per combination
* @return Single with a combined value generated by the yielding function
*/
public static <T1, T2, R1, R2, R> Single<R> forEach3(Single<? extends T1> value1,
Function<? super T1, ? extends Single<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Single<R2>> value3,
Function3<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
Single<? extends R> res = value1.flatMap(in -> {
Single<R1> a = value2.apply(in);
return a.flatMap(ina -> {
Single<R2> b = value3.apply(in, ina);
return b.map(in2 -> yieldingFunction.apply(in, ina, in2));
});
});
return narrow(res);
}
/**
* Perform a For Comprehension over a Single, accepting a generating function.
* This results in a two level nested internal iteration over the provided Singles.
*
* <pre>
* {@code
*
* import static cyclops.companion.reactor.Singles.forEach;
*
forEach(Single.just(1),
a-> Single.just(a+1),
Tuple::tuple)
*
* }
* </pre>
*
* @param value1 top level Single
* @param value2 Nested Single
* @param yieldingFunction Generates a result per combination
* @return Single with a combined value generated by the yielding function
*/
public static <T, R1, R> Single<R> forEach(Single<? extends T> value1,
Function<? super T, Single<R1>> value2,
BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {
Single<R> res = value1.flatMap(in -> {
Single<R1> a = value2.apply(in);
return a.map(ina -> yieldingFunction.apply(in, ina));
});
return narrow(res);
}