org.apache.kafka.clients.producer.internals.DefaultPartitioner#org.apache.pulsar.client.api.ProducerBuilder源码实例Demo

下面列出了org.apache.kafka.clients.producer.internals.DefaultPartitioner#org.apache.pulsar.client.api.ProducerBuilder 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: kop   文件: GroupMetadataManager.java
/**
 * Creates a manager that uses a default group-id-to-partition function.
 *
 * <p>All arguments are forwarded unchanged to the full constructor; the only
 * thing added here is the partitioner lambda defined inline below.
 *
 * @param offsetConfig supplies the offsets-topic partition count read by the default partitioner
 * @param metadataTopicProducerBuilder producer builder passed through to the full constructor
 * @param metadataTopicReaderBuilder reader builder passed through to the full constructor
 * @param scheduler executor passed through to the full constructor
 * @param time time source passed through to the full constructor
 */
public GroupMetadataManager(OffsetConfig offsetConfig,
                            ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder,
                            ReaderBuilder<ByteBuffer> metadataTopicReaderBuilder,
                            ScheduledExecutorService scheduler,
                            Time time) {
    this(
        offsetConfig,
        metadataTopicProducerBuilder,
        metadataTopicReaderBuilder,
        scheduler,
        time,
        // Intended to match Kafka: abs(groupId.hashCode) % groupMetadataTopicPartitionCount.
        // The lambda maps a group id to a partition id of the offsets topic.
        groupId -> MathUtils.signSafeMod(
            groupId.hashCode(),
            offsetConfig.offsetsTopicNumPartitions()
        )
    );
}
 
源代码2 项目: kop   文件: GroupMetadataManager.java
/**
 * Full constructor: stores every collaborator and derives the compression type
 * and partition count from the supplied offset configuration.
 *
 * @param offsetConfig offsets-topic configuration; also read for compression type and partition count
 * @param metadataTopicProducerBuilder builder kept for creating metadata-topic producers
 * @param metadataTopicConsumerBuilder builder kept as the metadata-topic reader builder
 * @param scheduler executor kept for later use
 * @param time time source kept for later use
 * @param partitioner function mapping a group id to a partition id
 */
GroupMetadataManager(OffsetConfig offsetConfig,
                     ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder,
                     ReaderBuilder<ByteBuffer> metadataTopicConsumerBuilder,
                     ScheduledExecutorService scheduler,
                     Time time,
                     Function<String, Integer> partitioner) {
    // Collaborators handed in by the caller.
    this.metadataTopicProducerBuilder = metadataTopicProducerBuilder;
    this.metadataTopicReaderBuilder = metadataTopicConsumerBuilder;
    this.scheduler = scheduler;
    this.time = time;
    this.partitioner = partitioner;

    // Values taken from the offsets-topic configuration.
    this.offsetConfig = offsetConfig;
    this.compressionType = offsetConfig.offsetsTopicCompressionType();
    this.groupMetadataTopicPartitionCount = offsetConfig.offsetsTopicNumPartitions();

    // The group cache starts out empty.
    this.groupMetadataCache = new ConcurrentHashMap<>();
}
 
源代码3 项目: pulsar   文件: MessageChunkingTest.java
/**
 * Creates a chunking producer, stops the broker, and expects the next send
 * to fail with a {@code PulsarClientException.TimeoutException}.
 */
@Test
public void testPublishWithFailure() throws Exception {
    log.info("-- Starting {} test --", methodName);
    this.conf.setMaxMessageSize(5);
    final String topicName = "persistent://my-property/my-ns/my-topic1";

    // Chunking enabled, batching disabled.
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topicName)
            .enableChunking(true)
            .enableBatching(false)
            .create();

    stopBroker();

    try {
        byte[] payload = createMessagePayload(100).getBytes();
        producer.send(payload);
        fail("should have failed with timeout exception");
    } catch (PulsarClientException.TimeoutException expected) {
        // Expected: the broker is down, so the send cannot complete.
    }
    producer.close();
}
 
源代码4 项目: pulsar   文件: MessageChunkingTest.java
/**
 * Validates that creating a producer with both chunking and batching enabled
 * is rejected with an {@link IllegalArgumentException}.
 *
 * @throws Exception if the test setup fails unexpectedly
 */
@Test
public void testInvalidUseCaseForChunking() throws Exception {

    log.info("-- Starting {} test --", methodName);
    this.conf.setMaxMessageSize(5);
    final String topicName = "persistent://my-property/my-ns/my-topic1";

    try {
        // The created producer was never used by the test, so its reference is not kept.
        pulsarClient.newProducer()
                .topic(topicName)
                .enableChunking(true)
                .enableBatching(true)
                .create();
        fail("it should have failied because chunking can't be used with batching enabled");
    } catch (IllegalArgumentException expected) {
        // Expected: chunking and batching are mutually exclusive.
    }

    log.info("-- Exiting {} test --", methodName);
}
 
源代码5 项目: pulsar   文件: PersistentTopicE2ETest.java
/**
 * Creates a named producer, expects a second creation from the same builder to
 * fail with {@code ProducerBusyException} while the first is open, and expects
 * creation to succeed again once the first producer has been closed.
 */
