下面列出了 io.grpc.netty.shaded.io.grpc.netty.NegotiationType 与 com.google.pubsub.v1.SubscriberGrpc 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a blocking gRPC subscriber that talks to the default Pub/Sub subscriber endpoint over TLS.
 *
 * <p>The pull request template, retry count and timeout are taken from values held by the
 * enclosing object. The returned subscriber is handed the channel it should use.
 *
 * @param credentials credentials attached to every call made through the stub
 * @return a subscriber bound to {@code projectSubscriptionName}
 * @throws IOException declared by the interface; NOTE(review): presumably raised when the SSL
 *     context cannot be built - confirm
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    final String endpoint = SubscriberStubSettings.getDefaultEndpoint();

    // Passing null to ciphers(...) leaves the cipher-suite choice to the SSL provider defaults
    // (per the Netty SslContextBuilder documentation).
    final ManagedChannel tlsChannel = NettyChannelBuilder.forTarget(endpoint)
            .negotiationType(NegotiationType.TLS)
            .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
            .build();

    // Template request that the subscriber receives for its pull calls.
    final PullRequest.Builder pullTemplate = PullRequest.newBuilder();
    pullTemplate.setMaxMessages(maxMessagesPerPull);
    pullTemplate.setReturnImmediately(false);
    pullTemplate.setSubscription(projectSubscriptionName);
    final PullRequest pullRequest = pullTemplate.build();

    final SubscriberGrpc.SubscriberBlockingStub plainStub = SubscriberGrpc.newBlockingStub(tlsChannel);
    final SubscriberGrpc.SubscriberBlockingStub authenticatedStub =
            plainStub.withCallCredentials(MoreCallCredentials.from(credentials));

    return new BlockingGrpcPubSubSubscriber(
            projectSubscriptionName, tlsChannel, authenticatedStub, pullRequest, retries, timeout);
}
/**
 * Opens a streaming pull for a subscription named "unknown-subscription" and expects the
 * response observer to be notified of an error within five seconds.
 */
@Test(timeout = 10000)
public void streamingPull_subscriptionDoesNotExist() throws InterruptedException {
    SubscriberStub asyncStub = SubscriberGrpc.newStub(grpcServerRule.getChannel());
    CountDownLatch errorSignal = new CountDownLatch(1);

    // Only onError matters here; responses and completion are deliberately ignored.
    StreamObserver<StreamingPullResponse> responseObserver =
            new StreamObserver<StreamingPullResponse>() {
                @Override
                public void onNext(StreamingPullResponse ignoredResponse) {}

                @Override
                public void onError(Throwable failure) {
                    errorSignal.countDown();
                }

                @Override
                public void onCompleted() {}
            };
    StreamObserver<StreamingPullRequest> requestObserver = asyncStub.streamingPull(responseObserver);

    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .setSubscription("projects/project-1/subscriptions/unknown-subscription")
                    .build());

    boolean errorReceived = errorSignal.await(5, TimeUnit.SECONDS);
    assertThat(errorReceived, Matchers.is(true));
}
/**
 * Builds a TLS-secured channel to the default Pub/Sub subscriber endpoint and wraps it in a
 * blocking subscriber.
 *
 * @param credentials call credentials applied to the blocking stub
 * @return a new {@code BlockingGrpcPubSubSubscriber} using the created channel and stub
 * @throws IOException declared by the interface; NOTE(review): likely surfaced from SSL
 *     context construction - confirm
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    final String target = SubscriberStubSettings.getDefaultEndpoint();
    final ManagedChannel securedChannel = NettyChannelBuilder.forTarget(target)
            .negotiationType(NegotiationType.TLS)
            // ciphers(null): per Netty's SslContextBuilder docs, null selects the default suites.
            .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
            .build();

    final PullRequest.Builder requestBuilder = PullRequest.newBuilder();
    requestBuilder.setMaxMessages(maxMessagesPerPull);
    requestBuilder.setReturnImmediately(false);
    requestBuilder.setSubscription(projectSubscriptionName);
    final PullRequest pullRequest = requestBuilder.build();

    final SubscriberGrpc.SubscriberBlockingStub baseStub = SubscriberGrpc.newBlockingStub(securedChannel);
    final SubscriberGrpc.SubscriberBlockingStub stubWithCredentials =
            baseStub.withCallCredentials(MoreCallCredentials.from(credentials));

    return new BlockingGrpcPubSubSubscriber(
            projectSubscriptionName, securedChannel, stubWithCredentials, pullRequest, retries, timeout);
}
/**
 * Creates a blocking subscriber that connects to {@code hostAndPort} over an unencrypted channel.
 *
 * <p>No call credentials are attached to the stub; the {@code credentials} argument is not used.
 *
 * @param credentials unused by this implementation
 * @return a subscriber bound to {@code projectSubscriptionName}
 * @throws IOException declared by the interface
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    // Plaintext is acceptable here because this factory is ONLY used for testing.
    final ManagedChannel plaintextChannel = NettyChannelBuilder.forTarget(hostAndPort)
            .usePlaintext()
            .build();

    final PullRequest.Builder pullTemplate = PullRequest.newBuilder();
    pullTemplate.setMaxMessages(maxMessagesPerPull);
    pullTemplate.setReturnImmediately(false);
    pullTemplate.setSubscription(projectSubscriptionName);
    final PullRequest pullRequest = pullTemplate.build();

    final SubscriberGrpc.SubscriberBlockingStub blockingStub =
            SubscriberGrpc.newBlockingStub(plaintextChannel);

    return new BlockingGrpcPubSubSubscriber(
            projectSubscriptionName, plaintextChannel, blockingStub, pullRequest, retries, timeout);
}
/**
 * Stores the collaborators and settings this subscriber works with.
 *
 * @param projectSubscriptionName subscription name kept for later use
 * @param channel gRPC channel associated with {@code stub}
 * @param stub blocking subscriber stub
 * @param pullRequest pull request kept for later use
 * @param retries retry setting kept for later use
 * @param timeout timeout setting kept for later use
 */
