The following examples show how org.apache.pulsar.client.api.ProducerBuilder is used in practice, collected from Apache Pulsar tests and client modules.
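Before the collected snippets, here is a minimal, self-contained sketch of the typical ProducerBuilder flow. It assumes a broker reachable at pulsar://localhost:6650; the service URL and topic name are placeholders, not values taken from the examples below.

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class ProducerBuilderExample {
    public static void main(String[] args) throws Exception {
        // Placeholder service URL; point this at your own broker.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        // newProducer() returns a ProducerBuilder<byte[]>; each setter returns
        // the builder itself, and create() blocks until the producer is ready.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/my-topic")
                .enableBatching(true)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(30, TimeUnit.SECONDS)
                .create();
        producer.send("hello".getBytes());
        producer.close();
        client.close();
    }
}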
public GroupMetadataManager(OffsetConfig offsetConfig,
ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder,
ReaderBuilder<ByteBuffer> metadataTopicReaderBuilder,
ScheduledExecutorService scheduler,
Time time) {
this(
offsetConfig,
metadataTopicProducerBuilder,
metadataTopicReaderBuilder,
scheduler,
time,
// Same as Kafka: abs(groupId.hashCode) % groupMetadataTopicPartitionCount
// returns a partition id
groupId -> MathUtils.signSafeMod(
groupId.hashCode(),
offsetConfig.offsetsTopicNumPartitions()
)
);
}
GroupMetadataManager(OffsetConfig offsetConfig,
ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder,
ReaderBuilder<ByteBuffer> metadataTopicConsumerBuilder,
ScheduledExecutorService scheduler,
Time time,
Function<String, Integer> partitioner) {
this.offsetConfig = offsetConfig;
this.compressionType = offsetConfig.offsetsTopicCompressionType();
this.groupMetadataCache = new ConcurrentHashMap<>();
this.groupMetadataTopicPartitionCount = offsetConfig.offsetsTopicNumPartitions();
this.metadataTopicProducerBuilder = metadataTopicProducerBuilder;
this.metadataTopicReaderBuilder = metadataTopicConsumerBuilder;
this.scheduler = scheduler;
this.time = time;
this.partitioner = partitioner;
}
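A note on the partitioner above: groupId.hashCode() can be negative, and Math.abs(Integer.MIN_VALUE) is itself negative, so a sign-safe modulo is needed to land in [0, numPartitions). Below is an illustrative stand-in for what MathUtils.signSafeMod presumably computes, not the actual Pulsar implementation.

// Illustrative stand-in for MathUtils.signSafeMod: maps any value,
// including negative hash codes, into [0, divisor).
static int signSafeMod(long dividend, int divisor) {
    int mod = (int) (dividend % divisor);
    return mod < 0 ? mod + divisor : mod;
}
// e.g. signSafeMod("my-group".hashCode(), 50) is always a valid partition index.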
@Test
public void testPublishWithFailure() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
final String topicName = "persistent://my-property/my-ns/my-topic1";
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(false)
.create();
stopBroker();
try {
producer.send(createMessagePayload(100).getBytes());
fail("should have failed with timeout exception");
} catch (PulsarClientException.TimeoutException e) {
// Ok
}
producer.close();
}
/**
* Validate that chunking is not supported together with batching or on non-persistent topics
*
* @throws Exception
*/
@Test
public void testInvalidUseCaseForChunking() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(5);
final String topicName = "persistent://my-property/my-ns/my-topic1";
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
try {
Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(true).create();
fail("it should have failied because chunking can't be used with batching enabled");
} catch (IllegalArgumentException ie) {
// Ok
}
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testCreateProducerWithSameName() throws Exception {
String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName";
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(topic)
.producerName("test-producer-a")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition);
Producer<byte[]> p1 = producerBuilder.create();
try {
producerBuilder.create();
fail("Should have thrown ProducerBusyException");
} catch (ProducerBusyException e) {
// Expected
}
p1.close();
// Now p2 should succeed
Producer<byte[]> p2 = producerBuilder.create();
p2.close();
}
private Topic createTopicAndPublish(PulsarClient pulsarClient, String ns, String topicLocalName, int totalPublish)
throws Exception {
final String topicName = String.format("persistent://%s/%s", ns, topicLocalName);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscribe();
consumer.close();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5, TimeUnit.SECONDS);
Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < totalPublish; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
producer.close();
return pulsarService.getBrokerService().getTopicReference(topicName).get();
}
public void startup() throws Exception {
try {
client = PULSAR_CLIENT_BUILDER.get()
.serviceUrl(serviceUrl)
.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer()
.topic(topic)
.producerName("pulsar-log4j2-appender-" + topic)
.blockIfQueueFull(false);
if (syncSend) {
// disable batching for sync send
producerBuilder = producerBuilder.enableBatching(false);
} else {
// enable batching with a 10 ms max publish delay for async send
producerBuilder = producerBuilder
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
}
producer = producerBuilder.create();
} catch (Exception t) {
LOGGER.error("Failed to start pulsar manager", t);
throw t;
}
}
public Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
throws PulsarClientException {
ProducerBuilder<T> builder = client.newProducer(schema)
.blockIfQueueFull(true)
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.LZ4)
.hashingScheme(HashingScheme.Murmur3_32Hash)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(FunctionResultRouter.of())
// Set the send timeout to infinity to prevent a potential deadlock with a
// consumer that is blocked due to unacked messages.
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topic);
if (producerName != null) {
builder.producerName(producerName);
}
return builder.properties(properties).create();
}
public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
if (properties.containsKey(PRODUCER_NAME)) {
producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
}
if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
}
if (properties.containsKey(MAX_PENDING_MESSAGES)) {
producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
}
if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
producerBuilder.maxPendingMessagesAcrossPartitions(
Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
}
producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
}
return producerBuilder;
}
@Test
public void testPulsarKafkaProducer() {
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
doAnswer(invocation -> {
Assert.assertEquals((int) invocation.getArguments()[0], 1000000, "Send timeout is supposed to be 1000000.");
return mockProducerBuilder;
}).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
doAnswer(invocation -> {
Assert.assertEquals((int) invocation.getArguments()[0], 1000, "Keep alive interval is supposed to be 1000.");
return mockClientBuilder;
}).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, Boolean.FALSE.toString());
new PulsarKafkaProducer<>(properties);
verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
verify(mockProducerBuilder, times(1)).blockIfQueueFull(false);
}
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testAllEmptyCompactionLedger(boolean batchEnabled) throws Exception {
final String topic = "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString();
final int messages = 10;
// 1. Create a producer and publish messages to the topic.
ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
if (!batchEnabled) {
builder.enableBatching(false);
} else {
builder.batchingMaxMessages(messages / 5);
}
Producer<byte[]> producer = builder.create();
List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().keyBytes("1".getBytes()).value("".getBytes()).sendAsync());
}
FutureUtil.waitForAll(futures).get();
// 2. Compact the topic.
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
// A consumer with readCompacted enabled gets only compacted entries.
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
}
}
@Test(dataProvider = "batch")
public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/use/my-ns/my-topic1");
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true)
.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
} else {
producerBuilder.enableBatching(false);
}
Producer<String> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
producer.send("my-message-" + i);
}
Message<String> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = msg.getValue();
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testInvalidConfig() throws Exception {
final String topicName = "persistent://my-property/my-ns/my-topic1";
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
// batching and chunking can't be enabled together
try {
Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(true).create();
fail("producer creation should have fail");
} catch (IllegalArgumentException ie) {
// Ok
}
}
@Test
public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
int batchMessageDelayMs = 100;
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-xyz/use/ns-abc/topic1")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
.subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://prop-xyz/use/ns-abc/topic1")
.messageRoutingMode(MessageRoutingMode.SinglePartition);
if (batchMessageDelayMs != 0) {
producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
.batchingMaxMessages(5);
} else {
producerBuilder.enableBatching(false);
}
Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
try {
consumer.receiveAsync().handle((ok, e) -> {
if (e == null) {
// receiverQueueSize=0 doesn't support batched messages, so the callback must receive an exception.
Assert.fail();
}
return null;
});
} finally {
consumer.close();
}
}
private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
builder.maxPendingMessages(count);
// disable periodic flushing
builder.batchingMaxPublishDelay(1, TimeUnit.DAYS);
builder.topic(topic);
if (enableBatch) {
builder.enableBatching(true);
builder.batchingMaxMessages(count);
} else {
builder.enableBatching(false);
}
try (Producer<byte[]> producer = builder.create()) {
Future<?> lastFuture = null;
for (int i = 0; i < count; i++) {
String key = "key"+i;
byte[] data = ("my-message-" + i).getBytes();
lastFuture = producer.newMessage().key(key).value(data).sendAsync();
keys.add(key);
}
producer.flush();
lastFuture.get();
}
return keys;
}
MessageProducer(URL url, final TopicName dest, boolean batch) throws Exception {
this.url = url;
this.namespace = dest.getNamespace();
this.topicName = dest.toString();
client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer()
.topic(topicName)
.enableBatching(batch)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.batchingMaxMessages(5);
producer = producerBuilder.create();
}
@Test
public void testProducerAndConsumerPassed() throws Exception {
log.info("-- {} -- start", methodName);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/my-topic")
.subscriptionName("my-subscriber-name")
.subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/my-topic")
.enableBatching(false);
Producer<byte[]> producer = producerBuilder.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
log.info("Produced message: [{}]", message);
}
Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < 10; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- {} -- end", methodName);
}
@Override
public ProducerBuilder<T> property(String key, String value) {
checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
"property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder<T> intercept(ProducerInterceptor... interceptors) {
if (interceptorList == null) {
interceptorList = new ArrayList<>();
}
interceptorList.addAll(Arrays.asList(interceptors));
return this;
}
@Override
@Deprecated
public ProducerBuilder<T> intercept(org.apache.pulsar.client.api.ProducerInterceptor<T>... interceptors) {
if (interceptorList == null) {
interceptorList = new ArrayList<>();
}
interceptorList.addAll(Arrays.stream(interceptors).map(ProducerInterceptorWrapper::new)
.collect(Collectors.toList()));
return this;
}
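For context on the intercept(...) overloads above, here is a minimal sketch of an interceptor implementing the deprecated generic org.apache.pulsar.client.api.ProducerInterceptor&lt;T&gt; interface (the one that ProducerInterceptorWrapper adapts). The class name and log output are hypothetical; a builder would register it with something like producerBuilder.intercept(new LoggingInterceptor&lt;&gt;()).

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerInterceptor;

public class LoggingInterceptor<T> implements ProducerInterceptor<T> {
    @Override
    public Message<T> beforeSend(Producer<T> producer, Message<T> message) {
        // Inspect or transform the message before it is sent;
        // returning it unchanged keeps this a pure observer.
        return message;
    }

    @Override
    public void onSendAcknowledgement(Producer<T> producer, Message<T> message,
                                      MessageId msgId, Throwable exception) {
        if (exception != null) {
            System.err.println("send failed: " + exception.getMessage());
        }
    }

    @Override
    public void close() {
        // Release any resources held by the interceptor.
    }
}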
public static GroupCoordinator of(
PulsarClientImpl pulsarClient,
GroupConfig groupConfig,
OffsetConfig offsetConfig,
Timer timer,
Time time
) {
ScheduledExecutorService coordinatorExecutor = OrderedScheduler.newSchedulerBuilder()
.name("group-coordinator-executor")
.build();
// Builders for the producers and readers of the offsets topic partitions.
ProducerBuilder<ByteBuffer> producer = pulsarClient
.newProducer(Schema.BYTEBUFFER)
.maxPendingMessages(100000);
ReaderBuilder<ByteBuffer> reader = new ReaderBuilderImpl<>(pulsarClient, Schema.BYTEBUFFER);
reader.startMessageId(MessageId.earliest);
GroupMetadataManager metadataManager = new GroupMetadataManager(
offsetConfig,
producer,
reader,
coordinatorExecutor,
time
);
DelayedOperationPurgatory<DelayedJoin> joinPurgatory = DelayedOperationPurgatory.<DelayedJoin>builder()
.purgatoryName("group-coordinator-delayed-join")
.timeoutTimer(timer)
.build();
DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory =
DelayedOperationPurgatory.<DelayedHeartbeat>builder()
.purgatoryName("group-coordinator-delayed-heartbeat")
.timeoutTimer(timer)
.build();
OffsetAcker offsetAcker = new OffsetAcker(pulsarClient);
return new GroupCoordinator(
groupConfig,
metadataManager,
heartbeatPurgatory,
joinPurgatory,
time,
offsetAcker
);
}
@VisibleForTesting
public ProducerBuilder<byte[]> getPulsarProducerBuilder() {
return pulsarProducerBuilder;
}
@Test
public void testPulsarKafkaInterceptor() throws PulsarClientException {
// Arrange
PulsarClient mockClient = mock(PulsarClient.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
CompletableFuture mockPartitionFuture = new CompletableFuture();
CompletableFuture mockSendAsyncFuture = new CompletableFuture();
TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
mockPartitionFuture.complete(new ArrayList<>());
mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
doReturn(mockClient).when(mockClientBuilder).build();
doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
(org.apache.pulsar.client.api.ProducerInterceptor) any());
doReturn(mockProducer).when(mockProducerBuilder).create();
doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
Properties properties = new Properties();
List interceptors = new ArrayList();
interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// Act
PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties);
pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, "key", "value"));
// Verify
verify(mockProducerBuilder, times(1)).intercept(
(org.apache.pulsar.client.api.ProducerInterceptor)any());
}
@Test
public void testPulsarKafkaSendAvro() throws PulsarClientException {
// Arrange
PulsarClient mockClient = mock(PulsarClient.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
CompletableFuture mockPartitionFuture = new CompletableFuture();
CompletableFuture mockSendAsyncFuture = new CompletableFuture();
TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
mockPartitionFuture.complete(new ArrayList<>());
mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
doReturn(mockClient).when(mockClientBuilder).build();
doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
(org.apache.pulsar.client.api.ProducerInterceptor) any());
doReturn(mockProducer).when(mockProducerBuilder).create();
doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
Properties properties = new Properties();
List interceptors = new ArrayList();
interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
// Act
PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, foo, bar));
// Verify
verify(mockTypedMessageBuilder, times(1)).sendAsync();
verify(mockProducerBuilder, times(1)).intercept(
(org.apache.pulsar.client.api.ProducerInterceptor) any());
}
private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
ProducerBuilder<byte[]> builder = client.newProducer()
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition);
// Set to false to prevent the server thread from being blocked if a lot of messages are pending.
builder.blockIfQueueFull(false);
if (queryParams.containsKey("producerName")) {
builder.producerName(queryParams.get("producerName"));
}
if (queryParams.containsKey("initialSequenceId")) {
builder.initialSequenceId(Long.parseLong(queryParams.get("initialSequenceId")));
}
if (queryParams.containsKey("hashingScheme")) {
builder.hashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
}
if (queryParams.containsKey("sendTimeoutMillis")) {
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("batchingEnabled")) {
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
}
if (queryParams.containsKey("batchingMaxMessages")) {
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
}
if (queryParams.containsKey("maxPendingMessages")) {
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
}
if (queryParams.containsKey("batchingMaxPublishDelay")) {
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("messageRoutingMode")) {
checkArgument(
Enums.getIfPresent(MessageRoutingMode.class, queryParams.get("messageRoutingMode")).isPresent(),
"Invalid messageRoutingMode %s", queryParams.get("messageRoutingMode"));
MessageRoutingMode routingMode = MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
builder.messageRoutingMode(routingMode);
}
}
if (queryParams.containsKey("compressionType")) {
checkArgument(Enums.getIfPresent(CompressionType.class, queryParams.get("compressionType")).isPresent(),
"Invalid compressionType %s", queryParams.get("compressionType"));
builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
}
return builder;
}
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactMultipleTimesWithoutEmptyMessage(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
final String topic = "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID().toString();
final int messages = 10;
final String key = "1";
// 1. Create a producer and publish messages to the topic.
ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
if (!batchEnabled) {
builder.enableBatching(false);
} else {
builder.batchingMaxMessages(messages / 5);
}
Producer<byte[]> producer = builder.create();
List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().key(key).value((i + "").getBytes()).sendAsync());
}
FutureUtil.waitForAll(futures).get();
// 2. Compact the topic.
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
// 3. Send ten more messages.
futures.clear();
for (int i = 0; i < messages; i++) {
futures.add(producer.newMessage().key(key).value((i + 10 + "").getBytes()).sendAsync());
}
FutureUtil.waitForAll(futures).get();
// 4. Compact again.
compactor.compact(topic).get();
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) {
Message<byte[]> m1 = consumer.receive();
assertNotNull(m1);
assertEquals(m1.getKey(), key);
assertEquals(new String(m1.getValue()), "19");
Message<byte[]> none = consumer.receive(2, TimeUnit.SECONDS);
assertNull(none);
}
}
@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder()
.withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true)
.withPojo(Schemas.PersonTwo.class)
.build()))
.topic(fqtn);
try {
producerThreeBuilder.create();
fail("Should have failed because schema auto updating is disabled");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
ConsumerBuilder<Schemas.PersonTwo> consumerBuilder = pulsarClient
.newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder()
.withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true)
.withPojo(Schemas.PersonTwo.class)
.build()))
.subscriptionName("test")
.topic(fqtn);
Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
Consumer<Schemas.PersonTwo> consumerTwo = consumerBuilder.subscribe();
producer.send(new Schemas.PersonTwo(2, "Lucy"));
Message<Schemas.PersonTwo> message = consumerTwo.receive();
Schemas.PersonTwo personTwo = message.getValue();
consumerTwo.acknowledge(message);
assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");
producer.close();
consumerTwo.close();
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
producer = producerThreeBuilder.create();
consumerTwo = consumerBuilder.subscribe();
producer.send(new Schemas.PersonTwo(2, "Lucy"));
message = consumerTwo.receive();
personTwo = message.getValue();
consumerTwo.acknowledge(message);
assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");
consumerTwo.close();
producer.close();
}