@Test
public void testCreateProducerWithSameName() throws Exception {
    final String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName";

    final ProducerBuilder<byte[]> namedBuilder = pulsarClient.newProducer()
        .topic(topic)
        .producerName("test-producer-a")
        .enableBatching(false)
        .messageRoutingMode(MessageRoutingMode.SinglePartition);

    final Producer<byte[]> firstProducer = namedBuilder.create();

    // A second producer with the same name must be rejected while the first is alive.
    try {
        namedBuilder.create();
        fail("Should have thrown ProducerBusyException");
    } catch (ProducerBusyException expected) {
        // Expected
    }

    firstProducer.close();

    // With the first producer gone, the same builder should work again.
    final Producer<byte[]> secondProducer = namedBuilder.create();
    secondProducer.close();
}
 
源代码6 项目: pulsar   文件: BrokerBookieIsolationTest.java
/**
 * Subscribes to (and immediately closes a consumer on) the given topic, publishes
 * {@code totalPublish} messages to it, and returns the broker-side topic reference.
 *
 * @param pulsarClient client used to create the consumer and the producer
 * @param ns namespace part of the topic name
 * @param topicLocalName local part of the topic name
 * @param totalPublish number of messages to publish
 * @return the topic reference looked up from the broker service
 * @throws Exception if subscribing, publishing, or the topic lookup fails
 */
private Topic createTopicAndPublish(PulsarClient pulsarClient, String ns, String topicLocalName, int totalPublish)
        throws Exception {
    final String topicName = String.format("persistent://%s/%s", ns, topicLocalName);
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
            .subscribe();
    consumer.close();

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5, TimeUnit.SECONDS);

    // try-with-resources guarantees the producer is closed even when a send fails,
    // instead of leaking it on the exception path.
    try (Producer<byte[]> producer = producerBuilder.create()) {
        for (int i = 0; i < totalPublish; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
    }

    return pulsarService.getBrokerService().getTopicReference(topicName).get();
}
 
源代码7 项目: pulsar   文件: PulsarManager.java
/**
 * Builds the Pulsar client and the producer used by this manager.
 *
 * <p>Any failure is logged and then rethrown to the caller.
 *
 * @throws Exception if the client or the producer cannot be created
 */
public void startup() throws Exception {
    try {
        client = PULSAR_CLIENT_BUILDER.get()
            .serviceUrl(serviceUrl)
            .build();

        final ProducerBuilder<byte[]> baseBuilder = client.newProducer()
            .topic(topic)
            .producerName("pulsar-log4j2-appender-" + topic)
            .blockIfQueueFull(false);

        // Sync sends go out unbatched; async sends are batched with a 10 ms publish delay.
        final ProducerBuilder<byte[]> configuredBuilder = syncSend
            ? baseBuilder.enableBatching(false)
            : baseBuilder.enableBatching(true).batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);

        producer = configuredBuilder.create();
    } catch (Exception t) {
        LOGGER.error("Failed to start pulsar manager", t);
        throw t;
    }
}
 
源代码8 项目: pulsar   文件: PulsarSink.java
/**
 * Creates a producer for the given topic with this sink's fixed settings:
 * blocking when the queue is full, batching with a 10 ms publish delay, LZ4
 * compression, Murmur3 hashing, custom-partition routing, and no send timeout.
 *
 * @param client client used to create the producer builder
 * @param topic topic the producer publishes to
 * @param producerName optional producer name; skipped when {@code null}
 * @param schema schema of the produced messages
 * @return the created producer
 * @throws PulsarClientException if the producer cannot be created
 */
public Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
        throws PulsarClientException {
    // Back-pressure, batching and compression settings.
    ProducerBuilder<T> builder = client.newProducer(schema)
            .blockIfQueueFull(true)
            .enableBatching(true)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .compressionType(CompressionType.LZ4);

    // Routing settings.
    builder = builder
            .hashingScheme(HashingScheme.Murmur3_32Hash)
            .messageRoutingMode(MessageRoutingMode.CustomPartition)
            .messageRouter(FunctionResultRouter.of());

    // A send timeout of 0 means "wait forever"; this prevents a potential deadlock with a
    // consumer that is blocked on unacked messages.
    builder = builder
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic);

    if (producerName != null) {
        builder.producerName(producerName);
    }

    return builder.properties(properties).create();
}
 
源代码9 项目: pulsar   文件: PulsarProducerKafkaConfig.java
/**
 * Creates a producer builder from the client and applies the optional
 * producer settings found in {@code properties}.
 *
 * <p>Each setting is applied only when its key is present, except batching,
 * which is always set and defaults to {@code true} when the key is absent.
 *
 * @param client client used to create the builder
 * @param properties configuration to read the producer settings from
 * @return the configured producer builder
 */
public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
    final ProducerBuilder<byte[]> builder = client.newProducer();

    if (properties.containsKey(PRODUCER_NAME)) {
        final String name = properties.getProperty(PRODUCER_NAME);
        builder.producerName(name);
    }

    if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
        final long initialSequenceId = Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID));
        builder.initialSequenceId(initialSequenceId);
    }

    if (properties.containsKey(MAX_PENDING_MESSAGES)) {
        final int maxPending = Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES));
        builder.maxPendingMessages(maxPending);
    }

    if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
        final int maxPendingAcrossPartitions =
                Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS));
        builder.maxPendingMessagesAcrossPartitions(maxPendingAcrossPartitions);
    }

    // Batching is always configured; a missing key means "enabled".
    final boolean batchingEnabled = Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true"));
    builder.enableBatching(batchingEnabled);

    if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
        final int batchingMaxMessages = Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES));
        builder.batchingMaxMessages(batchingMaxMessages);
    }

    return builder;
}
 
源代码10 项目: pulsar   文件: PulsarKafkaProducerTest.java
/**
 * Constructs a {@code PulsarKafkaProducer} from Kafka-style properties against mocked
 * builders and verifies the values passed on to them: a keep-alive interval of
 * 1000 seconds, a send timeout of 1000000 milliseconds, and {@code blockIfQueueFull(false)}.
 */
