下面列出了 io.grpc.netty.shaded.io.grpc.netty.NegotiationType#com.google.cloud.pubsub.v1.stub.SubscriberStubSettings 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Builds a blocking gRPC-based {@link PubSubSubscriber} for the configured subscription.
 *
 * <p>A TLS-negotiated Netty channel is opened against the default Pub/Sub endpoint, a blocking
 * stub carrying the supplied credentials is attached to it, and a reusable {@link PullRequest}
 * (bounded by {@code maxMessagesPerPull}, not returning immediately) is prepared.
 *
 * @param credentials credentials attached to every call made through the stub
 * @return a subscriber wrapping the channel, the stub and the prepared pull request
 * @throws IOException if the channel or its SSL context cannot be set up
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    // Channel first: SSL context construction is the step that can fail with an I/O error.
    final String endpoint = SubscriberStubSettings.getDefaultEndpoint();
    NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoint);
    channelBuilder = channelBuilder.negotiationType(NegotiationType.TLS);
    channelBuilder = channelBuilder.sslContext(GrpcSslContexts.forClient().ciphers(null).build());
    final ManagedChannel managedChannel = channelBuilder.build();

    // The same pull request is handed to the subscriber for reuse.
    final PullRequest.Builder requestBuilder = PullRequest.newBuilder();
    requestBuilder.setMaxMessages(maxMessagesPerPull);
    requestBuilder.setReturnImmediately(false);
    requestBuilder.setSubscription(projectSubscriptionName);
    final PullRequest request = requestBuilder.build();

    final SubscriberGrpc.SubscriberBlockingStub blockingStub =
        SubscriberGrpc.newBlockingStub(managedChannel)
            .withCallCredentials(MoreCallCredentials.from(credentials));

    return new BlockingGrpcPubSubSubscriber(
        projectSubscriptionName, managedChannel, blockingStub, request, retries, timeout);
}
/**
 * Creates a new subscriber stub, stores it in {@code subscriber}, and schedules the next reset.
 *
 * <p>The stub uses the default gRPC transport with a 20MB inbound message limit and the
 * configured {@code gcpCredentialsProvider}.
 *
 * @throws RuntimeException if the stub settings or the stub itself cannot be created
 */
private void makeSubscriber() {
    try {
        log.info("Creating subscriber.");

        SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
        settingsBuilder.setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build());
        settingsBuilder.setCredentialsProvider(gcpCredentialsProvider);
        subscriber = GrpcSubscriberStub.create(settingsBuilder.build());

        // The subscriber is swapped out every 25 - 35 minutes in order to avoid GOAWAY errors:
        // a fixed 25 minute base plus up to 10 minutes of random jitter.
        final long nowMillis = System.currentTimeMillis();
        final int jitterMillis = rand.nextInt(10 * 60 * 1000);
        nextSubscriberResetTime = nowMillis + jitterMillis + 25 * 60 * 1000;
    } catch (IOException ioe) {
        throw new RuntimeException("Could not create subscriber stub; no subscribing can occur.", ioe);
    }
}
/**
 * Check whether the user provided Cloud Pub/Sub subscription name specified by {@link
 * #CPS_SUBSCRIPTION_CONFIG} exists or not.
 *
 * <p>A temporary subscriber stub is created solely to issue a {@code GetSubscription} call; it is
 * closed again before this method returns so its channel and background resources are not
 * leaked.
 *
 * @param cpsProject the Cloud Pub/Sub project that owns the subscription
 * @param cpsSubscription the subscription name to look up
 * @param credentialsProvider provider of the credentials used for the lookup
 * @throws ConnectException if the stub cannot be created, the lookup fails, or the stub cannot be
 *     closed; the underlying exception is attached as the cause
 */
