下面列出了 io.grpc.stub.ClientCallStreamObserver#setOnReadyHandler() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Installs the on-ready handler for the ping request stream.
 *
 * <p>When the handler runs it logs the stream id, resets the reconnector and hands a task to
 * {@code schedule(...)} that sends a freshly built ping on {@code requestStream}; the value
 * returned by {@code schedule(...)} is stored in {@code pingScheduler}.
 *
 * @param requestStream the client request stream the pings are written to
 */
@Override
public void beforeStart(final ClientCallStreamObserver<PPing> requestStream) {
    // NOTE(review): gRPC may run an on-ready handler more than once per stream; every run
    // calls schedule(...) again and overwrites pingScheduler. Confirm that the previously
    // scheduled task is cancelled or is one-shot.
    final Runnable onReady = new Runnable() {
        @Override
        public void run() {
            logger.info("{} onReady", streamId);
            reconnector.reset();
            PingClientResponseObserver.this.pingScheduler = schedule(new Runnable() {
                @Override
                public void run() {
                    // Build the ping at send time, not at scheduling time.
                    final PPing ping = newPing();
                    requestStream.onNext(ping);
                }
            });
        }
    };
    requestStream.setOnReadyHandler(onReady);
}
/**
 * Installs an on-ready handler that, once the request stream reports ready, clears the error
 * counter and tells the user-facing request stream that the call is established.
 *
 * @param rs the client request stream for this call
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> rs) {
    rs.setOnReadyHandler(() -> {
        // called from grpc response thread
        if (!rs.isReady()) {
            return;
        }
        errCounter = 0;
        // established(...) decides whether the response side should be notified.
        if (userReqStream.established(rs)) {
            respStream.onEstablished();
        }
    });
}
/**
 * Benchmarks a bidirectional streaming call made through the plain gRPC client.
 *
 * <p>The observer's on-ready handler sends every element of {@code ARRAY_REQUEST}, completes
 * the request stream and then marks itself done so that a later on-ready callback is a no-op.
 * The method blocks on the observer's latch before returning.
 *
 * @param blackhole JMH blackhole handed to the {@code PerfObserver}
 * @return the observer used for the call
 * @throws InterruptedException if the thread is interrupted while waiting on the latch
 */
@Benchmark
public Object gRpcBothWaysStreamingCall(Blackhole blackhole) throws InterruptedException {
    PerfObserver observer = new PerfObserver(blackhole) {
        // Set after the whole request batch has been sent and the stream completed.
        private boolean done;

        @Override
        public void beforeStart(ClientCallStreamObserver<Messages.SimpleRequest> sender) {
            sender.setOnReadyHandler(() -> {
                if (done) {
                    return;
                }
                for (Messages.SimpleRequest request : ARRAY_REQUEST) {
                    sender.onNext(request);
                }
                sender.onCompleted();
                done = true;
            });
            // Forward the request stream itself. The local variable `observer` is still being
            // initialized at this point and is not a ClientCallStreamObserver.
            super.beforeStart(sender);
        }
    };
    gRpcClient.streamingFromClient(observer);
    observer.latch.await();
    return observer;
}
/**
 * Remembers the command request stream and installs an on-ready handler that logs the
 * connection and resets the reconnector.
 *
 * @param requestStream the client request stream for the command service call
 */
@Override
public void beforeStart(final ClientCallStreamObserver<PCmdMessage> requestStream) {
    this.requestStream = requestStream;

    final Runnable onReady = new Runnable() {
        @Override
        public void run() {
            logger.info("Connect to CommandServiceStream completed.");
            reconnector.reset();
        }
    };
    requestStream.setOnReadyHandler(onReady);
}
/**
 * Logs the start of the call and installs an on-ready handler that logs each on-ready event
 * with a running event number and resets the reconnector.
 *
 * @param requestStream the client request stream for this call
 */
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
    logger.info("beforeStart:{}", name);

    final Runnable onReady = new Runnable() {
        // Number of on-ready events seen so far by this handler; the first event logs 0.
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public void run() {
            final int eventNumber = counter.getAndIncrement();
            logger.info("onReadyHandler:{} eventNumber:{}", name, eventNumber);
            reconnector.reset();
        }
    };
    requestStream.setOnReadyHandler(onReady);
}
/**
 * Installs the on-ready handler that sends one {@code InsertObjectRequest} per invocation.
 *
 * <p>Each run reads up to {@code MAX_BYTES_PER_MESSAGE} bytes from {@code pipeSource}, sends
 * them (with CRC32C checksums when {@code checksumsEnabled} is set) and, once the read has
 * marked the object as finalized, flags the request as the finishing write and completes the
 * request stream. A read failure is stored in {@code error} and nothing is sent.
 *
 * @param requestObserver the client request stream the insert requests are written to
 */