@Test
public void testPulsarKafkaProducer() {
    ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
    ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
    doAnswer(invocation -> {
        // The failure message now states the value that is actually asserted (1000000, not 1000).
        Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send timeout is supposed to be 1000000.");
        return mockProducerBuilder;
    }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
    doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
    doAnswer(invocation -> {
        Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
        return mockClientBuilder;
    }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));

    // Route the static config factories to the mocks above.
    PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
    PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
    when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
    when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);

    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
    properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
    properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, Boolean.FALSE.toString());

    new PulsarKafkaProducer<>(properties);

    verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
    verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
    verify(mockProducerBuilder, times(1)).blockIfQueueFull(false);
}
 
源代码11 项目: pulsar   文件: PulsarProducerKafkaConfig.java
/**
 * Creates a producer builder from the client and applies the optional
 * producer settings found in {@code properties}.
 *
 * <p>Each setting is applied only when its key is present, except batching,
 * which is always set and defaults to {@code true} when the key is absent.
 *
 * @param client client used to create the builder
 * @param properties configuration to read the producer settings from
 * @return the configured producer builder
 */
public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
    final ProducerBuilder<byte[]> builder = client.newProducer();

    if (properties.containsKey(PRODUCER_NAME)) {
        final String name = properties.getProperty(PRODUCER_NAME);
        builder.producerName(name);
    }

    if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
        final long initialSequenceId = Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID));
        builder.initialSequenceId(initialSequenceId);
    }

    if (properties.containsKey(MAX_PENDING_MESSAGES)) {
        final int maxPending = Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES));
        builder.maxPendingMessages(maxPending);
    }

    if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
        final int maxPendingAcrossPartitions =
                Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS));
        builder.maxPendingMessagesAcrossPartitions(maxPendingAcrossPartitions);
    }

    // Batching is always configured; a missing key means "enabled".
    final boolean batchingEnabled = Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true"));
    builder.enableBatching(batchingEnabled);

    if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
        final int batchingMaxMessages = Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES));
        builder.batchingMaxMessages(batchingMaxMessages);
    }

    return builder;
}
 
源代码12 项目: pulsar   文件: PulsarKafkaProducerTest.java
/**
 * Constructs a {@code PulsarKafkaProducer} from Kafka-style properties against mocked
 * builders and verifies the values passed on to them: a keep-alive interval of
 * 1000 seconds and a send timeout of 1000000 milliseconds.
 */
@Test
public void testPulsarKafkaProducer() {
    ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
    ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
    doAnswer(invocation -> {
        // The failure message now states the value that is actually asserted (1000000, not 1000).
        Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send timeout is supposed to be 1000000.");
        return mockProducerBuilder;
    }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
    doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
    doAnswer(invocation -> {
        Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
        return mockClientBuilder;
    }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));

    // Route the static config factories to the mocks above.
    PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
    PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
    when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
    when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);

    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
    properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");

    new PulsarKafkaProducer<>(properties);

    verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
    verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
}
 
源代码13 项目: pulsar   文件: CompactionTest.java
/**
 * Publishes ten empty-valued messages under a single key, compacts the topic, and
 * checks that a consumer reading the compacted topic receives nothing within 2 seconds.
 *
 * @param batchEnabled whether the producer leaves batching on (with a small batch size)
 *                     or disables it
 */
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testAllEmptyCompactionLedger(boolean batchEnabled) throws Exception {
    final String topic = "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString();
    final int messages = 10;

    // Step 1: create the producer and publish the messages.
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer().topic(topic);
    if (batchEnabled) {
        builder.batchingMaxMessages(messages / 5);
    } else {
        builder.enableBatching(false);
    }
    Producer<byte[]> producer = builder.create();

    List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>(messages);
    for (int sent = 0; sent < messages; sent++) {
        CompletableFuture<MessageId> pending = producer.newMessage()
                .keyBytes("1".getBytes())
                .value("".getBytes())
                .sendAsync();
        pendingSends.add(pending);
    }
    FutureUtil.waitForAll(pendingSends).get();

    // Step 2: compact the topic.
    new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler).compact(topic).get();

    // Step 3: a consumer with readCompacted enabled only gets compacted entries.
    try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("sub1")
            .readCompacted(true)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()) {
        assertNull(consumer.receive(2, TimeUnit.SECONDS));
    }
}
 
源代码14 项目: pulsar   文件: V1_ProducerConsumerTest.java
/**
 * Sends ten string messages synchronously and checks that the consumer receives
 * them in order and without duplicates, then acknowledges them cumulatively.
 *
 * @param batchMessageDelayMs batching publish delay in milliseconds; {@code 0} disables batching
 */
@Test(dataProvider = "batch")
public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
    log.info("-- Starting {} test --", methodName);

    final String topic = "persistent://my-property/use/my-ns/my-topic1";
    final int messageCount = 10;

    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

    ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
    if (batchMessageDelayMs == 0) {
        producerBuilder.enableBatching(false);
    } else {
        producerBuilder.enableBatching(true)
            .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(5);
    }

    Producer<String> producer = producerBuilder.create();
    for (int sent = 0; sent < messageCount; sent++) {
        producer.send("my-message-" + sent);
    }

    Message<String> lastReceived = null;
    Set<String> seenMessages = Sets.newHashSet();
    for (int received = 0; received < messageCount; received++) {
        lastReceived = consumer.receive(5, TimeUnit.SECONDS);
        String actual = lastReceived.getValue();
        log.debug("Received message: [{}]", actual);
        testMessageOrderAndDuplicates(seenMessages, actual, "my-message-" + received);
    }

    // Acknowledge the consumption of all messages at once.
    consumer.acknowledgeCumulative(lastReceived);
    consumer.close();
    log.info("-- Exiting {} test --", methodName);
}
 
