io.grpc.netty.shaded.io.grpc.netty.NegotiationType#com.google.pubsub.v1.PullRequest源码实例Demo

下面列出了 io.grpc.netty.shaded.io.grpc.netty.NegotiationType#com.google.pubsub.v1.PullRequest 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: flink   文件: DefaultPubSubSubscriberFactory.java
/**
 * Creates a blocking gRPC subscriber bound to the default Pub/Sub endpoint.
 *
 * <p>The channel is configured for TLS negotiation, the stub carries the supplied credentials on
 * each call, and a single {@link PullRequest} is prepared up front from this factory's
 * {@code maxMessagesPerPull} and {@code projectSubscriptionName}.
 *
 * @param credentials credentials attached to the blocking stub as call credentials
 * @return a {@link BlockingGrpcPubSubSubscriber} built from the channel, stub and pull request
 * @throws IOException e.g. if the client SSL context cannot be built
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
	// Configure the transport first, then materialize the channel from it.
	NettyChannelBuilder channelBuilder = NettyChannelBuilder
		.forTarget(SubscriberStubSettings.getDefaultEndpoint())
		.negotiationType(NegotiationType.TLS)
		.sslContext(GrpcSslContexts.forClient().ciphers(null).build());
	ManagedChannel grpcChannel = channelBuilder.build();

	// The same request object is handed to the subscriber for all of its pulls.
	PullRequest.Builder requestBuilder = PullRequest.newBuilder();
	requestBuilder.setMaxMessages(maxMessagesPerPull);
	requestBuilder.setReturnImmediately(false);
	requestBuilder.setSubscription(projectSubscriptionName);
	PullRequest request = requestBuilder.build();

	SubscriberGrpc.SubscriberBlockingStub blockingStub =
		SubscriberGrpc.newBlockingStub(grpcChannel).withCallCredentials(MoreCallCredentials.from(credentials));

	return new BlockingGrpcPubSubSubscriber(
		projectSubscriptionName, grpcChannel, blockingStub, request, retries, timeout);
}
 
源代码2 项目: kafka-pubsub-emulator   文件: SubscriberService.java
/**
 * Serves a unary Pull call.
 *
 * <p>Looks up the {@code SubscriptionManager} registered for the requested subscription. When
 * none is found the call is failed with {@code NOT_FOUND}; otherwise the manager is asked for up
 * to {@code max_messages} messages and a single response is emitted before completing the stream.
 *
 * @param request the pull request naming the subscription and the pull limits
 * @param responseObserver observer that receives the response or the error
 */
@Override
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
  logger.atFine().log("Pulling messages %s", request);
  String subscriptionName = request.getSubscription();
  SubscriptionManager manager = subscriptions.get(subscriptionName);

  // Unknown subscription: report the error and stop.
  if (manager == null) {
    String message = subscriptionName + " is not a valid Subscription";
    logger.atWarning().log(message);
    responseObserver.onError(Status.NOT_FOUND.withDescription(message).asException());
    return;
  }

  PullResponse.Builder responseBuilder = PullResponse.newBuilder();
  responseBuilder.addAllReceivedMessages(
      buildReceivedMessageList(
          subscriptionName,
          manager.pull(request.getMaxMessages(), request.getReturnImmediately())));
  PullResponse response = responseBuilder.build();

  logger.atFine().log("Returning %d messages", response.getReceivedMessagesCount());
  responseObserver.onNext(response);
  responseObserver.onCompleted();
}
 
/**
 * Verifies that messages handed out by the mocked subscription manager come back from the Pull
 * RPC wrapped as received messages whose ack ids are "0-0" and "0-1".
 */