public BlockingGrpcPubSubSubscriber(
        String projectSubscriptionName,
        ManagedChannel channel,
        SubscriberGrpc.SubscriberBlockingStub stub,
        PullRequest pullRequest,
        int retries,
        Duration timeout) {
    // Plain field capture, in parameter order; no validation is performed.
    this.projectSubscriptionName = projectSubscriptionName;
    this.channel = channel;
    this.stub = stub;
    this.pullRequest = pullRequest;
    this.retries = retries;
    this.timeout = timeout;
}
/**
 * Prepares each test: builds the configuration, registers a fresh {@code SubscriberService}
 * with the gRPC server rule, and creates a blocking stub on the rule's channel.
 */
@Before
public void setUp() {
    configurationManager = setupConfigurationRepository();

    SubscriberService service =
            new SubscriberService(
                    configurationManager, mockSubscriptionManagerFactory, mockStatisticsManager);
    subscriber = service;
    grpcServerRule.getServiceRegistry().addService(service);

    blockingStub = SubscriberGrpc.newBlockingStub(grpcServerRule.getChannel());
}
/**
 * Starts a streaming pull while the mocked subscription manager returns no messages, then
 * cancels the stream from the client side and expects the response future to fail with an
 * {@code ExecutionException} whose message contains the CANCELLED status code.
 */
