下面列出了io.grpc.netty.shaded.io.grpc.netty.NegotiationType#com.google.pubsub.v1.PullRequest 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
ManagedChannel channel = NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.build();
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(maxMessagesPerPull)
.setReturnImmediately(false)
.setSubscription(projectSubscriptionName)
.build();
SubscriberGrpc.SubscriberBlockingStub stub = SubscriberGrpc.newBlockingStub(channel)
.withCallCredentials(MoreCallCredentials.from(credentials));
return new BlockingGrpcPubSubSubscriber(projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
}
@Override
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
logger.atFine().log("Pulling messages %s", request);
SubscriptionManager subscriptionManager = subscriptions.get(request.getSubscription());
if (subscriptionManager == null) {
String message = request.getSubscription() + " is not a valid Subscription";
logger.atWarning().log(message);
responseObserver.onError(Status.NOT_FOUND.withDescription(message).asException());
} else {
PullResponse response =
PullResponse.newBuilder()
.addAllReceivedMessages(
buildReceivedMessageList(
request.getSubscription(),
subscriptionManager.pull(
request.getMaxMessages(), request.getReturnImmediately())))
.build();
logger.atFine().log("Returning %d messages", response.getReceivedMessagesCount());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
@Test
public void pull() {
List<PubsubMessage> messages =
Arrays.asList(
PubsubMessage.newBuilder()
.setMessageId("0-0")
.setData(ByteString.copyFromUtf8("hello"))
.build(),
PubsubMessage.newBuilder()
.setMessageId("0-1")
.setData(ByteString.copyFromUtf8("world"))
.build());
when(mockSubscriptionManager3.pull(100, false)).thenReturn(messages);
PullRequest request =
PullRequest.newBuilder()
.setSubscription(TestHelpers.PROJECT2_SUBSCRIPTION3)
.setMaxMessages(100)
.build();
PullResponse response = blockingStub.pull(request);
assertThat(
response.getReceivedMessagesList(),
Matchers.contains(
ReceivedMessage.newBuilder().setAckId("0-0").setMessage(messages.get(0)).build(),
ReceivedMessage.newBuilder().setAckId("0-1").setMessage(messages.get(1)).build()));
}
/**
* Pull and ack {@link PubsubMessage}s from the subscription indicated by {@code index}.
*/
public List<PubsubMessage> pull(int index, int maxMessages, boolean returnImmediately) {
List<ReceivedMessage> response = subscriber.pullCallable()
.call(PullRequest.newBuilder().setMaxMessages(maxMessages)
.setReturnImmediately(returnImmediately).setSubscription(getSubscription(index))
.build())
.getReceivedMessagesList();
if (response.size() > 0) {
subscriber.acknowledgeCallable()
.call(AcknowledgeRequest.newBuilder().setSubscription(getSubscription(index))
.addAllAckIds(
response.stream().map(ReceivedMessage::getAckId).collect(Collectors.toList()))
.build());
}
return response.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
}
@Override
public PullRequest createPullRequest(String subscriptionName, Integer maxMessages,
Boolean returnImmediately) {
Assert.hasLength(subscriptionName, "The subscription name must be provided.");
if (maxMessages == null) {
maxMessages = Integer.MAX_VALUE;
}
Assert.isTrue(maxMessages > 0, "The maxMessages must be greater than 0.");
PullRequest.Builder pullRequestBuilder =
PullRequest.newBuilder()
.setSubscription(
PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, this.projectId).toString())
.setMaxMessages(maxMessages);
if (returnImmediately != null) {
pullRequestBuilder.setReturnImmediately(returnImmediately);
}
return pullRequestBuilder.build();
}
/**
* Pulls messages asynchronously, on demand, using the pull request in argument.
*
* @param pullRequest pull request containing the subscription name
* @return the ListenableFuture for the asynchronous execution, returning
* the list of {@link AcknowledgeablePubsubMessage} containing the ack ID, subscription
* and acknowledger
*/
private ListenableFuture<List<AcknowledgeablePubsubMessage>> pullAsync(PullRequest pullRequest) {
Assert.notNull(pullRequest, "The pull request can't be null.");
ApiFuture<PullResponse> pullFuture = this.subscriberStub.pullCallable().futureCall(pullRequest);
final SettableListenableFuture<List<AcknowledgeablePubsubMessage>> settableFuture = new SettableListenableFuture<>();
ApiFutures.addCallback(pullFuture, new ApiFutureCallback<PullResponse>() {
@Override
public void onFailure(Throwable throwable) {
settableFuture.setException(throwable);
}
@Override
public void onSuccess(PullResponse pullResponse) {
List<AcknowledgeablePubsubMessage> result = toAcknowledgeablePubsubMessageList(
pullResponse.getReceivedMessagesList(), pullRequest.getSubscription());
settableFuture.set(result);
}
}, asyncPullExecutor);
return settableFuture;
}
@Override
public ListenableFuture<List<PubsubMessage>> pullAndAckAsync(String subscription, Integer maxMessages,
Boolean returnImmediately) {
PullRequest pullRequest = this.subscriberFactory.createPullRequest(
subscription, maxMessages, returnImmediately);
final SettableListenableFuture<List<PubsubMessage>> settableFuture = new SettableListenableFuture<>();
this.pullAsync(pullRequest).addCallback(
ackableMessages -> {
if (!ackableMessages.isEmpty()) {
ack(ackableMessages);
}
List<PubsubMessage> messages = ackableMessages.stream()
.map(AcknowledgeablePubsubMessage::getPubsubMessage)
.collect(Collectors.toList());
settableFuture.set(messages);
},
settableFuture::setException);
return settableFuture;
}
/**
* Tests that when ackMessages() succeeds and the subsequent call to poll() has no messages, that
* the subscriber does not invoke ackMessages because there should be no acks.
*/
@Test
public void testPollInRegularCase() throws Exception {
task.start(props);
ReceivedMessage rm1 = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm1).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
assertEquals(1, result.size());
task.commitRecord(result.get(0));
stubbedPullResponse = PullResponse.newBuilder().build();
SettableApiFuture<Empty> goodFuture = SettableApiFuture.create();
goodFuture.set(Empty.getDefaultInstance());
when(subscriber.ackMessages(any(AcknowledgeRequest.class))).thenReturn(goodFuture);
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
result = task.poll();
assertEquals(0, result.size());
result = task.poll();
assertEquals(0, result.size());
verify(subscriber, times(1)).ackMessages(any(AcknowledgeRequest.class));
}
/**
* Tests that when a call to ackMessages() fails, that the message is not redelivered to Kafka if
* the message is received again by Cloud Pub/Sub. Also tests that ack ids are added properly if
* the ack id has not been seen before.
*/
@Test
public void testPollWithDuplicateReceivedMessages() throws Exception {
task.start(props);
ReceivedMessage rm1 = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm1).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
assertEquals(1, result.size());
ReceivedMessage rm2 = createReceivedMessage(ACK_ID2, CPS_MESSAGE, new HashMap<String, String>());
stubbedPullResponse =
PullResponse.newBuilder().addReceivedMessages(0, rm1).addReceivedMessages(1, rm2).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
result = task.poll();
assertEquals(1, result.size());
}
/**
* Tests when the message(s) retrieved from Cloud Pub/Sub do not have an attribute that matches
* {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
*/
@Test
public void testPollWithNoMessageKeyAttribute() throws Exception {
task.start(props);
ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
assertEquals(1, result.size());
SourceRecord expected =
new SourceRecord(
null,
null,
KAFKA_TOPIC,
0,
Schema.OPTIONAL_STRING_SCHEMA,
null,
Schema.BYTES_SCHEMA,
KAFKA_VALUE);
assertRecordsEqual(expected, result.get(0));
}
/**
* Tests when the message(s) retrieved from Cloud Pub/Sub do have an attribute that matches {@link
* #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
*/
@Test
public void testPollWithMessageKeyAttribute() throws Exception {
task.start(props);
Map<String, String> attributes = new HashMap<>();
attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
assertEquals(1, result.size());
SourceRecord expected =
new SourceRecord(
null,
null,
KAFKA_TOPIC,
0,
Schema.OPTIONAL_STRING_SCHEMA,
KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
Schema.BYTES_SCHEMA,
KAFKA_VALUE);
assertRecordsEqual(expected, result.get(0));
}
/**
* Tests when the message(s) retrieved from Cloud Pub/Sub do have an attribute that matches {@link
* #KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE} and {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE}.
*/
@Test
public void testPollWithMessageTimestampAttribute() throws Exception{
task.start(props);
Map<String, String> attributes = new HashMap<>();
attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
attributes.put(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE, KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE);
ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
assertEquals(1, result.size());
SourceRecord expected =
new SourceRecord(
null,
null,
KAFKA_TOPIC,
0,
Schema.OPTIONAL_STRING_SCHEMA,
KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
Schema.BYTES_SCHEMA,
KAFKA_VALUE, Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE));
assertRecordsEqual(expected, result.get(0));
}
/** Tests that the correct partition is assigned when the partition scheme is "hash_value". */
@Test
public void testPollWithPartitionSchemeHashValue() throws Exception {
props.put(
CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG,
CloudPubSubSourceConnector.PartitionScheme.HASH_VALUE.toString());
task.start(props);
ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
assertEquals(1, result.size());
SourceRecord expected =
new SourceRecord(
null,
null,
KAFKA_TOPIC,
KAFKA_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS),
Schema.OPTIONAL_STRING_SCHEMA,
null,
Schema.BYTES_SCHEMA,
KAFKA_VALUE);
assertRecordsEqual(expected, result.get(0));
}
/** Tests that the no partition is assigned when the partition scheme is "kafka_partitioner". */
@Test
public void testPollWithPartitionSchemeKafkaPartitioner() throws Exception {
props.put(
CloudPubSubSourceConnector.KAFKA_PARTITION_SCHEME_CONFIG,
CloudPubSubSourceConnector.PartitionScheme.KAFKA_PARTITIONER.toString());
task.start(props);
ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, new HashMap<String, String>());
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
assertEquals(1, result.size());
SourceRecord expected =
new SourceRecord(
null,
null,
KAFKA_TOPIC,
null,
Schema.OPTIONAL_STRING_SCHEMA,
null,
Schema.BYTES_SCHEMA,
KAFKA_VALUE);
assertRecordsEqual(expected, result.get(0));
assertNull(result.get(0).kafkaPartition());
}
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
ManagedChannel channel = NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.build();
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(maxMessagesPerPull)
.setReturnImmediately(false)
.setSubscription(projectSubscriptionName)
.build();
SubscriberGrpc.SubscriberBlockingStub stub = SubscriberGrpc.newBlockingStub(channel)
.withCallCredentials(MoreCallCredentials.from(credentials));
return new BlockingGrpcPubSubSubscriber(projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
}
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(maxNumberOfMessages)
.setReturnImmediately(false)
.setSubscription(subscriptionName)
.build();
List<ReceivedMessage> receivedMessages = subscriber.pullCallable().call(pullRequest).getReceivedMessagesList();
acknowledgeIds(subscriber, subscriptionName, receivedMessages);
return receivedMessages;
}
}
@OnScheduled
public void onScheduled(ProcessContext context) {
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
.setSubscription(getSubscriptionName(context))
.build();
try {
subscriber = getSubscriber(context);
} catch (IOException e) {
storedException.set(e);
getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{e});
}
}
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
ManagedChannel managedChannel = NettyChannelBuilder.forTarget(hostAndPort)
.usePlaintext() // This is 'Ok' because this is ONLY used for testing.
.build();
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(maxMessagesPerPull)
.setReturnImmediately(false)
.setSubscription(projectSubscriptionName)
.build();
SubscriberGrpc.SubscriberBlockingStub stub = SubscriberGrpc.newBlockingStub(managedChannel);
return new BlockingGrpcPubSubSubscriber(projectSubscriptionName, managedChannel, stub, pullRequest, retries, timeout);
}
public BlockingGrpcPubSubSubscriber(String projectSubscriptionName,
ManagedChannel channel,
SubscriberGrpc.SubscriberBlockingStub stub,
PullRequest pullRequest,
int retries,
Duration timeout) {
this.projectSubscriptionName = projectSubscriptionName;
this.channel = channel;
this.stub = stub;
this.retries = retries;
this.timeout = timeout;
this.pullRequest = pullRequest;
}
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
// String projectId = "my-project-id";
// String subscriptionId = "my-subscription-id";
// int numOfMessages = 10; // max number of messages to be pulled
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(maxNumberOfMessages)
.setReturnImmediately(false) // return immediately if messages are not available
.setSubscription(subscriptionName)
.build();
// use pullCallable().futureCall to asynchronously perform this operation
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
// handle received message
// ...
ackIds.add(message.getAckId());
}
// acknowledge received messages
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// use acknowledgeCallable().futureCall to asynchronously perform this operation
subscriber.acknowledgeCallable().call(acknowledgeRequest);
return pullResponse.getReceivedMessagesList();
}
}
@Test
public void pull_subscriptionDoesNotExist() {
expectedException.expect(StatusRuntimeException.class);
expectedException.expectMessage(Status.NOT_FOUND.getCode().toString());
PullRequest request =
PullRequest.newBuilder()
.setSubscription("projects/project-1/subscriptions/unknown-subscription")
.setMaxMessages(100)
.build();
blockingStub.pull(request);
}
@Test
public void pull_emptyList() {
when(mockSubscriptionManager3.pull(100, false)).thenReturn(Collections.emptyList());
PullRequest request =
PullRequest.newBuilder()
.setSubscription(TestHelpers.PROJECT2_SUBSCRIPTION3)
.setMaxMessages(100)
.build();
PullResponse response = blockingStub.pull(request);
assertThat(response.getReceivedMessagesList(), Matchers.empty());
}
private List<String> getMessagesFromSubscription(String subscriptionName) {
String projectSubscriptionName = ProjectSubscriptionName.format(
projectName, subscriptionName);
PullRequest pullRequest = PullRequest.newBuilder()
.setReturnImmediately(true)
.setMaxMessages(10)
.setSubscription(projectSubscriptionName)
.build();
PullResponse pullResponse = subscriptionAdminClient.getStub().pullCallable().call(pullRequest);
return pullResponse.getReceivedMessagesList().stream()
.map((message) -> message.getMessage().getData().toStringUtf8())
.collect(Collectors.toList());
}
/**
* Pulls messages synchronously, on demand, using the pull request in argument.
*
* @param pullRequest pull request containing the subscription name
* @return the list of {@link AcknowledgeablePubsubMessage} containing the ack ID, subscription
* and acknowledger
*/
private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
Assert.notNull(pullRequest, "The pull request can't be null.");
PullResponse pullResponse = this.subscriberStub.pullCallable().call(pullRequest);
return toAcknowledgeablePubsubMessageList(
pullResponse.getReceivedMessagesList(),
pullRequest.getSubscription());
}
@Override
public List<PubsubMessage> pullAndAck(String subscription, Integer maxMessages,
Boolean returnImmediately) {
PullRequest pullRequest = this.subscriberFactory.createPullRequest(
subscription, maxMessages, returnImmediately);
List<AcknowledgeablePubsubMessage> ackableMessages = pull(pullRequest);
if (!ackableMessages.isEmpty()) {
ack(ackableMessages);
}
return ackableMessages.stream().map(AcknowledgeablePubsubMessage::getPubsubMessage)
.collect(Collectors.toList());
}
@Test
public void testCreatePullRequest_nonNullMaxMessages() {
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(() -> "project");
factory.setCredentialsProvider(this.credentialsProvider);
PullRequest request = factory.createPullRequest("test", null, true);
assertThat(request.getMaxMessages()).isEqualTo(Integer.MAX_VALUE);
}
@Test
public void testPullAndAck_NoMessages() {
when(this.pullCallable.call(any(PullRequest.class))).thenReturn(PullResponse.newBuilder().build());
List<PubsubMessage> result = this.pubSubSubscriberTemplate.pullAndAck(
"sub2", 1, true);
assertThat(result.size()).isEqualTo(0);
verify(this.pubSubSubscriberTemplate, never()).ack(any());
}
@Test
public void testPullNext_NoMessages() {
when(this.pullCallable.call(any(PullRequest.class))).thenReturn(PullResponse.newBuilder().build());
PubsubMessage message = this.pubSubSubscriberTemplate.pullNext("sub2");
assertThat(message).isNull();
verify(this.subscriberFactory).createPullRequest("sub2", 1, true);
verify(this.pubSubSubscriberTemplate, never()).ack(any());
}
/** Tests when no messages are received from the Cloud Pub/Sub PullResponse. */
@Test
public void testPollCaseWithNoMessages() throws Exception {
task.start(props);
PullResponse stubbedPullResponse = PullResponse.newBuilder().build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
assertEquals(0, task.poll().size());
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
}
/**
* Tests when the message retrieved from Cloud Pub/Sub have several attributes, including
* one that matches {@link #KAFKA_MESSAGE_KEY_ATTRIBUTE} and uses Kafka Record Headers to store them
*/
@Test
public void testPollWithMultipleAttributesAndRecordHeaders() throws Exception {
props.put(CloudPubSubSourceConnector.USE_KAFKA_HEADERS, "true");
task.start(props);
Map<String, String> attributes = new HashMap<>();
attributes.put(KAFKA_MESSAGE_KEY_ATTRIBUTE, KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE);
attributes.put("attribute1", "attribute_value1");
attributes.put("attribute2", "attribute_value2");
ReceivedMessage rm = createReceivedMessage(ACK_ID1, CPS_MESSAGE, attributes);
PullResponse stubbedPullResponse = PullResponse.newBuilder().addReceivedMessages(rm).build();
when(subscriber.pull(any(PullRequest.class)).get()).thenReturn(stubbedPullResponse);
List<SourceRecord> result = task.poll();
verify(subscriber, never()).ackMessages(any(AcknowledgeRequest.class));
assertEquals(1, result.size());
ConnectHeaders headers = new ConnectHeaders();
headers.addString("attribute1", "attribute_value1");
headers.addString("attribute2", "attribute_value2");
SourceRecord expected =
new SourceRecord(
null,
null,
KAFKA_TOPIC,
0,
Schema.OPTIONAL_STRING_SCHEMA,
KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
Schema.BYTES_SCHEMA,
KAFKA_VALUE,
Long.parseLong(KAFKA_MESSAGE_TIMESTAMP_ATTRIBUTE_VALUE),
headers);
assertRecordsEqual(expected, result.get(0));
}