@Test
public void pull() {
  PubsubMessage hello =
      PubsubMessage.newBuilder()
          .setMessageId("0-0")
          .setData(ByteString.copyFromUtf8("hello"))
          .build();
  PubsubMessage world =
      PubsubMessage.newBuilder()
          .setMessageId("0-1")
          .setData(ByteString.copyFromUtf8("world"))
          .build();
  List<PubsubMessage> messages = Arrays.asList(hello, world);
  when(mockSubscriptionManager3.pull(100, false)).thenReturn(messages);

  PullRequest request =
      PullRequest.newBuilder()
          .setSubscription(TestHelpers.PROJECT2_SUBSCRIPTION3)
          .setMaxMessages(100)
          .build();
  PullResponse response = blockingStub.pull(request);

  ReceivedMessage expectedFirst =
      ReceivedMessage.newBuilder().setAckId("0-0").setMessage(messages.get(0)).build();
  ReceivedMessage expectedSecond =
      ReceivedMessage.newBuilder().setAckId("0-1").setMessage(messages.get(1)).build();
  assertThat(
      response.getReceivedMessagesList(), Matchers.contains(expectedFirst, expectedSecond));
}
 
源代码4 项目: gcp-ingestion   文件: PubsubTopics.java
/**
 * Pulls messages from the subscription selected by {@code index} and acknowledges whatever was
 * received.
 *
 * <p>The acknowledge call is only issued when at least one message came back.
 *
 * @param index selects the subscription via {@code getSubscription(index)}
 * @param maxMessages upper bound passed to the pull request
 * @param returnImmediately value forwarded to the pull request's return-immediately flag
 * @return the payload {@link PubsubMessage}s of the received messages, possibly empty
 */
public List<PubsubMessage> pull(int index, int maxMessages, boolean returnImmediately) {
  List<ReceivedMessage> received =
      subscriber.pullCallable()
          .call(
              PullRequest.newBuilder()
                  .setMaxMessages(maxMessages)
                  .setReturnImmediately(returnImmediately)
                  .setSubscription(getSubscription(index))
                  .build())
          .getReceivedMessagesList();

  if (!received.isEmpty()) {
    AcknowledgeRequest.Builder ackBuilder =
        AcknowledgeRequest.newBuilder().setSubscription(getSubscription(index));
    List<String> ackIds =
        received.stream().map(ReceivedMessage::getAckId).collect(Collectors.toList());
    subscriber.acknowledgeCallable().call(ackBuilder.addAllAckIds(ackIds).build());
  }

  return received.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
}
 
/**
 * Builds a {@link PullRequest} for the given subscription.
 *
 * @param subscriptionName subscription name; must be non-empty. It is resolved against this
 * factory's project id before being placed in the request
 * @param maxMessages maximum number of messages to pull; {@code null} means
 * {@link Integer#MAX_VALUE}. Must be greater than 0
 * @param returnImmediately when non-null, copied to the request's return-immediately flag;
 * when {@code null} the flag is left unset
 * @return the assembled pull request
 */
@Override
public PullRequest createPullRequest(String subscriptionName, Integer maxMessages,
		Boolean returnImmediately) {
	Assert.hasLength(subscriptionName, "The subscription name must be provided.");

	// A missing limit is treated as "as many as possible".
	final int messageLimit = (maxMessages != null) ? maxMessages : Integer.MAX_VALUE;
	Assert.isTrue(messageLimit > 0, "The maxMessages must be greater than 0.");

	String qualifiedSubscription = PubSubSubscriptionUtils
			.toProjectSubscriptionName(subscriptionName, this.projectId)
			.toString();

	PullRequest.Builder builder = PullRequest.newBuilder();
	builder.setSubscription(qualifiedSubscription);
	builder.setMaxMessages(messageLimit);
	if (returnImmediately != null) {
		builder.setReturnImmediately(returnImmediately);
	}
	return builder.build();
}
 
/**
 * Issues the given pull request asynchronously and adapts the outcome to a
 * {@link ListenableFuture}.
 *
 * <p>The callback is registered with {@code asyncPullExecutor}. On success the received
 * messages are converted with the request's subscription name; on failure the throwable is
 * forwarded to the returned future.
 *
 * @param pullRequest pull request containing the subscription name; must not be null
 * @return a future yielding the list of {@link AcknowledgeablePubsubMessage}s
 */
private ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(PullRequest pullRequest) {
	Assert.notNull(pullRequest, "The pull request can't be null.");

	ApiFuture<PullResponse> responseFuture =
			this.subscriberStub.pullCallable().futureCall(pullRequest);

	final SettableListenableFuture<List<AcknowledgeablePubsubMessage>> resultFuture =
			new SettableListenableFuture<>();

	ApiFutures.addCallback(responseFuture, new ApiFutureCallback<PullResponse>() {

		@Override
		public void onSuccess(PullResponse pullResponse) {
			resultFuture.set(toAcknowledgeablePubsubMessageList(
					pullResponse.getReceivedMessagesList(), pullRequest.getSubscription()));
		}

		@Override
		public void onFailure(Throwable throwable) {
			resultFuture.setException(throwable);
		}

	}, asyncPullExecutor);

	return resultFuture;
}
 
/**
 * Asynchronously pulls messages from a subscription and acknowledges them.
 *
 * <p>The pull request is created by the subscriber factory. Once the pull succeeds, a non-empty
 * batch is acknowledged and the returned future is completed with the plain
 * {@link PubsubMessage}s; a failed pull is forwarded to the returned future as its exception.
 *
 * @param subscription the subscription to pull from
 * @param maxMessages maximum number of messages, passed through to the factory
 * @param returnImmediately return-immediately setting, passed through to the factory
 * @return a future yielding the pulled messages
 */
@Override
public ListenableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages,
		Boolean returnImmediately) {
	final SettableListenableFuture<List<PubsubMessage>> resultFuture = new SettableListenableFuture<>();

	PullRequest request = this.subscriberFactory.createPullRequest(
			subscription, maxMessages, returnImmediately);

	this.pullAsync(request).addCallback(
			pulled -> {
				// Only call ack when something was actually received.
				if (!pulled.isEmpty()) {
					ack(pulled);
				}
				resultFuture.set(
						pulled.stream()
								.map(AcknowledgeablePubsubMessage::getPubsubMessage)
								.collect(Collectors.toList()));
			},
			resultFuture::setException);

	return resultFuture;
}
 
源代码8 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * Regular flow: the first poll yields one record, which is committed; the following polls see
 * an empty pull response. Exactly one ackMessages() call is expected across the two later
 * polls, i.e. no additional ack is sent once there is nothing left to acknowledge.
 */
@Test
public void testPollInRegularCase() throws Exception {
  task.start(props);
  ReceivedMessage received =
      createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse singleMessageResponse =
      PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(singleMessageResponse);

  List<SourceRecord> records = task.poll();
  assertEquals(1, records.size());
  task.commitRecord(records.get(0));

  // From here on the subscriber returns nothing, and acks succeed.
  PullResponse emptyResponse = PullResponse.newBuilder().build();
  SettableApiFuture<Empty> ackFuture = SettableApiFuture.create();
  ackFuture.set(Empty.getDefaultInstance());
  when(subscriber.ackMessages(any(AcknowledgeRequest.class))).thenReturn(ackFuture);
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(emptyResponse);

  records = task.poll();
  assertEquals(0, records.size());
  records = task.poll();
  assertEquals(0, records.size());
  verify(subscriber, times(1)).ackMessages(any(AcknowledgeRequest.class));
}
 
源代码9 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * Duplicate delivery: a message that was already returned by an earlier poll is delivered again
 * together with a new one. The second poll is expected to produce only one record, i.e. the
 * previously seen message is not handed to Kafka a second time while the unseen one is.
 */
@Test
public void testPollWithDuplicateReceivedMessages() throws Exception {
  task.start(props);
  ReceivedMessage firstMessage =
      createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse firstResponse =
      PullResponse.newBuilder().addReceivedMessages(firstMessage).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(firstResponse);
  List<SourceRecord> records = task.poll();
  assertEquals(1, records.size());

  // Second pull re-delivers the first message and adds a message with a new ack id.
  ReceivedMessage secondMessage =
      createReceivedMessage(ACK_ID2, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse secondResponse =
      PullResponse.newBuilder()
          .addReceivedMessages(0, firstMessage)
          .addReceivedMessages(1, secondMessage)
          .build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(secondResponse);
  records = task.poll();
  assertEquals(1, records.size());
}
 
源代码10 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * Polls a message that carries no attribute matching {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE} and
 * expects a record with a null key; no ack may have been sent by that point.
 */
@Test
public void testPollWithNoMessageKeyAttribute() throws Exception {
  task.start(props);
  ReceivedMessage received =
      createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse pullResponse = PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(pullResponse);

  List<SourceRecord> records = task.poll();

  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, records.size());
  // Arguments: source partition, source offset, topic, partition, key schema, key,
  // value schema, value.
  SourceRecord expectedRecord =
      new SourceRecord(
          null, null, KAFKA_TOPIC, 0,
          Schema.OPTIONAL_STRING_SCHEMA, null,
          Schema.BYTES_SCHEMA, KAFKA_VALUE);
  assertRecordsEqual(expectedRecord, records.get(0));
}
 