源代码15 项目: pulsar   文件: MessageChunkingTest.java
/**
 * Checks that creating a producer with both chunking and batching enabled
 * throws an {@link IllegalArgumentException}.
 */
@Test
public void testInvalidConfig() throws Exception {
    final String topicName = "persistent://my-property/my-ns/my-topic1";

    // Batching and chunking can't be enabled together.
    try {
        pulsarClient.newProducer()
                .topic(topicName)
                .enableChunking(true)
                .enableBatching(true)
                .create();
        fail("producer creation should have fail");
    } catch (IllegalArgumentException expected) {
        // Expected
    }
}
 
源代码16 项目: pulsar   文件: ZeroQueueSizeTest.java
/**
 * Publishes batched messages to a topic that has a consumer with a receiver queue
 * size of zero, then issues an async receive whose callback fails the test if the
 * receive completes without an error.
 */
@Test()
public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {

    final String topic = "persistent://prop-xyz/use/ns-abc/topic1";
    int batchMessageDelayMs = 100;

    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("my-subscriber-name")
            .subscriptionType(SubscriptionType.Shared)
            .receiverQueueSize(0)
            .subscribe();

    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
        .topic(topic)
        .messageRoutingMode(MessageRoutingMode.SinglePartition);

    if (batchMessageDelayMs == 0) {
        producerBuilder.enableBatching(false);
    } else {
        producerBuilder.enableBatching(true)
                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(5);
    }

    Producer<byte[]> producer = producerBuilder.create();
    for (int sent = 0; sent < 10; sent++) {
        byte[] payload = ("my-message-" + sent).getBytes();
        producer.send(payload);
    }

    try {
        // NOTE(review): the future returned by handle(...) is discarded and never awaited,
        // so a failure raised inside the callback may not reach the test runner - confirm
        // this is intended.
        consumer.receiveAsync().handle((message, error) -> {
            if (error == null) {
                // as zero receiverQueueSize doesn't support batch message, must receive exception at callback.
                Assert.fail();
            }
            return null;
        });
    } finally {
        consumer.close();
    }
}
 
源代码17 项目: pulsar   文件: ReaderTest.java
/**
 * Publishes {@code count} keyed messages to the topic and returns the keys used.
 *
 * <p>Messages are sent asynchronously, then flushed; the method waits for the last
 * send to complete before the producer is closed.
 *
 * @param topic topic to publish to
 * @param count number of messages to publish; with {@code 0} nothing is sent and an
 *              empty set is returned
 * @param enableBatch whether the producer batches messages (batch size {@code count})
 * @return the set of message keys that were published
 * @throws Exception if producer creation, flushing, or the last send fails
 */
private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
    Set<String> keys = new HashSet<>();
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
    builder.messageRoutingMode(MessageRoutingMode.SinglePartition);
    builder.maxPendingMessages(count);
    // disable periodical flushing
    builder.batchingMaxPublishDelay(1, TimeUnit.DAYS);
    builder.topic(topic);
    if (enableBatch) {
        builder.enableBatching(true);
        builder.batchingMaxMessages(count);
    } else {
        builder.enableBatching(false);
    }
    try (Producer<byte[]> producer = builder.create()) {
        Future<?> lastFuture = null;
        for (int i = 0; i < count; i++) {
            String key = "key"+i;
            byte[] data = ("my-message-" + i).getBytes();
            lastFuture = producer.newMessage().key(key).value(data).sendAsync();
            keys.add(key);
        }
        producer.flush();
        // With count == 0 no message was sent, so there is no future to wait on;
        // calling get() unconditionally would throw a NullPointerException.
        if (lastFuture != null) {
            lastFuture.get();
        }
    }
    return keys;
}
 
源代码18 项目: pulsar   文件: ReplicatorTestBase.java
/**
 * Builds a client for the given service URL and a producer on the destination topic.
 *
 * @param url service URL the client connects to
 * @param dest destination topic; its namespace and string form are stored
 * @param batch whether batching is enabled on the producer
 * @throws Exception if the client or producer cannot be created
 */
MessageProducer(URL url, final TopicName dest, boolean batch) throws Exception {
    this.url = url;
    this.namespace = dest.getNamespace();
    this.topicName = dest.toString();

    client = PulsarClient.builder()
        .serviceUrl(url.toString())
        .statsInterval(0, TimeUnit.SECONDS)
        .build();

    // Batch limits are always set; whether they take effect depends on the batch flag.
    producer = client.newProducer()
        .topic(topicName)
        .enableBatching(batch)
        .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
        .batchingMaxMessages(5)
        .create();

}
 
源代码19 项目: pulsar   文件: SaslAuthenticateTest.java
/**
 * Produces ten unbatched messages and checks that the consumer receives them in
 * order and without duplicates, then acknowledges them cumulatively.
 */