@Test(timeout = 10000)
public void streamingPull_cancel() throws ExecutionException, InterruptedException {
    SubscriberStub asyncStub = SubscriberGrpc.newStub(grpcServerRule.getChannel());
    when(mockSubscriptionManager1.pull(500, true, 10)).thenReturn(Collections.emptyList());

    CompletableFuture<StreamingPullResponse> outcome = new CompletableFuture<>();
    // Bridges the stream callbacks onto the future; completion of the stream is ignored.
    StreamObserver<StreamingPullResponse> responseObserver =
            new StreamObserver<StreamingPullResponse>() {
                @Override
                public void onNext(StreamingPullResponse received) {
                    outcome.complete(received);
                }

                @Override
                public void onError(Throwable failure) {
                    outcome.completeExceptionally(failure);
                }

                @Override
                public void onCompleted() {}
            };
    StreamObserver<StreamingPullRequest> requestObserver = asyncStub.streamingPull(responseObserver);

    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .setSubscription(TestHelpers.PROJECT1_SUBSCRIPTION1)
                    .build());

    try {
        // Wait before canceling.
        outcome.get(2, TimeUnit.SECONDS);
    } catch (TimeoutException ignored) {
        // Nothing arrived within the wait window; proceed to cancel the stream.
    }

    String cancelledCode = Status.CANCELLED.getCode().toString();
    expectedException.expect(ExecutionException.class);
    expectedException.expectMessage(cancelledCode);

    requestObserver.onError(Status.CANCELLED.asException());
    outcome.get();
}
/**
 * Returns a blocking subscriber stub whose deadline is {@code timeoutSec} seconds from now.
 *
 * <p>The underlying stub is created on first use from {@code newChannel()} and cached; each call
 * derives a deadline-scoped stub from the cached one.
 *
 * @throws IOException declared for the first-use path that calls {@code newChannel()}
 */
private SubscriberBlockingStub subscriberStub() throws IOException {
    SubscriberBlockingStub stub = cachedSubscriberStub;
    if (stub == null) {
        stub = SubscriberGrpc.newBlockingStub(newChannel());
        cachedSubscriberStub = stub;
    }
    return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
}
/**
 * Builds a blocking subscriber over a plaintext (non-TLS) channel to {@code hostAndPort}.
 *
 * <p>The stub is created without call credentials, so {@code credentials} is not consulted.
 *
 * @param credentials unused by this implementation
 * @return a new {@code BlockingGrpcPubSubSubscriber} using the created channel and stub
 * @throws IOException declared by the interface
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    // Unencrypted transport is 'Ok' because this is ONLY used for testing.
    final ManagedChannel testChannel = NettyChannelBuilder.forTarget(hostAndPort)
            .usePlaintext()
            .build();

    final PullRequest.Builder requestBuilder = PullRequest.newBuilder();
    requestBuilder.setMaxMessages(maxMessagesPerPull);
    requestBuilder.setReturnImmediately(false);
    requestBuilder.setSubscription(projectSubscriptionName);
    final PullRequest pullRequest = requestBuilder.build();

    final SubscriberGrpc.SubscriberBlockingStub testStub = SubscriberGrpc.newBlockingStub(testChannel);

    return new BlockingGrpcPubSubSubscriber(
            projectSubscriptionName, testChannel, testStub, pullRequest, retries, timeout);
}
/**
 * Creates a subscriber from already-built collaborators; every argument is stored as-is.
 *
 * @param projectSubscriptionName subscription name to remember
 * @param channel gRPC channel that backs {@code stub}
 * @param stub blocking subscriber stub
 * @param pullRequest pull request to remember
 * @param retries retry setting to remember
 * @param timeout timeout setting to remember
 */
public BlockingGrpcPubSubSubscriber(
        String projectSubscriptionName,
        ManagedChannel channel,
        SubscriberGrpc.SubscriberBlockingStub stub,
        PullRequest pullRequest,
        int retries,
        Duration timeout) {
    // Connection-related state.
    this.channel = channel;
    this.stub = stub;

    // Pull configuration.
    this.projectSubscriptionName = projectSubscriptionName;
    this.pullRequest = pullRequest;
    this.retries = retries;
    this.timeout = timeout;
}
/** Marks the publisher, subscriber and admin services as SERVING in the health status manager. */
private void startHealthcheckServices() {
    final String[] serviceNames = {
        PublisherGrpc.SERVICE_NAME, SubscriberGrpc.SERVICE_NAME, AdminGrpc.SERVICE_NAME
    };
    for (String serviceName : serviceNames) {
        healthStatusManager.setStatus(serviceName, SERVING);
    }
}
/**
 * Exercises the basic streaming pull flow: the mocked subscription manager supplies three
 * messages, the first streamed response must carry them with ack ids equal to their message
 * ids, and acknowledging those ids over the stream must reach the subscription manager.
 */