源代码11 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * Polls a message that carries an attribute matching {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE} and
 * expects the attribute value to become the record key; no ack may have been sent by that point.
 */
@Test
public void testPollWithMessageKeyAttribute() throws Exception {
  task.start(props);
  Map<String, String> messageAttributes = new HashMap<>();
  messageAttributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  ReceivedMessage received = createReceivedMessage(ACK_ID1, CPS_MESSAGE, messageAttributes);
  PullResponse pullResponse = PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(pullResponse);

  List<SourceRecord> records = task.poll();

  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, records.size());
  // Arguments: source partition, source offset, topic, partition, key schema, key,
  // value schema, value.
  SourceRecord expectedRecord =
      new SourceRecord(
          null, null, KAFKA_TOPIC, 0,
          Schema.OPTIONAL_STRING_SCHEMA, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          Schema.BYTES_SCHEMA, KAFKA_VALUE);
  assertRecordsEqual(expectedRecord, records.get(0));
}
 
源代码12 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * Polls a message that carries attributes matching both {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}
 * and {@link #KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE}; the expected record uses the key attribute
 * value as key and the parsed timestamp attribute value as record timestamp.
 */
@Test
public void testPollWithMessageTimestampAttribute() throws Exception{
  task.start(props);
  Map<String, String> messageAttributes = new HashMap<>();
  messageAttributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  messageAttributes.put(
      KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE, KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE);
  ReceivedMessage received = createReceivedMessage(ACK_ID1, CPS_MESSAGE, messageAttributes);
  PullResponse pullResponse = PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(pullResponse);

  List<SourceRecord> records = task.poll();

  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, records.size());
  // Same layout as the other expectations, with the timestamp appended as last argument.
  SourceRecord expectedRecord =
      new SourceRecord(
          null, null, KAFKA_TOPIC, 0,
          Schema.OPTIONAL_STRING_SCHEMA, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          Schema.BYTES_SCHEMA, KAFKA_VALUE,
          Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE));
  assertRecordsEqual(expectedRecord, records.get(0));
}
 
源代码13 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * With the partition scheme set to HASH_VALUE, the expected record's partition is the hash code
 * of the value modulo the configured number of partitions.
 */
@Test
public void testPollWithPartitionSchemeHashValue() throws Exception {
  props.put(
      CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG,
      CloudPubSubSourceConnector.PartitionScheme.HASH_VALUE.toString());
  task.start(props);
  ReceivedMessage received =
      createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse pullResponse = PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(pullResponse);

  List<SourceRecord> records = task.poll();

  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, records.size());
  SourceRecord expectedRecord =
      new SourceRecord(
          null, null, KAFKA_TOPIC,
          KAFKA_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS),
          Schema.OPTIONAL_STRING_SCHEMA, null,
          Schema.BYTES_SCHEMA, KAFKA_VALUE);
  assertRecordsEqual(expectedRecord, records.get(0));
}
 
源代码14 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * With the partition scheme set to KAFKA_PARTITIONER, no partition is assigned: the polled
 * record is expected to report a null Kafka partition.
 */