@Override
public void beforeStart(ClientCallStreamObserver<InsertObjectRequest> requestObserver) {
  Runnable sendNextChunk =
      new Runnable() {
        @Override
        public void run() {
          if (objectFinalized) {
            // onReadyHandler may be called after we've closed the request half of the stream.
            return;
          }

          try {
            chunkData = readRequestData();
          } catch (IOException e) {
            // Keep the cause and stop; no request is sent for this invocation.
            error = new IOException(String.format("Failed to read chunk for '%s'", resourceId), e);
            return;
          }

          InsertObjectRequest.Builder insertRequest =
              InsertObjectRequest.newBuilder().setUploadId(uploadId).setWriteOffset(writeOffset);
          if (chunkData.size() > 0) {
            insertRequest.setChecksummedData(buildChecksummedChunk());
          }

          // readRequestData() above may have flipped objectFinalized for this chunk.
          if (objectFinalized) {
            insertRequest.setFinishWrite(true);
            if (checksumsEnabled) {
              UInt32Value.Builder objectCrc =
                  UInt32Value.newBuilder().setValue(objectHasher.hash().asInt());
              insertRequest.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(objectCrc));
            }
          }

          requestObserver.onNext(insertRequest.build());

          if (objectFinalized) {
            // Close the request half of the streaming RPC.
            requestObserver.onCompleted();
          }
        }

        /**
         * Wraps the current {@code chunkData} in a {@code ChecksummedData} builder and, when
         * checksums are enabled, attaches the chunk CRC32C while also feeding the same bytes
         * into {@code objectHasher}.
         */
        private ChecksummedData.Builder buildChecksummedChunk() {
          ChecksummedData.Builder checksummed = ChecksummedData.newBuilder().setContent(chunkData);
          if (!checksumsEnabled) {
            return checksummed;
          }
          Hasher chunkHasher = Hashing.crc32c().newHasher();
          for (ByteBuffer slice : chunkData.asReadOnlyByteBufferList()) {
            chunkHasher.putBytes(slice.duplicate());
            // TODO(b/7502351): Switch to "concatenating" the chunk-level crc32c values
            // if/when the hashing library supports that, to avoid re-scanning all data
            // bytes when computing the object-level crc32c.
            objectHasher.putBytes(slice.duplicate());
          }
          checksummed.setCrc32C(UInt32Value.newBuilder().setValue(chunkHasher.hash().asInt()));
          return checksummed;
        }

        /**
         * Reads the next chunk (at most {@code MAX_BYTES_PER_MESSAGE} bytes) from
         * {@code pipeSource} and updates {@code objectFinalized}.
         */
        private ByteString readRequestData() throws IOException {
          // Mark the input stream in case this request fails so that read can be recovered
          // from where it's marked.
          pipeSource.mark(MAX_BYTES_PER_MESSAGE);
          ByteString chunk =
              ByteString.readFrom(ByteStreams.limit(pipeSource, MAX_BYTES_PER_MESSAGE));
          if (chunk.size() < MAX_BYTES_PER_MESSAGE) {
            // A short read finalizes the object without consulting available().
            objectFinalized = true;
          } else {
            objectFinalized = pipeSource.available() <= 0;
          }
          return chunk;
        }
      };
  requestObserver.setOnReadyHandler(sendNextChunk);
}