@VisibleForTesting
public void verifySubscription(String cpsProject, String cpsSubscription, CredentialsProvider credentialsProvider) {
    try {
        SubscriberStubSettings subscriberStubSettings =
            SubscriberStubSettings.newBuilder()
                .setTransportChannelProvider(
                    SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                        .setMaxInboundMessageSize(20 << 20) // 20MB
                        .build())
                .setCredentialsProvider(credentialsProvider)
                .build();
        GetSubscriptionRequest request =
            GetSubscriptionRequest.newBuilder()
                .setSubscription(
                    String.format(
                        ConnectorUtils.CPS_SUBSCRIPTION_FORMAT, cpsProject, cpsSubscription))
                .build();
        // try-with-resources guarantees the one-shot stub is shut down even when the call fails.
        try (GrpcSubscriberStub stub = GrpcSubscriberStub.create(subscriberStubSettings)) {
            stub.getSubscriptionCallable().call(request);
        }
    } catch (Exception e) {
        throw new ConnectException(
            "Error verifying the subscription " + cpsSubscription + " for project " + cpsProject, e);
    }
}
/**
 * Creates a {@link PubSubSubscriber} that pulls from the configured subscription over a blocking
 * gRPC stub.
 *
 * @param credentials credentials used as call credentials on the blocking stub
 * @return a {@link BlockingGrpcPubSubSubscriber} bound to a freshly built TLS channel
 * @throws IOException if building the channel (including its SSL context) fails
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    // TLS channel to the default Pub/Sub endpoint; ciphers(null) is passed through unchanged.
    final ManagedChannel grpcChannel =
        NettyChannelBuilder
            .forTarget(SubscriberStubSettings.getDefaultEndpoint())
            .negotiationType(NegotiationType.TLS)
            .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
            .build();

    // Pull request template: capped at maxMessagesPerPull and not returning immediately.
    final PullRequest pullTemplate =
        PullRequest.newBuilder()
            .setMaxMessages(maxMessagesPerPull)
            .setReturnImmediately(false)
            .setSubscription(projectSubscriptionName)
            .build();

    final SubscriberGrpc.SubscriberBlockingStub authorizedStub =
        SubscriberGrpc
            .newBlockingStub(grpcChannel)
            .withCallCredentials(MoreCallCredentials.from(credentials));

    return new BlockingGrpcPubSubSubscriber(
        projectSubscriptionName, grpcChannel, authorizedStub, pullTemplate, retries, timeout);
}
/**
 * Pulls up to {@code maxNumberOfMessages} messages from a subscription and hands them to
 * {@code acknowledgeIds} before returning them.
 *
 * <p>The stub is built on the configured {@code channelProvider} without credentials and is
 * closed when the method exits.
 *
 * @param projectId project that owns the subscription
 * @param subscriptionId id of the subscription to pull from
 * @param maxNumberOfMessages upper bound on the number of messages requested in the pull
 * @return the messages contained in the pull response
 * @throws Exception if creating the stub, pulling, acknowledging, or closing the stub fails
 */
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
    SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
    settingsBuilder.setTransportChannelProvider(channelProvider);
    settingsBuilder.setCredentialsProvider(NoCredentialsProvider.create());
    SubscriberStubSettings stubSettings = settingsBuilder.build();

    try (SubscriberStub stub = GrpcSubscriberStub.create(stubSettings)) {
        String fullSubscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

        PullRequest.Builder requestBuilder = PullRequest.newBuilder();
        requestBuilder.setMaxMessages(maxNumberOfMessages);
        requestBuilder.setReturnImmediately(false);
        requestBuilder.setSubscription(fullSubscriptionName);

        List<ReceivedMessage> pulled =
            stub.pullCallable().call(requestBuilder.build()).getReceivedMessagesList();
        acknowledgeIds(stub, fullSubscriptionName, pulled);
        return pulled;
    }
}
/**
 * Synchronously pulls up to {@code maxNumberOfMessages} messages from a subscription,
 * acknowledges them, and returns them.
 *
 * <p>The stub is built on the configured {@code channelProvider} without credentials and is
 * closed when the method exits. When the pull yields no messages, no acknowledge call is made
 * and the empty list is returned as-is.
 *
 * @param projectId project that owns the subscription
 * @param subscriptionId id of the subscription to pull from
 * @param maxNumberOfMessages upper bound on the number of messages requested in the pull
 * @return the messages contained in the pull response (possibly empty)
 * @throws Exception if creating the stub, pulling, acknowledging, or closing the stub fails
 */
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(channelProvider)
            .setCredentialsProvider(NoCredentialsProvider.create())
            .build();
    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
        String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
        PullRequest pullRequest =
            PullRequest.newBuilder()
                .setMaxMessages(maxNumberOfMessages)
                .setReturnImmediately(false) // do not return immediately when no messages are available
                .setSubscription(subscriptionName)
                .build();
        // use pullCallable().futureCall to asynchronously perform this operation
        PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
        List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessagesList();
        if (receivedMessages.isEmpty()) {
            // Nothing was pulled: skip the acknowledge RPC rather than sending an empty ack-id list.
            return receivedMessages;
        }
        List<String> ackIds = new ArrayList<>(receivedMessages.size());
        for (ReceivedMessage message : receivedMessages) {
            ackIds.add(message.getAckId());
        }
        // acknowledge received messages
        AcknowledgeRequest acknowledgeRequest =
            AcknowledgeRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAllAckIds(ackIds)
                .build();
        // use acknowledgeCallable().futureCall to asynchronously perform this operation
        subscriber.acknowledgeCallable().call(acknowledgeRequest);
        return receivedMessages;
    }
}
/**
 * Creates a {@link SubscriberStub} from whichever optional settings have been configured on this
 * factory.
 *
 * <p>Each of the credentials provider, pull endpoint, executor provider, header provider,
 * channel provider, clock and pull retry settings is applied only when it is non-null; anything
 * left unset keeps the builder's own value.
 *
 * @return a newly created gRPC subscriber stub
 * @throws RuntimeException if the settings cannot be built or the stub cannot be created
 */