@Test
public void testPollWithPartitionSchemeKafkaPartitioner() throws Exception {
  props.put(
      CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG,
      CloudPubSubSourceConnector.PartitionScheme.KAFKA_PARTITIONER.toString());
  task.start(props);
  ReceivedMessage received =
      createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
  PullResponse pullResponse = PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(pullResponse);

  List<SourceRecord> records = task.poll();

  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, records.size());
  // The fourth argument (partition) is deliberately null here.
  SourceRecord expectedRecord =
      new SourceRecord(
          null, null, KAFKA_TOPIC, null,
          Schema.OPTIONAL_STRING_SCHEMA, null,
          Schema.BYTES_SCHEMA, KAFKA_VALUE);
  assertRecordsEqual(expectedRecord, records.get(0));
  assertNull(records.get(0).kafkaPartition());
}
 
源代码15 项目: flink   文件: DefaultPubSubSubscriberFactory.java
/**
 * Returns a blocking gRPC subscriber that connects to the default Pub/Sub endpoint.
 *
 * <p>Three pieces are assembled: a channel using TLS negotiation, a pull request built from
 * {@code maxMessagesPerPull} and {@code projectSubscriptionName}, and a blocking stub that
 * attaches the given credentials to its calls.
 *
 * @param credentials credentials used as call credentials on the stub
 * @return the subscriber wrapping channel, stub and pull request
 * @throws IOException e.g. if building the client SSL context fails
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
	NettyChannelBuilder tlsChannelBuilder =
		NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
			.negotiationType(NegotiationType.TLS)
			.sslContext(GrpcSslContexts.forClient().ciphers(null).build());
	ManagedChannel managedChannel = tlsChannelBuilder.build();

	PullRequest preparedPull =
		PullRequest.newBuilder()
			.setMaxMessages(maxMessagesPerPull)
			.setReturnImmediately(false)
			.setSubscription(projectSubscriptionName)
			.build();

	SubscriberGrpc.SubscriberBlockingStub authenticatedStub =
		SubscriberGrpc.newBlockingStub(managedChannel)
			.withCallCredentials(MoreCallCredentials.from(credentials));

	return new BlockingGrpcPubSubSubscriber(
		projectSubscriptionName, managedChannel, authenticatedStub, preparedPull, retries, timeout);
}
 
源代码16 项目: flink   文件: PubsubHelper.java
/**
 * Pulls messages from a subscription with a short-lived subscriber stub and passes them to
 * {@code acknowledgeIds} before returning them.
 *
 * <p>The stub is created without credentials on this helper's {@code channelProvider} and is
 * closed when the method returns.
 *
 * @param projectId project that owns the subscription
 * @param subscriptionId id of the subscription within the project
 * @param maxNumberOfMessages upper bound for the number of messages in the pull
 * @return the received messages, as returned by the pull response
 * @throws Exception if creating, using or closing the stub fails
 */
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
	SubscriberStubSettings stubSettings =
		SubscriberStubSettings.newBuilder()
			.setTransportChannelProvider(channelProvider)
			.setCredentialsProvider(NoCredentialsProvider.create())
			.build();

	try (SubscriberStub stub = GrpcSubscriberStub.create(stubSettings)) {
		String fullSubscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

		PullRequest.Builder requestBuilder = PullRequest.newBuilder();
		requestBuilder.setMaxMessages(maxNumberOfMessages);
		requestBuilder.setReturnImmediately(false);
		requestBuilder.setSubscription(fullSubscriptionName);

		List<ReceivedMessage> messages =
			stub.pullCallable().call(requestBuilder.build()).getReceivedMessagesList();
		acknowledgeIds(stub, fullSubscriptionName, messages);
		return messages;
	}
}
 
源代码17 项目: nifi   文件: ConsumeGCPubSub.java
/**
 * Prepares the processor when it is scheduled: builds the reusable pull request from the
 * configured batch size and subscription name, then creates the subscriber.
 *
 * <p>An {@link IOException} raised while creating the subscriber is not rethrown; it is kept in
 * {@code storedException} and logged.
 *
 * @param context process context supplying the property values
 */
@OnScheduled
public void onScheduled(ProcessContext context) {
    final Integer maxMessagesPerPull = context.getProperty(BATCH_SIZE).asInteger();

    pullRequest =
            PullRequest.newBuilder()
                    .setMaxMessages(maxMessagesPerPull)
                    .setSubscription(getSubscriptionName(context))
                    .build();

    try {
        subscriber = getSubscriber(context);
    } catch (IOException creationFailure) {
        // Remember the failure instead of propagating it from the scheduling hook.
        storedException.set(creationFailure);
        getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{creationFailure});
    }
}
 