@Test
public void testProducerAndConsumerPassed() throws Exception {
    log.info("-- {} -- start", methodName);

    final String topic = "persistent://my-property/my-ns/my-topic";
    final int messageCount = 10;

    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("my-subscriber-name")
        .subscribe();

    Producer<byte[]> producer = pulsarClient.newProducer()
        .topic(topic)
        .enableBatching(false)
        .create();

    for (int sent = 0; sent < messageCount; sent++) {
        String message = "my-message-" + sent;
        producer.send(message.getBytes());
        log.info("Produced message: [{}]", message);
    }

    Message<byte[]> lastReceived = null;
    Set<String> seenMessages = Sets.newHashSet();
    for (int received = 0; received < messageCount; received++) {
        lastReceived = consumer.receive(5, TimeUnit.SECONDS);
        String actual = new String(lastReceived.getData());
        log.info("Received message: [{}]", actual);
        testMessageOrderAndDuplicates(seenMessages, actual, "my-message-" + received);
    }

    // Acknowledge the consumption of all messages at once.
    consumer.acknowledgeCumulative(lastReceived);
    consumer.close();

    log.info("-- {} -- end", methodName);
}
 
源代码20 项目: pulsar   文件: ProducerBuilderImpl.java
/**
 * Adds a single key/value property to the producer configuration.
 *
 * @param key property name; must not be blank
 * @param value property value; must not be blank
 * @return this builder
 */
@Override
public ProducerBuilder<T> property(String key, String value) {
    final boolean keyAndValuePresent = StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value);
    checkArgument(keyAndValuePresent, "property key/value cannot be blank");

    conf.getProperties().put(key, value);
    return this;
}
 
源代码21 项目: pulsar   文件: ProducerBuilderImpl.java
/**
 * Appends the given interceptors to this builder's interceptor list, creating
 * the list on first use.
 *
 * @param interceptors interceptors to append, in order
 * @return this builder
 */
@Override
public ProducerBuilder<T> intercept(ProducerInterceptor... interceptors) {
    // The list is created lazily on the first call.
    if (interceptorList == null) {
        interceptorList = new ArrayList<>();
    }
    for (ProducerInterceptor interceptor : interceptors) {
        interceptorList.add(interceptor);
    }
    return this;
}
 
源代码22 项目: pulsar   文件: ProducerBuilderImpl.java
/**
 * Deprecated overload: wraps each legacy interceptor in a
 * {@code ProducerInterceptorWrapper} and appends the wrappers to this builder's
 * interceptor list, creating the list on first use.
 *
 * @param interceptors legacy interceptors to wrap and append, in order
 * @return this builder
 */
@Override
@Deprecated
public ProducerBuilder<T> intercept(org.apache.pulsar.client.api.ProducerInterceptor<T>... interceptors) {
    // The list is created lazily on the first call.
    if (interceptorList == null) {
        interceptorList = new ArrayList<>();
    }
    // All wrappers are built first and then appended in a single addAll call.
    interceptorList.addAll(
            Arrays.stream(interceptors)
                  .map(legacy -> new ProducerInterceptorWrapper(legacy))
                  .collect(Collectors.toList()));
    return this;
}
 
源代码23 项目: kop   文件: GroupCoordinator.java
/**
 * Assembles a {@code GroupCoordinator} from its parts: a scheduler, a
 * {@code GroupMetadataManager} backed by ByteBuffer producer/reader builders,
 * the delayed-join and delayed-heartbeat purgatories, and an {@code OffsetAcker}.
 *
 * @param pulsarClient client used for the producer builder, reader builder and offset acker
 * @param groupConfig group configuration handed to the coordinator
 * @param offsetConfig offset configuration handed to the metadata manager
 * @param timer timeout timer shared by both purgatories
 * @param time time source handed to the metadata manager and the coordinator
 * @return the assembled coordinator
 */
public static GroupCoordinator of(
    PulsarClientImpl pulsarClient,
    GroupConfig groupConfig,
    OffsetConfig offsetConfig,
    Timer timer,
    Time time
) {
    ScheduledExecutorService scheduler = OrderedScheduler.newSchedulerBuilder()
        .name("group-coordinator-executor")
        .build();

    // Builders for the producers and readers of the __offset partitions.
    ProducerBuilder<ByteBuffer> offsetsProducerBuilder = pulsarClient
        .newProducer(Schema.BYTEBUFFER)
        .maxPendingMessages(100000);
    ReaderBuilder<ByteBuffer> offsetsReaderBuilder = new ReaderBuilderImpl<>(pulsarClient, Schema.BYTEBUFFER);
    // Readers start from the earliest available message.
    offsetsReaderBuilder.startMessageId(MessageId.earliest);

    GroupMetadataManager groupMetadataManager = new GroupMetadataManager(
        offsetConfig,
        offsetsProducerBuilder,
        offsetsReaderBuilder,
        scheduler,
        time
    );

    DelayedOperationPurgatory<DelayedJoin> delayedJoins = DelayedOperationPurgatory.<DelayedJoin>builder()
        .purgatoryName("group-coordinator-delayed-join")
        .timeoutTimer(timer)
        .build();

    DelayedOperationPurgatory<DelayedHeartbeat> delayedHeartbeats =
        DelayedOperationPurgatory.<DelayedHeartbeat>builder()
            .purgatoryName("group-coordinator-delayed-heartbeat")
            .timeoutTimer(timer)
            .build();

    OffsetAcker acker = new OffsetAcker(pulsarClient);

    return new GroupCoordinator(
        groupConfig,
        groupMetadataManager,
        delayedHeartbeats,
        delayedJoins,
        time,
        acker
    );
}
 
源代码24 项目: pulsar   文件: PulsarKafkaProducer.java
/**
 * Returns the Pulsar producer builder held by this producer.
 *
 * <p>Marked {@code @VisibleForTesting}: the accessor is public so tests can inspect the builder.
 *
 * @return the {@code pulsarProducerBuilder} field
 */