@Override
public SubscriberStub createSubscriberStub() {
    final SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();

    if (credentialsProvider != null) {
        settingsBuilder.setCredentialsProvider(credentialsProvider);
    }
    if (pullEndpoint != null) {
        settingsBuilder.setEndpoint(pullEndpoint);
    }
    if (executorProvider != null) {
        settingsBuilder.setExecutorProvider(executorProvider);
    }
    if (headerProvider != null) {
        settingsBuilder.setHeaderProvider(headerProvider);
    }
    if (channelProvider != null) {
        settingsBuilder.setTransportChannelProvider(channelProvider);
    }
    if (apiClock != null) {
        settingsBuilder.setClock(apiClock);
    }
    if (subscriberStubRetrySettings != null) {
        // Retry settings are applied to the pull call only.
        settingsBuilder.pullSettings().setRetrySettings(subscriberStubRetrySettings);
    }

    try {
        final SubscriberStubSettings stubSettings = settingsBuilder.build();
        return GrpcSubscriberStub.create(stubSettings);
    } catch (IOException ioe) {
        throw new RuntimeException("Error creating the SubscriberStub", ioe);
    }
}
/**
 * Creates a gRPC subscriber stub authenticated with the Google credentials resolved from the
 * given process context.
 *
 * @param context process context passed to {@code getGoogleCredentials}
 * @return a new subscriber stub using fixed credentials
 * @throws IOException if the stub settings or the stub cannot be created
 */
private SubscriberStub getSubscriber(ProcessContext context) throws IOException {
    final FixedCredentialsProvider fixedCredentials =
        FixedCredentialsProvider.create(getGoogleCredentials(context));

    final SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
    settingsBuilder.setCredentialsProvider(fixedCredentials);

    return GrpcSubscriberStub.create(settingsBuilder.build());
}
/**
 * Reads the last processed timestamp from the Pub/Sub subscription named after
 * {@code tableName} in {@code PROJECT_ID}.
 *
 * <p>At most one message is pulled (returning immediately), its payload is decoded as Avro JSON
 * using {@code avroSchema}, and the record's {@code "Timestamp"} field is taken as the result.
 * The pulled message is not acknowledged in this method.
 *
 * @return the decoded timestamp; an empty string when the pull returned no messages; or
 *     {@code startingTimestamp} when an {@link IOException} occurs
 */
private String getLastProcessedTimestamp() {
    String lastTimestamp = "";
    try {
        final SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
        settingsBuilder.setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build());
        final SubscriberStubSettings stubSettings = settingsBuilder.build();

        try (SubscriberStub stub = GrpcSubscriberStub.create(stubSettings)) {
            final String fullSubscriptionName = ProjectSubscriptionName.format(PROJECT_ID, tableName);

            final PullRequest.Builder requestBuilder = PullRequest.newBuilder();
            requestBuilder.setMaxMessages(1);
            requestBuilder.setReturnImmediately(true);
            requestBuilder.setSubscription(fullSubscriptionName);

            final PullResponse response = stub.pullCallable().call(requestBuilder.build());
            final DatumReader<GenericRecord> avroReader =
                new GenericDatumReader<GenericRecord>(avroSchema);

            for (ReceivedMessage received : response.getReceivedMessagesList()) {
                // The message payload is Avro encoded as JSON.
                final JsonDecoder jsonDecoder =
                    DecoderFactory.get()
                        .jsonDecoder(avroSchema, received.getMessage().getData().newInput());
                final GenericRecord decoded = avroReader.read(null, jsonDecoder);
                lastTimestamp = decoded.get("Timestamp").toString();
                log.debug("---------------- Got Timestamp: " + lastTimestamp);
            }
        }
    } catch (IOException ioe) {
        log.error("Could not get last processed timestamp from pub / sub", ioe);
        // If we cannot find a previously processed timestamp, we will default
        // to the one present in the config file.
        return startingTimestamp;
    }
    return lastTimestamp;
}