源代码18 项目: flink   文件: PubSubSubscriberFactoryForEmulator.java
/**
 * Creates a blocking gRPC subscriber that connects to {@code hostAndPort} over plaintext.
 *
 * <p>The given credentials are not used: the stub is created without call credentials.
 *
 * @param credentials ignored by this implementation
 * @return the subscriber wrapping the plaintext channel, the stub and a prepared pull request
 * @throws IOException declared by the method signature; not thrown by the visible code directly
 */
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
	// This is 'Ok' because this is ONLY used for testing.
	NettyChannelBuilder plaintextBuilder = NettyChannelBuilder.forTarget(hostAndPort).usePlaintext();
	ManagedChannel emulatorChannel = plaintextBuilder.build();

	PullRequest.Builder requestBuilder = PullRequest.newBuilder();
	requestBuilder.setMaxMessages(maxMessagesPerPull);
	requestBuilder.setReturnImmediately(false);
	requestBuilder.setSubscription(projectSubscriptionName);
	PullRequest request = requestBuilder.build();

	SubscriberGrpc.SubscriberBlockingStub blockingStub = SubscriberGrpc.newBlockingStub(emulatorChannel);
	return new BlockingGrpcPubSubSubscriber(
		projectSubscriptionName, emulatorChannel, blockingStub, request, retries, timeout);
}
 
源代码19 项目: flink   文件: BlockingGrpcPubSubSubscriber.java
/**
 * Stores the collaborators and settings this subscriber works with; no other work is done here.
 *
 * @param projectSubscriptionName name of the subscription this subscriber is bound to
 * @param channel the gRPC channel backing the stub
 * @param stub the blocking subscriber stub
 * @param pullRequest the prepared pull request
 * @param retries retry count setting
 * @param timeout timeout setting
 */
public BlockingGrpcPubSubSubscriber(String projectSubscriptionName,
									ManagedChannel channel,
									SubscriberGrpc.SubscriberBlockingStub stub,
									PullRequest pullRequest,
									int retries,
									Duration timeout) {
	// Transport objects.
	this.channel = channel;
	this.stub = stub;

	// What to pull.
	this.projectSubscriptionName = projectSubscriptionName;
	this.pullRequest = pullRequest;

	// Retry/timeout settings.
	this.retries = retries;
	this.timeout = timeout;
}
 
源代码20 项目: flink   文件: PubsubHelper.java
/**
 * Synchronously pulls messages from a subscription and acknowledges everything that was received.
 *
 * <p>A subscriber stub is created without credentials on this helper's {@code channelProvider}
 * and closed before the method returns. When the pull response contains no messages, no
 * Acknowledge request is sent, so the service is never handed an empty list of ack ids.
 *
 * @param projectId project that owns the subscription
 * @param subscriptionId id of the subscription within the project
 * @param maxNumberOfMessages maximum number of messages to pull
 * @return the received messages, possibly empty
 * @throws Exception if creating, using or closing the stub fails
 */
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
	SubscriberStubSettings subscriberStubSettings =
		SubscriberStubSettings.newBuilder()
			.setTransportChannelProvider(channelProvider)
			.setCredentialsProvider(NoCredentialsProvider.create())
			.build();
	try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
		String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
		PullRequest pullRequest =
			PullRequest.newBuilder()
				.setMaxMessages(maxNumberOfMessages)
				// false: let the service wait for messages instead of answering right away with nothing
				.setReturnImmediately(false)
				.setSubscription(subscriptionName)
				.build();

		List<ReceivedMessage> receivedMessages =
			subscriber.pullCallable().call(pullRequest).getReceivedMessagesList();

		// Nothing was pulled: skip the Acknowledge RPC rather than sending an empty ack-id list.
		if (receivedMessages.isEmpty()) {
			return receivedMessages;
		}

		List<String> ackIds = new ArrayList<>(receivedMessages.size());
		for (ReceivedMessage message : receivedMessages) {
			ackIds.add(message.getAckId());
		}

		AcknowledgeRequest acknowledgeRequest =
			AcknowledgeRequest.newBuilder()
				.setSubscription(subscriptionName)
				.addAllAckIds(ackIds)
				.build();
		subscriber.acknowledgeCallable().call(acknowledgeRequest);
		return receivedMessages;
	}
}
 
