下面列出了怎么用org.apache.kafka.common.record.CompressionType的API类实例代码及写法,或者点击链接到github查看源代码。
private int completeTransactionalOffsetCommit(ByteBuffer buffer,
long producerId,
short producerEpoch,
long baseOffset,
boolean isCommit) {
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
TimestampType.LOG_APPEND_TIME, baseOffset, Time.SYSTEM.milliseconds(),
producerId, producerEpoch, 0, true, true,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
ControlRecordType controlRecordType;
if (isCommit) {
controlRecordType = ControlRecordType.COMMIT;
} else {
controlRecordType = ControlRecordType.ABORT;
}
builder.appendEndTxnMarker(Time.SYSTEM.milliseconds(), new EndTransactionMarker(controlRecordType, 0));
builder.build();
return 1;
}
private KafkaProducer<byte[], byte[]> createProducer(KafkaURI uri) {
var watch = new StopWatch();
try {
Map<String, Object> config = Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, uri.bootstrapURIs,
ProducerConfig.ACKS_CONFIG, "0", // no acknowledge to maximize performance
ProducerConfig.CLIENT_ID_CONFIG, "log-forwarder", // if not specify, kafka uses producer-${seq} name, also impact jmx naming
ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name,
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60 * 1000, // 60s, type is INT
ProducerConfig.LINGER_MS_CONFIG, 50L,
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500L, // longer backoff to reduce cpu usage when kafka is not available
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5L * 1000, // 5s
ProducerConfig.MAX_BLOCK_MS_CONFIG, 30L * 1000); // 30s, metadata update timeout, shorter than default, to get exception sooner if kafka is not available
var serializer = new ByteArraySerializer();
var producer = new KafkaProducer<>(config, serializer, serializer);
producerMetrics.set(producer.metrics());
return producer;
} finally {
logger.info("create kafka log producer, uri={}, elapsed={}", uri, watch.elapsed());
}
}
public void tryCreateProducer() {
if (uri.resolveURI()) {
var watch = new StopWatch();
try {
Map<String, Object> config = Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, uri.bootstrapURIs,
ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name,
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60 * 1000, // 60s, DELIVERY_TIMEOUT_MS_CONFIG is INT type
ProducerConfig.LINGER_MS_CONFIG, 5L, // use small linger time within acceptable range to improve batching
ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500L, // longer backoff to reduce cpu usage when kafka is not available
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5L * 1000, // 5s
ProducerConfig.MAX_BLOCK_MS_CONFIG, 30L * 1000); // 30s, metadata update timeout, shorter than default, to get exception sooner if kafka is not available
var serializer = new ByteArraySerializer();
producer = new KafkaProducer<>(config, serializer, serializer);
producerMetrics.set(producer.metrics());
} finally {
logger.info("create kafka producer, uri={}, name={}, elapsed={}", uri, name, watch.elapsed());
}
}
}
public void initGroupCoordinator(BrokerService service) throws Exception {
GroupConfig groupConfig = new GroupConfig(
kafkaConfig.getGroupMinSessionTimeoutMs(),
kafkaConfig.getGroupMaxSessionTimeoutMs(),
kafkaConfig.getGroupInitialRebalanceDelayMs()
);
OffsetConfig offsetConfig = OffsetConfig.builder()
.offsetsTopicName(kafkaConfig.getKafkaMetadataTenant() + "/"
+ kafkaConfig.getKafkaMetadataNamespace()
+ "/" + Topic.GROUP_METADATA_TOPIC_NAME)
.offsetsTopicNumPartitions(kafkaConfig.getOffsetsTopicNumPartitions())
.offsetsTopicCompressionType(CompressionType.valueOf(kafkaConfig.getOffsetsTopicCompressionCodec()))
.maxMetadataSize(kafkaConfig.getOffsetMetadataMaxSize())
.offsetsRetentionCheckIntervalMs(kafkaConfig.getOffsetsRetentionCheckIntervalMs())
.offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
.build();
createKafkaMetadataNamespaceIfNeeded(service);
// topicName in pulsar format: tenant/ns/topic
createKafkaOffsetsTopic(service);
this.groupCoordinator = GroupCoordinator.of(
(PulsarClientImpl) (service.pulsar().getClient()),
groupConfig,
offsetConfig,
SystemTimer.builder()
.executorName("group-coordinator-timer")
.build(),
Time.SYSTEM
);
loadOffsetTopics(groupCoordinator);
}
private int appendConsumerOffsetCommit(ByteBuffer buffer,
long baseOffset,
Map<TopicPartition, Long> offsets) {
MemoryRecordsBuilder builder =
MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset);
List<SimpleRecord> commitRecords = createCommittedOffsetRecords(offsets, groupId);
commitRecords.forEach(builder::append);
builder.build();
return offsets.size();
}
private int appendTransactionalOffsetCommits(ByteBuffer buffer,
long producerId,
short producerEpoch,
long baseOffset,
Map<TopicPartition, Long> offsets) {
MemoryRecordsBuilder builder =
MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true);
List<SimpleRecord> commitRecords = createCommittedOffsetRecords(offsets, groupId);
commitRecords.forEach(builder::append);
builder.build();
return offsets.size();
}
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfigs.BOOTSTRAP);
props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConfigs.CLIENT_ID);
// if u want to know offset in callback, setting acks=-1
//props.put("acks", "-1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name);
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
@Substitute
public static CompressionType forName(String name) {
if (NONE.name.equals(name)) {
return NONE;
} else if (GZIP.name.equals(name)) {
return GZIP;
} else {
throw new IllegalArgumentException("Unknown or unsupported compression name: " + name);
}
}
@Substitute
public static CompressionType forId(int id) {
switch (id) {
case 0:
return NONE;
case 1:
return GZIP;
default:
throw new IllegalArgumentException("Unknown or unsupported compression type id: " + id);
}
}
private static ConfigSetting forSetting(CompressionType type, boolean sslEnabled) {
switch (type) {
case NONE:
return sslEnabled ? ConfigSetting.SSL_NONE : ConfigSetting.PLAINTEXT_NONE;
case GZIP:
return sslEnabled ? ConfigSetting.SSL_GZIP : ConfigSetting.PLAINTEXT_GZIP;
case SNAPPY:
return sslEnabled ? ConfigSetting.SSL_SNAPPY : ConfigSetting.PLAINTEXT_SNAPPY;
case LZ4:
return sslEnabled ? ConfigSetting.SSL_LZ4 : ConfigSetting.PLAINTEXT_LZ4;
default:
throw new IllegalStateException("Should not be here.");
}
}