Code Examples for org.apache.kafka.common.record.CompressionType

The following examples show how to use the org.apache.kafka.common.record.CompressionType API; you can also follow the links to view the full source code on GitHub.

Example 1  Project: kop   File: GroupMetadataManagerTest.java
private int completeTransactionalOffsetCommit(ByteBuffer buffer,
                                              long producerId,
                                              short producerEpoch,
                                              long baseOffset,
                                              boolean isCommit) {
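    // write one transactional control batch: magic v2, uncompressed, log-append timestamps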
    MemoryRecordsBuilder builder = MemoryRecords.builder(
        buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
        TimestampType.LOG_APPEND_TIME, baseOffset, Time.SYSTEM.milliseconds(),
        producerId, producerEpoch, 0, true, true,
        RecordBatch.NO_PARTITION_LEADER_EPOCH);
    ControlRecordType controlRecordType = isCommit ? ControlRecordType.COMMIT : ControlRecordType.ABORT;
    builder.appendEndTxnMarker(Time.SYSTEM.milliseconds(), new EndTransactionMarker(controlRecordType, 0));
    builder.build();
    return 1;
}
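For reference, builder.build() returns the MemoryRecords just written (the test discards the result), so the end-transaction marker can be read back and inspected. A minimal sketch, our own verification code rather than part of the original test:

MemoryRecords records = builder.build();
for (RecordBatch batch : records.batches()) {
    if (batch.isControlBatch()) {
        for (Record record : batch) {
            // the record key encodes whether the marker is COMMIT or ABORT
            ControlRecordType type = ControlRecordType.parse(record.key());
        }
    }
}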
 
Example 2  Project: core-ng-project   File: KafkaAppender.java
private KafkaProducer<byte[], byte[]> createProducer(KafkaURI uri) {
    var watch = new StopWatch();
    try {
        Map<String, Object> config = Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, uri.bootstrapURIs,
                ProducerConfig.ACKS_CONFIG, "0",                                        // no acknowledgement, to maximize performance
                ProducerConfig.CLIENT_ID_CONFIG, "log-forwarder",                       // if not specified, kafka uses producer-${seq}, which also impacts jmx naming
                ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name,
                ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60 * 1000,                   // 60s, type is INT
                ProducerConfig.LINGER_MS_CONFIG, 50L,
                ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500L,                        // longer backoff to reduce cpu usage when kafka is not available
                ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5L * 1000,               // 5s
                ProducerConfig.MAX_BLOCK_MS_CONFIG, 30L * 1000);                         // 30s, metadata update timeout, shorter than default, to get exception sooner if kafka is not available
        var serializer = new ByteArraySerializer();
        var producer = new KafkaProducer<>(config, serializer, serializer);
        producerMetrics.set(producer.metrics());
        return producer;
    } finally {
        logger.info("create kafka log producer, uri={}, elapsed={}", uri, watch.elapsed());
    }
}
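The producer created above is used like any other KafkaProducer. A hypothetical usage sketch (the topic name and payload are ours, not from the original; note that with acks=0 the broker sends no acknowledgement, so the callback's RecordMetadata carries no meaningful offset):

// "action-log" and eventBytes are hypothetical placeholders
producer.send(new ProducerRecord<>("action-log", null, eventBytes), (metadata, exception) -> {
    if (exception != null) logger.warn("failed to forward log event", exception);
});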
 
Example 3  Project: core-ng-project   File: MessageProducer.java
public void tryCreateProducer() {
    if (uri.resolveURI()) {
        var watch = new StopWatch();
        try {
            Map<String, Object> config = Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, uri.bootstrapURIs,
                    ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name,
                    ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60 * 1000,                        // 60s; DELIVERY_TIMEOUT_MS_CONFIG takes an int
                    ProducerConfig.LINGER_MS_CONFIG, 5L,                                         // use small linger time within acceptable range to improve batching
                    ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500L,                            // longer backoff to reduce cpu usage when kafka is not available
                    ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5L * 1000,                   // 5s
                    ProducerConfig.MAX_BLOCK_MS_CONFIG, 30L * 1000);                             // 30s, metadata update timeout, shorter than default, to get exception sooner if kafka is not available

            var serializer = new ByteArraySerializer();
            producer = new KafkaProducer<>(config, serializer, serializer);
            producerMetrics.set(producer.metrics());
        } finally {
            logger.info("create kafka producer, uri={}, name={}, elapsed={}", uri, name, watch.elapsed());
        }
    }
}
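On shutdown, a producer like this should be flushed and closed so that records still buffered by the 5ms linger are delivered. A minimal sketch, assuming a kafka-clients version where close() takes a java.time.Duration (the method itself is hypothetical, not from the original class):

public void close(Duration timeout) {
    if (producer != null) {
        producer.flush();        // push out any batches still sitting in the buffer
        producer.close(timeout); // then release network and IO resources
    }
}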
 
Example 4  Project: kop   File: KafkaProtocolHandler.java
public void initGroupCoordinator(BrokerService service) throws Exception {
    GroupConfig groupConfig = new GroupConfig(
        kafkaConfig.getGroupMinSessionTimeoutMs(),
        kafkaConfig.getGroupMaxSessionTimeoutMs(),
        kafkaConfig.getGroupInitialRebalanceDelayMs()
    );

    OffsetConfig offsetConfig = OffsetConfig.builder()
        .offsetsTopicName(kafkaConfig.getKafkaMetadataTenant() + "/"
            + kafkaConfig.getKafkaMetadataNamespace()
            + "/" + Topic.GROUP_METADATA_TOPIC_NAME)
        .offsetsTopicNumPartitions(kafkaConfig.getOffsetsTopicNumPartitions())
        .offsetsTopicCompressionType(CompressionType.valueOf(kafkaConfig.getOffsetsTopicCompressionCodec()))
        .maxMetadataSize(kafkaConfig.getOffsetMetadataMaxSize())
        .offsetsRetentionCheckIntervalMs(kafkaConfig.getOffsetsRetentionCheckIntervalMs())
        .offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
        .build();

    createKafkaMetadataNamespaceIfNeeded(service);
    // topicName in pulsar format: tenant/ns/topic
    createKafkaOffsetsTopic(service);

    this.groupCoordinator = GroupCoordinator.of(
        (PulsarClientImpl) (service.pulsar().getClient()),
        groupConfig,
        offsetConfig,
        SystemTimer.builder()
            .executorName("group-coordinator-timer")
            .build(),
        Time.SYSTEM
    );

    loadOffsetTopics(groupCoordinator);
}
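Note the lookup used on the compression codec: CompressionType.valueOf is the generated enum method and matches the constant name exactly, while CompressionType.forName accepts the lowercase string used in Kafka configs. A quick illustration:

CompressionType.valueOf("SNAPPY");  // OK: matches the enum constant name
CompressionType.forName("snappy");  // OK: matches the config-style name
// CompressionType.valueOf("snappy") would throw IllegalArgumentException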
 
Example 5  Project: kop   File: GroupMetadataManagerTest.java
private int appendConsumerOffsetCommit(ByteBuffer buffer,
                                       long baseOffset,
                                       Map<TopicPartition, Long> offsets) {
    MemoryRecordsBuilder builder =
        MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset);
    List<SimpleRecord> commitRecords = createCommittedOffsetRecords(offsets, groupId);
    commitRecords.forEach(builder::append);
    builder.build();
    return offsets.size();
}
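This overload of MemoryRecords.builder writes plain, non-transactional batches with default magic and no producer metadata. A small sketch (our own check, not from the test) showing that the appended records round-trip:

MemoryRecords records = builder.build();
int appended = 0;
for (Record record : records.records()) {
    appended++;  // one Record per committed offset
}
// appended == offsets.size()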
 
Example 6  Project: kop   File: GroupMetadataManagerTest.java
private int appendTransactionalOffsetCommits(ByteBuffer buffer,
                                             long producerId,
                                             short producerEpoch,
                                             long baseOffset,
                                             Map<TopicPartition, Long> offsets) {
    MemoryRecordsBuilder builder =
        MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true);
    List<SimpleRecord> commitRecords = createCommittedOffsetRecords(offsets, groupId);
    commitRecords.forEach(builder::append);
    builder.build();
    return offsets.size();
}
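Unlike Example 5, this overload marks every batch transactional and stamps it with the producer id and epoch. A minimal sketch (our own assertions) of what that implies for the written data:

MemoryRecords records = builder.build();
for (RecordBatch batch : records.batches()) {
    assert batch.isTransactional();
    assert batch.producerId() == producerId;
    assert batch.producerEpoch() == producerEpoch;
}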
 
Example 7  Project: joyqueue   File: Producer.java
public Producer(String topic, Boolean isAsync) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfigs.BOOTSTRAP);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConfigs.CLIENT_ID);
    // to receive the record offset in the callback, set acks=-1
    //props.put("acks", "-1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name);
    producer = new KafkaProducer<>(props);
    this.topic = topic;
    this.isAsync = isAsync;
}
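A hypothetical send method for this Producer, covering both the async and the sync path (the method and its error handling are ours, not from the original class):

public void sendMessage(String key, String value) throws Exception {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    if (isAsync) {
        // async: the callback fires when the send completes or fails
        producer.send(record, (metadata, exception) -> {
            if (exception != null) exception.printStackTrace();
        });
    } else {
        // sync: block on the Future returned by send()
        RecordMetadata metadata = producer.send(record).get();
    }
}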
 
Example 8  Project: quarkus   File: SubstituteSnappy.java
@Substitute
public static CompressionType forName(String name) {
    if (NONE.name.equals(name)) {
        return NONE;
    } else if (GZIP.name.equals(name)) {
        return GZIP;
    } else {
        throw new IllegalArgumentException("Unknown  or unsupported compression name: " + name);
    }
}
 
Example 9  Project: quarkus   File: SubstituteSnappy.java
@Substitute
public static CompressionType forId(int id) {
    switch (id) {
        case 0:
            return NONE;
        case 1:
            return GZIP;
        default:
            throw new IllegalArgumentException("Unknown or unsupported compression type id: " + id);
    }
}
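These two @Substitute methods belong to a GraalVM substitution class: at native-image build time they replace CompressionType.forName and CompressionType.forId so that only NONE and GZIP remain reachable, presumably so the native image need not bundle the Snappy and LZ4 native libraries. The observable effect, sketched:

CompressionType.forName("gzip");   // still returns GZIP
CompressionType.forName("snappy"); // throws IllegalArgumentException under the substitution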
 
Example 10  Project: cruise-control   File: ModelParameters.java
private static ConfigSetting forSetting(CompressionType type, boolean sslEnabled) {
  switch (type) {
    case NONE:
      return sslEnabled ? ConfigSetting.SSL_NONE : ConfigSetting.PLAINTEXT_NONE;
    case GZIP:
      return sslEnabled ? ConfigSetting.SSL_GZIP : ConfigSetting.PLAINTEXT_GZIP;
    case SNAPPY:
      return sslEnabled ? ConfigSetting.SSL_SNAPPY : ConfigSetting.PLAINTEXT_SNAPPY;
    case LZ4:
      return sslEnabled ? ConfigSetting.SSL_LZ4 : ConfigSetting.PLAINTEXT_LZ4;
    default:
      throw new IllegalStateException("Should not be here.");
  }
}
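Note that this switch does not cover CompressionType.ZSTD (added in Kafka 2.1), which would fall into the default branch and throw. Handling it would add a case like the following, where the two ConfigSetting constants are hypothetical, not part of the original enum:

case ZSTD:
    return sslEnabled ? ConfigSetting.SSL_ZSTD : ConfigSetting.PLAINTEXT_ZSTD;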
 