@VisibleForTesting
public ProducerBuilder<byte[]> getPulsarProducerBuilder() {
    return pulsarProducerBuilder;
}
 
源代码25 项目: pulsar   文件: PulsarKafkaProducerTest.java
/**
 * Builds a {@code PulsarKafkaProducer} with one interceptor class configured, sends a
 * single record through fully mocked Pulsar client/producer objects, and verifies that
 * {@code intercept(...)} was called exactly once on the producer builder.
 */
@Test
public void testPulsarKafkaInterceptor() throws PulsarClientException {
    // Arrange: mocks for the client, builders, producer and message builder.
    PulsarClient mockClient = mock(PulsarClient.class);
    ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
    org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
    ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
    CompletableFuture mockPartitionFuture = new CompletableFuture();
    CompletableFuture mockSendAsyncFuture = new CompletableFuture();
    TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);

    // Both futures are completed up front so the stubbed async calls return finished results.
    mockPartitionFuture.complete(new ArrayList<>());
    mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
    // Builder-style methods return the mock itself so call chains keep working.
    doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
    doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
    doReturn(mockClient).when(mockClientBuilder).build();
    doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
    doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
    doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
    doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
            (org.apache.pulsar.client.api.ProducerInterceptor) any());
    doReturn(mockProducer).when(mockProducerBuilder).create();
    doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
    doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
    // Route the static config factories to the mocked builders.
    PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
    PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
    when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
    when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);

    // Kafka-style producer configuration, including one interceptor class name.
    Properties properties = new Properties();
    List interceptors =  new ArrayList();
    interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
    properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    // Act: create the producer and send one record.
    PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties);

    pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));

    // Verify: the interceptor was registered on the producer builder exactly once.
    verify(mockProducerBuilder, times(1)).intercept(
            (org.apache.pulsar.client.api.ProducerInterceptor)any());
}
 
源代码26 项目: pulsar   文件: PulsarKafkaProducerTest.java
/**
 * Builds a {@code PulsarKafkaProducer} with Avro key/value schemas and one interceptor
 * class configured, sends a single Foo/Bar record through fully mocked Pulsar objects,
 * and verifies that {@code sendAsync()} and {@code intercept(...)} were each called once.
 */
@Test
public void testPulsarKafkaSendAvro() throws PulsarClientException {
    // Arrange: mocks for the client, builders, producer and message builder.
    PulsarClient mockClient = mock(PulsarClient.class);
    ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
    org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
    ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
    CompletableFuture mockPartitionFuture = new CompletableFuture();
    CompletableFuture mockSendAsyncFuture = new CompletableFuture();
    TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);

    // Both futures are completed up front so the stubbed async calls return finished results.
    mockPartitionFuture.complete(new ArrayList<>());
    mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
    // Builder-style methods return the mock itself so call chains keep working.
    doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
    doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
    doReturn(mockClient).when(mockClientBuilder).build();
    doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
    doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
    doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
    doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
            (org.apache.pulsar.client.api.ProducerInterceptor) any());
    doReturn(mockProducer).when(mockProducerBuilder).create();
    doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
    doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
    // Route the static config factories to the mocked builders.
    PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
    PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
    when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
    when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);

    // Kafka-style producer configuration, including one interceptor class name.
    Properties properties = new Properties();
    List interceptors =  new ArrayList();
    interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
    properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    // Avro schemas derived from the Bar and Foo POJOs; Foo is the key type, Bar the value type.
    AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
    AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
    // Act: create the producer with the explicit schemas and send one record.
    PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);

    Bar bar = new Bar();
    bar.setField1(true);

    Foo foo = new Foo();
    foo.setField1("field1");
    foo.setField2("field2");
    foo.setField3(3);

    pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));

    // Verify: one message was sent and the interceptor was registered exactly once.
    verify(mockTypedMessageBuilder, times(1)).sendAsync();
    verify(mockProducerBuilder, times(1)).intercept(
            (org.apache.pulsar.client.api.ProducerInterceptor) any());
}
 
源代码27 项目: pulsar   文件: PulsarKafkaProducerTest.java
/**
 * Sends a single Avro key/value record through a {@code PulsarKafkaProducer} whose client builder,
 * client, producer builder, producer and message builder are all mocks, then checks that the
 * mocked message builder's {@code sendAsync()} was invoked.
 */