/**
 * Pulling from a subscription that is not registered must fail with a
 * {@link StatusRuntimeException} whose message contains the NOT_FOUND status code.
 */
@Test
public void pull_subscriptionDoesNotExist() {
  expectedException.expect(StatusRuntimeException.class);
  expectedException.expectMessage(Status.NOT_FOUND.getCode().toString());

  PullRequest.Builder requestBuilder = PullRequest.newBuilder();
  requestBuilder.setSubscription("projects/project-1/subscriptions/unknown-subscription");
  requestBuilder.setMaxMessages(100);

  blockingStub.pull(requestBuilder.build());
}
 
/**
 * When the mocked subscription manager has nothing to hand out, the Pull RPC must answer with
 * an empty list of received messages.
 */
@Test
public void pull_emptyList() {
  when(mockSubscriptionManager3.pull(100, false)).thenReturn(Collections.emptyList());

  PullRequest.Builder requestBuilder = PullRequest.newBuilder();
  requestBuilder.setSubscription(TestHelpers.PROJECT2_SUBSCRIPTION3);
  requestBuilder.setMaxMessages(100);

  PullResponse response = blockingStub.pull(requestBuilder.build());

  assertThat(response.getReceivedMessagesList(), Matchers.empty());
}
 
/**
 * Pulls up to 10 messages from the named subscription with the return-immediately flag set and
 * returns their payloads decoded as UTF-8 strings. The messages are not acknowledged here.
 *
 * @param subscriptionName subscription name, resolved against {@code projectName}
 * @return the message payloads as strings, possibly empty
 */
private List<String> getMessagesFromSubscription(String subscriptionName) {
	String qualifiedName = ProjectSubscriptionName.format(projectName, subscriptionName);

	PullRequest.Builder requestBuilder = PullRequest.newBuilder();
	requestBuilder.setReturnImmediately(true);
	requestBuilder.setMaxMessages(10);
	requestBuilder.setSubscription(qualifiedName);

	PullResponse response =
			subscriptionAdminClient.getStub().pullCallable().call(requestBuilder.build());

	return response.getReceivedMessagesList()
			.stream()
			.map((received) -> received.getMessage().getData().toStringUtf8())
			.collect(Collectors.toList());
}
 
/**
 * Executes the given pull request synchronously and converts the received messages.
 *
 * @param pullRequest pull request containing the subscription name; must not be null
 * @return the list of {@link AcknowledgeablePubsubMessage}s built from the response and the
 * request's subscription name
 */
private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
	Assert.notNull(pullRequest, "The pull request can't be null.");

	PullResponse response = this.subscriberStub.pullCallable().call(pullRequest);

	return toAcknowledgeablePubsubMessageList(
			response.getReceivedMessagesList(), pullRequest.getSubscription());
}
 
/**
 * Synchronously pulls messages from a subscription, acknowledges them when any were received,
 * and returns the plain {@link PubsubMessage}s.
 *
 * @param subscription the subscription to pull from
 * @param maxMessages maximum number of messages, passed through to the subscriber factory
 * @param returnImmediately return-immediately setting, passed through to the subscriber factory
 * @return the pulled messages, possibly empty
 */
@Override
public List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages,
		Boolean returnImmediately) {
	PullRequest request = this.subscriberFactory.createPullRequest(
			subscription, maxMessages, returnImmediately);
	List<AcknowledgeablePubsubMessage> pulled = pull(request);

	// Only call ack when something was actually received.
	if (!pulled.isEmpty()) {
		ack(pulled);
	}

	return pulled.stream()
			.map(AcknowledgeablePubsubMessage::getPubsubMessage)
			.collect(Collectors.toList());
}
 