@Test(timeout = 10000)
public void streamingPull() throws ExecutionException, InterruptedException {
    CountDownLatch streamClosed = new CountDownLatch(1);
    SubscriberStub asyncStub = SubscriberGrpc.newStub(grpcServerRule.getChannel());

    // Fixture messages served by the mocked subscription manager.
    PubsubMessage hello =
            PubsubMessage.newBuilder()
                    .setMessageId("0-0")
                    .setData(ByteString.copyFromUtf8("hello"))
                    .build();
    PubsubMessage world =
            PubsubMessage.newBuilder()
                    .setMessageId("0-1")
                    .setData(ByteString.copyFromUtf8("world"))
                    .build();
    PubsubMessage goodbye =
            PubsubMessage.newBuilder()
                    .setMessageId("0-2")
                    .setData(ByteString.copyFromUtf8("goodbye"))
                    .build();
    List<PubsubMessage> messages = Arrays.asList(hello, world, goodbye);
    when(mockSubscriptionManager1.pull(500, true, 10)).thenReturn(messages);

    CompletableFuture<StreamingPullResponse> firstResponse = new CompletableFuture<>();
    // Captures the first response (or error) and signals when the server completes the stream.
    StreamObserver<StreamingPullResponse> responseObserver =
            new StreamObserver<StreamingPullResponse>() {
                @Override
                public void onNext(StreamingPullResponse received) {
                    firstResponse.complete(received);
                }

                @Override
                public void onError(Throwable failure) {
                    firstResponse.completeExceptionally(failure);
                }

                @Override
                public void onCompleted() {
                    streamClosed.countDown();
                }
            };
    StreamObserver<StreamingPullRequest> requestObserver = asyncStub.streamingPull(responseObserver);

    // The opening request names the subscription.
    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .setSubscription(TestHelpers.PROJECT1_SUBSCRIPTION1)
                    .build());

    StreamingPullResponse response = firstResponse.get();
    assertThat(
            response.getReceivedMessagesList(),
            Matchers.contains(
                    ReceivedMessage.newBuilder().setAckId("0-0").setMessage(hello).build(),
                    ReceivedMessage.newBuilder().setAckId("0-1").setMessage(world).build(),
                    ReceivedMessage.newBuilder().setAckId("0-2").setMessage(goodbye).build()));

    // Acknowledge everything that was received, then close the client side of the stream.
    List<String> ackIds =
            response.getReceivedMessagesList().stream()
                    .map(ReceivedMessage::getAckId)
                    .collect(Collectors.toList());
    requestObserver.onNext(StreamingPullRequest.newBuilder().addAllAckIds(ackIds).build());
    requestObserver.onCompleted();

    streamClosed.await();
    verify(mockSubscriptionManager1).acknowledge(ackIds);
}
/**
 * Same flow as a plain streaming pull, but the opening request sets a stream ack deadline of
 * 60 seconds and the mock is stubbed for {@code pull(500, true, 60)}; the messages are only
 * delivered if the service calls the subscription manager with that value.
 */