@Test
public void testPulsarKafkaSendAvro() throws PulsarClientException {
    // Arrange: one mock per Pulsar collaborator.
    ClientBuilder clientBuilderMock = mock(ClientBuilder.class);
    PulsarClient clientMock = mock(PulsarClient.class);
    ProducerBuilder producerBuilderMock = mock(ProducerBuilder.class);
    org.apache.pulsar.client.api.Producer producerMock = mock(org.apache.pulsar.client.api.Producer.class);
    TypedMessageBuilder messageBuilderMock = mock(TypedMessageBuilderImpl.class);

    // Both futures are already completed by the time the stubs hand them out.
    CompletableFuture partitionsFuture = new CompletableFuture();
    partitionsFuture.complete(new ArrayList<>());
    CompletableFuture sendFuture = new CompletableFuture();
    sendFuture.complete(new MessageIdImpl(1, 1, 1));

    // Client builder: fluent calls return the builder itself, build() yields the mocked client.
    doReturn(clientBuilderMock).when(clientBuilderMock).serviceUrl(anyString());
    doReturn(clientBuilderMock).when(clientBuilderMock).keepAliveInterval(anyInt(), any(TimeUnit.class));
    doReturn(clientMock).when(clientBuilderMock).build();
    doReturn(partitionsFuture).when(clientMock).getPartitionsForTopic(anyString());

    // Producer builder: topic()/clone() return the builder itself, create() yields the mocked producer.
    doReturn(producerBuilderMock).when(producerBuilderMock).topic(anyString());
    doReturn(producerBuilderMock).when(producerBuilderMock).clone();
    doReturn(producerMock).when(producerBuilderMock).create();
    doReturn(messageBuilderMock).when(producerMock).newMessage();
    doReturn(sendFuture).when(messageBuilderMock).sendAsync();

    // Static factories are redirected to the mocked builders.
    PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
    PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
    when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(clientBuilderMock);
    when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(producerBuilderMock);

    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    producerProps.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
    producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");

    // Foo is the key type and Bar the value type of the producer under test.
    AvroSchema<Foo> keySchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
    AvroSchema<Bar> valueSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());

    // Act
    PulsarKafkaProducer<Foo, Bar> producerUnderTest = new PulsarKafkaProducer<>(producerProps, keySchema, valueSchema);

    Foo recordKey = new Foo();
    recordKey.setField1("field1");
    recordKey.setField2("field2");
    recordKey.setField3(3);

    Bar recordValue = new Bar();
    recordValue.setField1(true);

    producerUnderTest.send(new ProducerRecord<>("topic", 1, recordKey, recordValue));

    // Verify
    verify(messageBuilderMock).sendAsync();
}
 
源代码28 项目: pulsar   文件: ProducerHandler.java
/**
 * Creates a {@link ProducerBuilder} from the given client and configures it from {@code queryParams}.
 *
 * <p>Defaults applied before any query parameter is read: batching disabled,
 * {@link MessageRoutingMode#SinglePartition} routing, and {@code blockIfQueueFull(false)}.
 * The following query parameters override or extend those defaults when present:
 * {@code producerName}, {@code initialSequenceId}, {@code hashingScheme}, {@code sendTimeoutMillis},
 * {@code batchingEnabled}, {@code batchingMaxMessages}, {@code maxPendingMessages},
 * {@code batchingMaxPublishDelay} (milliseconds), {@code messageRoutingMode} and {@code compressionType}.
 *
 * @param client client used to obtain the initial producer builder
 * @return the configured builder; the producer itself is not created here
 * @throws NumberFormatException if a numeric parameter is present but cannot be parsed
 * @throws IllegalArgumentException if {@code hashingScheme} does not name a constant (from
 *         {@code valueOf}); presumably also what {@code checkArgument} raises for an unknown
 *         {@code messageRoutingMode} or {@code compressionType} -- confirm against its definition
 */
private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
    ProducerBuilder<byte[]> builder = client.newProducer()
        .enableBatching(false)
        .messageRoutingMode(MessageRoutingMode.SinglePartition);

    // Set to false to prevent the server thread from being blocked if a lot of messages are pending.
    builder.blockIfQueueFull(false);

    if (queryParams.containsKey("producerName")) {
        builder.producerName(queryParams.get("producerName"));
    }

    if (queryParams.containsKey("initialSequenceId")) {
        // Parse the parameter's value; parsing the literal key name always threw NumberFormatException.
        builder.initialSequenceId(Long.parseLong(queryParams.get("initialSequenceId")));
    }

    if (queryParams.containsKey("hashingScheme")) {
        builder.hashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
    }

    if (queryParams.containsKey("sendTimeoutMillis")) {
        builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
    }

    // May re-enable batching that was switched off by default above.
    if (queryParams.containsKey("batchingEnabled")) {
        builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
    }

    if (queryParams.containsKey("batchingMaxMessages")) {
        builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
    }

    if (queryParams.containsKey("maxPendingMessages")) {
        builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
    }

    if (queryParams.containsKey("batchingMaxPublishDelay")) {
        builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
                TimeUnit.MILLISECONDS);
    }

    if (queryParams.containsKey("messageRoutingMode")) {
        checkArgument(
                Enums.getIfPresent(MessageRoutingMode.class, queryParams.get("messageRoutingMode")).isPresent(),
                "Invalid messageRoutingMode %s", queryParams.get("messageRoutingMode"));
        MessageRoutingMode routingMode = MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
        // CustomPartition is accepted as a valid value but not applied, so the
        // SinglePartition default set above stays in effect.
        if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
            builder.messageRoutingMode(routingMode);
        }
    }

    if (queryParams.containsKey("compressionType")) {
        checkArgument(Enums.getIfPresent(CompressionType.class, queryParams.get("compressionType")).isPresent(),
                "Invalid compressionType %s", queryParams.get("compressionType"));
        builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
    }

    return builder;
}
 
源代码29 项目: pulsar   文件: CompactionTest.java
/**
 * Publishes ten values ("0".."9") under one key, compacts the topic, publishes ten more
 * ("10".."19") under the same key and compacts again. A compacted read from the earliest
 * position is then expected to deliver exactly one message carrying the value "19".
 *
 * @param batchEnabled when {@code false} batching is disabled on the producer; when {@code true}
 *                     the producer's max batch size is set to a fifth of the per-round message count
 */