/**
 * Checks that a pull request created with a {@code null} max-messages argument ends up with
 * {@link Integer#MAX_VALUE} as its limit.
 *
 * <p>Note: despite the "nonNull" in the method name, the argument passed here is {@code null}.
 */
@Test
public void testCreatePullRequest_nonNullMaxMessages() {
	DefaultSubscriberFactory subscriberFactory = new DefaultSubscriberFactory(() -> "project");
	subscriberFactory.setCredentialsProvider(this.credentialsProvider);

	PullRequest created = subscriberFactory.createPullRequest("test", null, true);

	assertThat(created.getMaxMessages()).isEqualTo(Integer.MAX_VALUE);
}
 
/**
 * With an empty pull response, {@code pullAndAck} must return an empty list and must never
 * call {@code ack}.
 */
@Test
public void testPullAndAck_NoMessages() {
	PullResponse emptyResponse = PullResponse.newBuilder().build();
	when(this.pullCallable.call(any(PullRequest.class))).thenReturn(emptyResponse);

	List<PubsubMessage> pulled = this.pubSubSubscriberTemplate.pullAndAck("sub2", 1, true);

	assertThat(pulled.size()).isEqualTo(0);
	verify(this.pubSubSubscriberTemplate, never()).ack(any());
}
 
/**
 * With an empty pull response, {@code pullNext} must return {@code null}, must have requested
 * exactly one message with the return-immediately flag set, and must never call {@code ack}.
 */
@Test
public void testPullNext_NoMessages() {
	PullResponse emptyResponse = PullResponse.newBuilder().build();
	when(this.pullCallable.call(any(PullRequest.class))).thenReturn(emptyResponse);

	PubsubMessage next = this.pubSubSubscriberTemplate.pullNext("sub2");

	assertThat(next).isNull();
	verify(this.subscriberFactory).createPullRequest("sub2", 1, true);
	verify(this.pubSubSubscriberTemplate, never()).ack(any());
}
 
源代码29 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * An empty PullResponse from Cloud Pub/Sub must lead to an empty poll result and to no
 * ackMessages() call.
 */
@Test
public void testPollCaseWithNoMessages() throws Exception {
  task.start(props);
  PullResponse emptyResponse = PullResponse.newBuilder().build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(emptyResponse);

  int polledCount = task.poll().size();

  assertEquals(0, polledCount);
  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
}
 
源代码30 项目: pubsub   文件: CloudPubSubSourceTaskTest.java
/**
 * Polls a message with several attributes, one of which matches
 * {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}, while Kafka record headers are enabled. The key
 * attribute is expected as the record key and the remaining attributes as string headers.
 *
 * <p>NOTE(review): the expected record carries a timestamp parsed from
 * KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE although no timestamp attribute is put on the message
 * here — confirm whether assertRecordsEqual compares timestamps.
 */
@Test
public void testPollWithMultipleAttributesAndRecordHeaders() throws Exception {
  props.put(CloudPubSubSourceConnector.USE_KAFKA_HEADERS, "true");
  task.start(props);

  Map<String, String> messageAttributes = new HashMap<>();
  messageAttributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
  messageAttributes.put("attribute1", "attribute_value1");
  messageAttributes.put("attribute2", "attribute_value2");
  ReceivedMessage received = createReceivedMessage(ACK_ID1, CPS_MESSAGE, messageAttributes);
  PullResponse pullResponse = PullResponse.newBuilder().addReceivedMessages(received).build();
  when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(pullResponse);

  List<SourceRecord> records = task.poll();

  verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
  assertEquals(1, records.size());

  // Every attribute except the key attribute is expected as a header.
  ConnectHeaders expectedHeaders = new ConnectHeaders();
  expectedHeaders.addString("attribute1", "attribute_value1");
  expectedHeaders.addString("attribute2", "attribute_value2");

  SourceRecord expectedRecord =
      new SourceRecord(
          null, null, KAFKA_TOPIC, 0,
          Schema.OPTIONAL_STRING_SCHEMA, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
          Schema.BYTES_SCHEMA, KAFKA_VALUE,
          Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE),
          expectedHeaders);
  assertRecordsEqual(expectedRecord, records.get(0));
}