@Test(timeout = 10000)
public void streamingPull_overrideAckDeadline() throws ExecutionException, InterruptedException {
    CountDownLatch streamClosed = new CountDownLatch(1);
    SubscriberStub asyncStub = SubscriberGrpc.newStub(grpcServerRule.getChannel());

    // Fixture messages served by the mocked subscription manager.
    PubsubMessage hello =
            PubsubMessage.newBuilder()
                    .setMessageId("0-0")
                    .setData(ByteString.copyFromUtf8("hello"))
                    .build();
    PubsubMessage world =
            PubsubMessage.newBuilder()
                    .setMessageId("0-1")
                    .setData(ByteString.copyFromUtf8("world"))
                    .build();
    PubsubMessage goodbye =
            PubsubMessage.newBuilder()
                    .setMessageId("0-2")
                    .setData(ByteString.copyFromUtf8("goodbye"))
                    .build();
    List<PubsubMessage> messages = Arrays.asList(hello, world, goodbye);
    when(mockSubscriptionManager1.pull(500, true, 60)).thenReturn(messages);

    CompletableFuture<StreamingPullResponse> firstResponse = new CompletableFuture<>();
    // Captures the first response (or error) and signals when the server completes the stream.
    StreamObserver<StreamingPullResponse> responseObserver =
            new StreamObserver<StreamingPullResponse>() {
                @Override
                public void onNext(StreamingPullResponse received) {
                    firstResponse.complete(received);
                }

                @Override
                public void onError(Throwable failure) {
                    firstResponse.completeExceptionally(failure);
                }

                @Override
                public void onCompleted() {
                    streamClosed.countDown();
                }
            };
    StreamObserver<StreamingPullRequest> requestObserver = asyncStub.streamingPull(responseObserver);

    // The opening request names the subscription and overrides the stream ack deadline.
    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .setSubscription(TestHelpers.PROJECT1_SUBSCRIPTION1)
                    .setStreamAckDeadlineSeconds(60)
                    .build());

    StreamingPullResponse response = firstResponse.get();
    assertThat(
            response.getReceivedMessagesList(),
            Matchers.contains(
                    ReceivedMessage.newBuilder().setAckId("0-0").setMessage(hello).build(),
                    ReceivedMessage.newBuilder().setAckId("0-1").setMessage(world).build(),
                    ReceivedMessage.newBuilder().setAckId("0-2").setMessage(goodbye).build()));

    // Acknowledge everything that was received, then close the client side of the stream.
    List<String> ackIds =
            response.getReceivedMessagesList().stream()
                    .map(ReceivedMessage::getAckId)
                    .collect(Collectors.toList());
    requestObserver.onNext(StreamingPullRequest.newBuilder().addAllAckIds(ackIds).build());
    requestObserver.onCompleted();

    streamClosed.await();
    verify(mockSubscriptionManager1).acknowledge(ackIds);
}
/**
 * Receives three messages over a streaming pull, then sends a request that modifies the ack
 * deadline of ack id "0-0" to 60 seconds and verifies that the subscription manager's
 * {@code modifyAckDeadline} is invoked with exactly that id and deadline.
 */
@Test(timeout = 10000)
public void streamingPull_modifyAndAck() throws ExecutionException, InterruptedException {
    CountDownLatch streamClosed = new CountDownLatch(1);
    SubscriberStub asyncStub = SubscriberGrpc.newStub(grpcServerRule.getChannel());

    // Fixture messages served by the mocked subscription manager.
    PubsubMessage hello =
            PubsubMessage.newBuilder()
                    .setMessageId("0-0")
                    .setData(ByteString.copyFromUtf8("hello"))
                    .build();
    PubsubMessage world =
            PubsubMessage.newBuilder()
                    .setMessageId("0-1")
                    .setData(ByteString.copyFromUtf8("world"))
                    .build();
    PubsubMessage goodbye =
            PubsubMessage.newBuilder()
                    .setMessageId("0-2")
                    .setData(ByteString.copyFromUtf8("goodbye"))
                    .build();
    List<PubsubMessage> messages = Arrays.asList(hello, world, goodbye);
    when(mockSubscriptionManager1.pull(500, true, 10)).thenReturn(messages);

    CompletableFuture<StreamingPullResponse> firstResponse = new CompletableFuture<>();
    // Captures the first response (or error) and signals when the server completes the stream.
    StreamObserver<StreamingPullResponse> responseObserver =
            new StreamObserver<StreamingPullResponse>() {
                @Override
                public void onNext(StreamingPullResponse received) {
                    firstResponse.complete(received);
                }

                @Override
                public void onError(Throwable failure) {
                    firstResponse.completeExceptionally(failure);
                }

                @Override
                public void onCompleted() {
                    streamClosed.countDown();
                }
            };
    StreamObserver<StreamingPullRequest> requestObserver = asyncStub.streamingPull(responseObserver);

    // The opening request names the subscription.
    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .setSubscription(TestHelpers.PROJECT1_SUBSCRIPTION1)
                    .build());

    StreamingPullResponse response = firstResponse.get();
    assertThat(
            response.getReceivedMessagesList(),
            Matchers.contains(
                    ReceivedMessage.newBuilder().setAckId("0-0").setMessage(hello).build(),
                    ReceivedMessage.newBuilder().setAckId("0-1").setMessage(world).build(),
                    ReceivedMessage.newBuilder().setAckId("0-2").setMessage(goodbye).build()));

    // One ack id paired with one deadline value, then close the client side of the stream.
    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .addAllModifyDeadlineAckIds(Collections.singletonList("0-0"))
                    .addAllModifyDeadlineSeconds(Collections.singleton(60))
                    .build());
    requestObserver.onCompleted();

    streamClosed.await();
    verify(mockSubscriptionManager1).modifyAckDeadline(Collections.singletonList("0-0"), 60);
}
/**
 * Receives three messages over a streaming pull, then sends a request with two
 * modify-deadline ack ids but only one deadline value, and expects the stream to fail with an
 * error whose message reports the 2-versus-1 mismatch.
 */