@Test(timeOut = 20000, dataProvider = "lastDeletedBatching")
public void testCompactMultipleTimesWithoutEmptyMessage(boolean batchEnabled) throws PulsarClientException, ExecutionException, InterruptedException {
    final int messagesPerRound = 10;
    final String sharedKey = "1";
    final String topicName = "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID().toString();

    // 1. Create the producer and publish the first round of messages.
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
    if (batchEnabled) {
        producerBuilder.batchingMaxMessages(messagesPerRound / 5);
    } else {
        producerBuilder.enableBatching(false);
    }
    Producer<byte[]> producer = producerBuilder.create();

    List<CompletableFuture<MessageId>> firstRound = new ArrayList<>(messagesPerRound);
    for (int seq = 0; seq < messagesPerRound; seq++) {
        String payload = seq + "";
        firstRound.add(producer.newMessage().key(sharedKey).value(payload.getBytes()).sendAsync());
    }
    FutureUtil.waitForAll(firstRound).get();

    // 2. Compact the topic.
    Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
    compactor.compact(topicName).get();

    // 3. Publish ten more messages; payloads continue at "10".
    List<CompletableFuture<MessageId>> secondRound = new ArrayList<>(messagesPerRound);
    for (int seq = 0; seq < messagesPerRound; seq++) {
        String payload = seq + 10 + "";
        secondRound.add(producer.newMessage().key(sharedKey).value(payload.getBytes()).sendAsync());
    }
    FutureUtil.waitForAll(secondRound).get();

    // 4. Compact again.
    compactor.compact(topicName).get();

    // 5. A compacted read from the start must yield only the last value for the key.
    try (Consumer<byte[]> compactedReader = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName("sub1")
            .readCompacted(true)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe()) {
        Message<byte[]> latest = compactedReader.receive();
        assertNotNull(latest);
        assertEquals(latest.getKey(), sharedKey);
        assertEquals(new String(latest.getValue()), "19");

        // Nothing else should arrive within the two-second window.
        Message<byte[]> extra = compactedReader.receive(2, TimeUnit.SECONDS);
        assertNull(extra);
    }
}
 
源代码30 项目: pulsar   文件: SchemaCompatibilityCheckTest.java
/**
 * Exercises the namespace-level "allow auto update schema" flag with a topic whose registered
 * schema is {@code PersonOne} while producer and consumer use {@code PersonTwo}.
 *
 * <p>Sequence: create the namespace and check its default strategy is {@code FULL}; apply the
 * strategy under test and register the {@code PersonOne} schema; disable auto update and attempt
 * to create the producer; enable auto update and run a send/receive round trip; disable auto
 * update again and repeat the round trip with freshly created producer and consumer.
 *
 * @param schemaCompatibilityStrategy strategy applied to the namespace before the schema is created
 */
@Test(dataProvider =  "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
    final String tenant = PUBLIC_TENANT;
    final String topic = "test-consumer-compatibility";
    final String namespace = "test-namespace-" + randomName(16);

    final String fqtn = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topic).toString();
    final NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
    final String namespacePath = namespaceName.toString();

    admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));

    // A freshly created namespace is expected to report the FULL strategy.
    assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespacePath),
            SchemaCompatibilityStrategy.FULL);

    admin.namespaces().setSchemaCompatibilityStrategy(namespacePath, schemaCompatibilityStrategy);
    admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());

    // Phase 1: auto update disabled, producer schema differs from the registered one.
    admin.namespaces().setIsAllowAutoUpdateSchema(namespacePath, false);
    ProducerBuilder<Schemas.PersonTwo> personTwoProducerBuilder = pulsarClient
            .newProducer(Schema.AVRO(
                    SchemaDefinition.<Schemas.PersonTwo>builder()
                            .withAlwaysAllowNull(false)
                            .withSupportSchemaVersioning(true)
                            .withPojo(Schemas.PersonTwo.class)
                            .build()))
            .topic(fqtn);
    try {
        personTwoProducerBuilder.create();
        // NOTE(review): if create() succeeds, nothing fails here and the message check below is skipped.
    } catch (Exception createFailure) {
        Assert.assertTrue(createFailure.getMessage().contains("Schema not found and schema auto updating is disabled."));
    }

    // Phase 2: auto update enabled, full round trip with the PersonTwo schema.
    admin.namespaces().setIsAllowAutoUpdateSchema(namespacePath, true);
    ConsumerBuilder<Schemas.PersonTwo> personTwoConsumerBuilder = pulsarClient
            .newConsumer(Schema.AVRO(
                    SchemaDefinition.<Schemas.PersonTwo>builder()
                            .withAlwaysAllowNull(false)
                            .withSupportSchemaVersioning(true)
                            .withPojo(Schemas.PersonTwo.class)
                            .build()))
            .subscriptionName("test")
            .topic(fqtn);

    Producer<Schemas.PersonTwo> producer = personTwoProducerBuilder.create();
    Consumer<Schemas.PersonTwo> consumerTwo = personTwoConsumerBuilder.subscribe();

    producer.send(new Schemas.PersonTwo(2, "Lucy"));
    Message<Schemas.PersonTwo> received = consumerTwo.receive();
    Schemas.PersonTwo person = received.getValue();
    consumerTwo.acknowledge(received);

    assertEquals(person.getId(), 2);
    assertEquals(person.getName(), "Lucy");

    producer.close();
    consumerTwo.close();

    // Phase 3: auto update disabled again; the same builders are reused for a second round trip.
    admin.namespaces().setIsAllowAutoUpdateSchema(namespacePath, false);

    producer = personTwoProducerBuilder.create();
    consumerTwo = personTwoConsumerBuilder.subscribe();

    producer.send(new Schemas.PersonTwo(2, "Lucy"));
    received = consumerTwo.receive();
    person = received.getValue();
    consumerTwo.acknowledge(received);

    assertEquals(person.getId(), 2);
    assertEquals(person.getName(), "Lucy");

    consumerTwo.close();
    producer.close();
}