@Test(timeout = 10000)
public void streamingPull_mismatchedModifyAckDeadlines()
        throws ExecutionException, InterruptedException {
    SubscriberStub asyncStub = SubscriberGrpc.newStub(grpcServerRule.getChannel());

    // Fixture messages served by the mocked subscription manager.
    PubsubMessage hello =
            PubsubMessage.newBuilder()
                    .setMessageId("0-0")
                    .setData(ByteString.copyFromUtf8("hello"))
                    .build();
    PubsubMessage world =
            PubsubMessage.newBuilder()
                    .setMessageId("0-1")
                    .setData(ByteString.copyFromUtf8("world"))
                    .build();
    PubsubMessage goodbye =
            PubsubMessage.newBuilder()
                    .setMessageId("0-2")
                    .setData(ByteString.copyFromUtf8("goodbye"))
                    .build();
    List<PubsubMessage> messages = Arrays.asList(hello, world, goodbye);
    when(mockSubscriptionManager1.pull(500, true, 10)).thenReturn(messages);

    // Responses and errors are tracked separately so the error can be awaited on its own.
    CompletableFuture<StreamingPullResponse> deliveredMessages = new CompletableFuture<>();
    CompletableFuture<StreamingPullResponse> streamFailure = new CompletableFuture<>();
    StreamObserver<StreamingPullResponse> responseObserver =
            new StreamObserver<StreamingPullResponse>() {
                @Override
                public void onNext(StreamingPullResponse received) {
                    deliveredMessages.complete(received);
                }

                @Override
                public void onError(Throwable failure) {
                    streamFailure.completeExceptionally(failure);
                }

                @Override
                public void onCompleted() {}
            };
    StreamObserver<StreamingPullRequest> requestObserver = asyncStub.streamingPull(responseObserver);

    // The opening request names the subscription.
    requestObserver.onNext(
            StreamingPullRequest.newBuilder()
                    .setSubscription(TestHelpers.PROJECT1_SUBSCRIPTION1)
                    .build());

    StreamingPullResponse response = deliveredMessages.get();
    assertThat(
            response.getReceivedMessagesList(),
            Matchers.contains(
                    ReceivedMessage.newBuilder().setAckId("0-0").setMessage(hello).build(),
                    ReceivedMessage.newBuilder().setAckId("0-1").setMessage(world).build(),
                    ReceivedMessage.newBuilder().setAckId("0-2").setMessage(goodbye).build()));

    // Two ack ids but a single deadline value: the counts deliberately disagree.
    StreamingPullRequest mismatchedRequest =
            StreamingPullRequest.newBuilder()
                    .addAllModifyDeadlineAckIds(Arrays.asList("0-0", "0-1"))
                    .addAllModifyDeadlineSeconds(Collections.singleton(60))
                    .build();

    expectedException.expect(ExecutionException.class);
    expectedException.expectMessage(
            "Request contained 2 modifyAckDeadlineIds but 1 modifyDeadlineSeconds");

    requestObserver.onNext(mismatchedRequest);
    streamFailure.get();
}