下面列出了怎么用org.apache.kafka.common.utils.SystemTime的API类实例代码及写法,或者点击链接到github查看源代码。
public static void initialize() {
MetricConfig metricConfig = new MetricConfig()
.samples(100)
.timeWindow(
1000,
TimeUnit.MILLISECONDS
);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter("io.confluent.ksql.metrics"));
// Replace all static contents other than Time to ensure they are cleaned for tests that are
// not aware of the need to initialize/cleanup this test, in case test processes are reused.
// Tests aware of the class clean everything up properly to get the state into a clean state,
// a full, fresh instantiation here ensures something like KsqlEngineMetricsTest running after
// another test that used MetricsCollector without running cleanUp will behave correctly.
metrics = new Metrics(metricConfig, reporters, new SystemTime());
collectorMap = new ConcurrentHashMap<>();
}
public ConsumerService() {
thread = new Thread(() -> {
consumer();
}, name + "consumer-service");
Properties props = ConfigService.getKafkaConsumerConf();
MONITOR_TOPIC = ConfigService.monitorConfig.getMonitorTopic();
consumer = new MonitorConsumer(MONITOR_TOPIC, props);
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporterList = new ArrayList<>();
reporterList.add(new JmxReporter("kmf.services"));
Metrics metrics = new Metrics(metricConfig, reporterList, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", "monitor");
sensor = new ConsumerMetrics(metrics, tags);
}
public ConnectEmbedded(Properties workerConfig, Properties... connectorConfigs) throws Exception {
Time time = new SystemTime();
DistributedConfig config = new DistributedConfig(Utils.propsToStringMap(workerConfig));
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
//not sure if this is going to work but because we don't have advertised url we can get at least a fairly random
String workerId = UUID.randomUUID().toString();
worker = new Worker(workerId, time, config, offsetBackingStore);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
configBackingStore.configure(config);
//advertisedUrl = "" as we don't have the rest server - hopefully this will not break anything
herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore, "");
this.connectorConfigs = connectorConfigs;
shutdownHook = new ShutdownHook();
}
public ClusterTopicManipulationService(String name, AdminClient adminClient) {
LOGGER.info("ClusterTopicManipulationService constructor initiated {}", this.getClass().getName());
_isOngoingTopicCreationDone = true;
_isOngoingTopicDeletionDone = true;
_adminClient = adminClient;
_executor = Executors.newSingleThreadScheduledExecutor();
_reportIntervalSecond = Duration.ofSeconds(1);
_running = new AtomicBoolean(false);
_configDefinedServiceName = name;
// TODO: instantiate a new instance of ClusterTopicManipulationMetrics(..) here.
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(Service.JMX_PREFIX));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", name);
_clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
kafkaBroker = new TestKafkaBroker();
Map<String, String> props = makeWorkerProps();
WorkerConfig workerCfg = new StandaloneConfig(props);
MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
offBackingStore.configure(workerCfg);
worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
worker.start();
herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
herder.start();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
kafkaBroker = new TestKafkaBroker();
for (String topic : TOPICS)
kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
Map<String, String> props = makeWorkerProps();
WorkerConfig workerCfg = new StandaloneConfig(props);
OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class);
offBackingStore.configure(workerCfg);
worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore);
worker.start();
herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg));
herder.start();
}
public KafkaStore(
String bootstrapServers,
Serializer<K, V> serializer,
String topic,
Collection<StoreUpdateObserver<K, V>> observers) {
this(bootstrapServers, serializer, topic, observers, new SystemTime(), emptyMap(), emptyMap());
}
public KafkaStore(
String bootstrapServers,
Serializer<K, V> serializer,
String topic,
Map<String, Object> additionalProducerProps,
Map<String, Object> additionalConsumerProps) {
this(bootstrapServers, serializer, topic, emptyList(), new SystemTime(), additionalProducerProps,
additionalConsumerProps);
}
/**
* Creates and starts an embedded Kafka broker.
*
* @param config Broker configuration settings. Used to modify, for example, the listeners
* the broker should use. Note that you cannot change some settings such as
* `log.dirs`.
*/
public KafkaEmbedded(final Properties config) throws IOException {
this.tmpFolder = new TemporaryFolder();
this.tmpFolder.create();
this.logDir = tmpFolder.newFolder();
this.effectiveConfig = effectiveConfigFrom(config, logDir);
final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, true);
log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...",
logDir, zookeeperConnect());
kafka = TestUtils.createServer(kafkaConfig, new SystemTime());
log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
brokerList(), zookeeperConnect());
}
public ProduceService() {
Properties properties = ConfigService.getZkProper();
zkConnect = properties.getProperty(ZooConfig.HOST);
MONITOR_TOPIC = ConfigService.monitorConfig.getMonitorTopic();
produceExecutor = Executors.newScheduledThreadPool(4, r -> new Thread(r, "produce-service"));
partitionHandlerExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "partition-change-handler"));
partitionNum = new AtomicInteger(1);
currentPartition = new ConcurrentHashMap<>();
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(100, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter("kmf.services"));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", "test");
produceMetrics = new ProduceMetrics(metrics, tags);
int existingPartitionCount = Utils.getPartitionNumByTopic(zkConnect, MONITOR_TOPIC);
if (existingPartitionCount > 0) {
partitionNum.set(existingPartitionCount);
}
initialProducer();
}
/**
* Package private constructor for unit test.
*/
AnomalyDetector(PriorityBlockingQueue<Anomaly> anomalies,
AdminClient adminClient,
long anomalyDetectionIntervalMs,
KafkaCruiseControl kafkaCruiseControl,
AnomalyNotifier anomalyNotifier,
GoalViolationDetector goalViolationDetector,
BrokerFailureDetector brokerFailureDetector,
MetricAnomalyDetector metricAnomalyDetector,
DiskFailureDetector diskFailureDetector,
TopicAnomalyDetector topicAnomalyDetector,
ScheduledExecutorService detectorScheduler) {
_anomalies = anomalies;
_adminClient = adminClient;
_anomalyDetectionIntervalMsByType = new HashMap<>(KafkaAnomalyType.cachedValues().size() - 1);
KafkaAnomalyType.cachedValues().stream().filter(type -> type != BROKER_FAILURE)
.forEach(type -> _anomalyDetectionIntervalMsByType.put(type, anomalyDetectionIntervalMs));
_brokerFailureDetectionBackoffMs = anomalyDetectionIntervalMs;
_anomalyNotifier = anomalyNotifier;
_goalViolationDetector = goalViolationDetector;
_brokerFailureDetector = brokerFailureDetector;
_metricAnomalyDetector = metricAnomalyDetector;
_diskFailureDetector = diskFailureDetector;
_topicAnomalyDetector = topicAnomalyDetector;
_kafkaCruiseControl = kafkaCruiseControl;
_detectorScheduler = detectorScheduler;
_shutdown = false;
_selfHealingGoals = Collections.emptyList();
_anomalyLoggerExecutor =
Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
_anomalyInProgress = null;
_numCheckedWithDelay = new AtomicLong();
_shutdownLock = new Object();
// Add anomaly detector state
_anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), new HashMap<>(KafkaAnomalyType.cachedValues().size()), 10, null);
}
/**
* Construct the Cruise Control
*
* @param config the configuration of Cruise Control.
*/
public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwizardMetricRegistry) {
_config = config;
_time = new SystemTime();
// initialize some of the static state of Kafka Cruise Control;
ModelUtils.init(config);
ModelParameters.init(config);
// Instantiate the components.
_anomalyDetector = new AnomalyDetector(this, _time, dropwizardMetricRegistry);
_executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetector);
_loadMonitor = new LoadMonitor(config, _time, _executor, dropwizardMetricRegistry, KafkaMetricDef.commonMetricDef());
_goalOptimizerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizerExecutor", true, null));
_goalOptimizer = new GoalOptimizer(config, _loadMonitor, _time, dropwizardMetricRegistry, _executor);
}
@Test
public void testNoPreComputingThread() {
Properties props = new Properties();
props.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap.servers");
props.setProperty(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG, "connect:1234");
props.setProperty(AnalyzerConfig.NUM_PROPOSAL_PRECOMPUTE_THREADS_CONFIG, "0");
props.setProperty(AnalyzerConfig.DEFAULT_GOALS_CONFIG, TestConstants.DEFAULT_GOALS_VALUES);
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(props);
GoalOptimizer goalOptimizer = new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), new SystemTime(),
new MetricRegistry(), EasyMock.mock(Executor.class));
// Should exit immediately.
goalOptimizer.run();
}
@Test
public void testStateChangeSequences() {
TopicPartition tp = new TopicPartition("topic", 0);
ExecutionTaskManager taskManager = new ExecutionTaskManager(null, new MetricRegistry(), new SystemTime(),
new KafkaCruiseControlConfig(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties()));
List<List<ExecutionTask.State>> testSequences = new ArrayList<>();
// Completed successfully.
testSequences.add(Arrays.asList(IN_PROGRESS, COMPLETED));
// Rollback succeeded.
testSequences.add(Arrays.asList(IN_PROGRESS, ABORTING, ABORTED));
// Rollback failed.
testSequences.add(Arrays.asList(IN_PROGRESS, ABORTING, DEAD));
// Cannot rollback.
testSequences.add(Arrays.asList(IN_PROGRESS, DEAD));
ReplicaPlacementInfo r0 = new ReplicaPlacementInfo(0);
ReplicaPlacementInfo r1 = new ReplicaPlacementInfo(1);
ReplicaPlacementInfo r2 = new ReplicaPlacementInfo(2);
for (List<ExecutionTask.State> sequence : testSequences) {
taskManager.clear();
// Make sure the proposal does not involve leader movement.
ExecutionProposal proposal =
new ExecutionProposal(tp, 10, r2, Arrays.asList(r0, r2), Arrays.asList(r2, r1));
taskManager.setExecutionModeForTaskTracker(false);
taskManager.addExecutionProposals(Collections.singletonList(proposal),
Collections.emptySet(),
generateExpectedCluster(proposal, tp),
null);
taskManager.setRequestedInterBrokerPartitionMovementConcurrency(null);
taskManager.setRequestedIntraBrokerPartitionMovementConcurrency(null);
taskManager.setRequestedLeadershipMovementConcurrency(null);
List<ExecutionTask> tasks = taskManager.getInterBrokerReplicaMovementTasks();
assertEquals(1, tasks.size());
ExecutionTask task = tasks.get(0);
verifyStateChangeSequence(sequence, task, taskManager);
}
}
JkesDocumentDeleter(
JestClient client,
boolean ignoreSchema,
Set<String> ignoreSchemaTopics,
String versionType,
long flushTimeoutMs,
int maxBufferedRecords,
int maxInFlightRequests,
int batchSize,
long lingerMs,
int maxRetries,
long retryBackoffMs
) {
this.client = client;
this.ignoreSchema = ignoreSchema;
this.ignoreSchemaTopics = ignoreSchemaTopics;
this.versionType = versionType;
this.flushTimeoutMs = flushTimeoutMs;
bulkProcessor = new BulkProcessor<>(
new SystemTime(),
new BulkDeletingClient(client),
maxBufferedRecords,
maxInFlightRequests,
batchSize,
lingerMs,
maxRetries,
retryBackoffMs
);
existingMappings = new HashSet<>();
}
/**
* Construct the abstract auditor.
*/
public AbstractAuditor() {
super();
this.setUncaughtExceptionHandler((t, e) -> LOG.error("Auditor died to unhandled exception", e));
this.setDaemon(true);
_time = new SystemTime();
}
/**
* Mainly contains services for three metrics:
* 1 - ConsumeAvailability metrics
* 2 - CommitOffsetAvailability metrics
* 2.1 - commitAvailabilityMetrics records offsets committed upon success. that is, no exception upon callback
* 2.2 - commitAvailabilityMetrics records offsets commit fail upon failure. that is, exception upon callback
* 3 - CommitOffsetLatency metrics
* 3.1 - commitLatencyMetrics records the latency between last successful callback and start of last recorded commit.
*
* @param name Name of the Monitor instance
* @param topicPartitionResult The completable future for topic partition
* @param consumerFactory Consumer Factory object.
* @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception
* @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied and the thread is interrupted
*/
public ConsumeService(String name,
CompletableFuture<Void> topicPartitionResult,
ConsumerFactory consumerFactory)
throws ExecutionException, InterruptedException {
_baseConsumer = consumerFactory.baseConsumer();
_latencySlaMs = consumerFactory.latencySlaMs();
_name = name;
_adminClient = consumerFactory.adminClient();
_running = new AtomicBoolean(false);
// Returns a new CompletionStage (topicPartitionFuture) which
// executes the given action - code inside run() - when this stage (topicPartitionResult) completes normally,.
CompletableFuture<Void> topicPartitionFuture = topicPartitionResult.thenRun(() -> {
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, new SystemTime());
tags = new HashMap<>();
tags.put(TAGS_NAME, name);
_topic = consumerFactory.topic();
_sensors = new ConsumeMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(),
consumerFactory.latencyPercentileGranularityMs());
_commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(),
consumerFactory.latencyPercentileGranularityMs());
_commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags);
_consumeThread = new Thread(() -> {
try {
consume();
} catch (Exception e) {
LOG.error(name + "/ConsumeService failed", e);
}
}, name + " consume-service");
_consumeThread.setDaemon(true);
});
// In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result.
topicPartitionFuture.get();
}
/**
* XinfraMonitor constructor creates apps and services for each of the individual clusters (properties) that's passed in.
* For example, if there are 10 clusters to be monitored, then this Constructor will create 10 * num_apps_per_cluster
* and 10 * num_services_per_cluster.
* @param allClusterProps the properties of ALL kafka clusters for which apps and services need to be appended.
* @throws Exception when exception occurs while assigning Apps and Services
*/
@SuppressWarnings({"rawtypes"})
public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
_apps = new ConcurrentHashMap<>();
_services = new ConcurrentHashMap<>();
for (Map.Entry<String, Map> clusterProperty : allClusterProps.entrySet()) {
String name = clusterProperty.getKey();
Map props = clusterProperty.getValue();
if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG))
throw new IllegalArgumentException(name + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
String className = (String) props.get(XinfraMonitorConstants.CLASS_NAME_CONFIG);
Class<?> aClass = Class.forName(className);
if (App.class.isAssignableFrom(aClass)) {
App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
_apps.put(name, clusterApp);
} else if (Service.class.isAssignableFrom(aClass)) {
ServiceFactory serviceFactory = (ServiceFactory) Class.forName(className + XinfraMonitorConstants.FACTORY)
.getConstructor(Map.class, String.class)
.newInstance(props, name);
Service service = serviceFactory.createService();
_services.put(name, service);
} else {
throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
}
}
_executor = Executors.newSingleThreadScheduledExecutor();
_offlineRunnables = new ConcurrentHashMap<>();
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX));
Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
(config, now) -> _offlineRunnables.size());
}
/**
* Sets up test Kafka broker.
*
* @throws IOException If failed.
*/
private void setupKafkaServer() throws IOException {
kafkaCfg = new KafkaConfig(getKafkaConfig());
kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
kafkaSrv.startup();
}
private KafkaServer startBroker(Properties props) {
List<KafkaMetricsReporter> kmrList = new ArrayList<>();
Buffer<KafkaMetricsReporter> metricsList = scala.collection.JavaConversions.asScalaBuffer(kmrList);
KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime(), Option.<String>empty(), metricsList);
server.startup();
return server;
}
public KafkaStore(String bootstrapServers, Serializer<K, V> serializer, String topic) {
this(bootstrapServers, serializer, topic, emptyList(), new SystemTime(), emptyMap(), emptyMap());
}
public MongoSourceTask() {
this(new SystemTime());
}
public SelfHealingNotifier() {
this(new SystemTime());
}
/**
* The main function to run offline proposal generator.
* @param argv Arguments passed while starting offline proposal generator.
*/
public static void main(String[] argv) throws Exception {
//TODO: probably need to save this in the original model file
Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(props);
ModelUtils.init(config);
ModelParameters.init(config);
BalancingConstraint balancingConstraint = new BalancingConstraint(config);
long start = System.currentTimeMillis();
ClusterModel clusterModel = clusterModelFromFile(argv[0]);
long end = System.currentTimeMillis();
double duration = (end - start) / 1000.0;
System.out.println("Model loaded in " + duration + "s.");
ClusterModelStats origStats = clusterModel.getClusterStats(balancingConstraint);
String loadBeforeOptimization = clusterModel.brokerStats(null).toString();
// Instantiate the components.
GoalOptimizer goalOptimizer = new GoalOptimizer(config,
null,
new SystemTime(),
new MetricRegistry(),
EasyMock.mock(Executor.class));
start = System.currentTimeMillis();
OptimizerResult optimizerResult = goalOptimizer.optimizations(clusterModel, new OperationProgress());
end = System.currentTimeMillis();
duration = (end - start) / 1000.0;
String loadAfterOptimization = clusterModel.brokerStats(null).toString();
System.out.println("Optimize goals in " + duration + "s.");
System.out.println(optimizerResult.goalProposals().size());
System.out.println(loadBeforeOptimization);
System.out.println(loadAfterOptimization);
ClusterModelStats optimizedStats = clusterModel.getClusterStats(balancingConstraint);
double[] testStatistics = AnalyzerUtils.testDifference(origStats.utilizationMatrix(), optimizedStats.utilizationMatrix());
System.out.println(Arrays.stream(RawAndDerivedResource.values()).map(x -> x.toString()).collect(Collectors.joining(", ")));
System.out.println(Arrays.stream(testStatistics).boxed().map(pValue -> Double.toString(pValue)).collect(Collectors.joining(", ")));
}
public HdfsFileWatcherPolicy(FsSourceTaskConfig conf) throws IOException {
super(conf);
this.fileQueue = new ConcurrentLinkedQueue<>();
this.time = new SystemTime();
startWatchers();
}
EventStreamThread(FileSystem fs, HdfsAdmin admin, long retrySleepMs) {
this.fs = fs;
this.admin = admin;
this.retrySleepMs = retrySleepMs;
this.time = new SystemTime();
}
public CronPolicy(FsSourceTaskConfig conf) throws IOException {
super(conf);
this.time = new SystemTime();
}
public FsSourceTask() {
this.stop = new AtomicBoolean(false);
this.time = new SystemTime();
}
public ProduceService(Map<String, Object> props, String name) throws Exception {
_name = name;
ProduceServiceConfig config = new ProduceServiceConfig(props);
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG);
int latencyPercentileMaxMs = config.getInt(ProduceServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG);
int latencyPercentileGranularityMs = config.getInt(ProduceServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG);
_partitioner = config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class);
_threadsNum = config.getInt(ProduceServiceConfig.PRODUCE_THREAD_NUM_CONFIG);
_topic = config.getString(ProduceServiceConfig.TOPIC_CONFIG);
_producerId = config.getString(ProduceServiceConfig.PRODUCER_ID_CONFIG);
_produceDelayMs = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_DELAY_MS_CONFIG);
_recordSize = config.getInt(ProduceServiceConfig.PRODUCE_RECORD_SIZE_BYTE_CONFIG);
_sync = config.getBoolean(ProduceServiceConfig.PRODUCE_SYNC_CONFIG);
boolean treatZeroThroughputAsUnavailable =
config.getBoolean(ProduceServiceConfig.PRODUCER_TREAT_ZERO_THROUGHPUT_AS_UNAVAILABLE_CONFIG);
_partitionNum = new AtomicInteger(0);
_running = new AtomicBoolean(false);
_nextIndexPerPartition = new ConcurrentHashMap<>();
_producerPropsOverride = props.containsKey(ProduceServiceConfig.PRODUCER_PROPS_CONFIG)
? (Map) props.get(ProduceServiceConfig.PRODUCER_PROPS_CONFIG) : new HashMap<>();
for (String property: NON_OVERRIDABLE_PROPERTIES) {
if (_producerPropsOverride.containsKey(property)) {
throw new ConfigException("Override must not contain " + property + " config.");
}
}
_adminClient = AdminClient.create(props);
if (producerClass.equals(NewProducer.class.getCanonicalName()) || producerClass.equals(NewProducer.class.getSimpleName())) {
_producerClassName = NewProducer.class.getCanonicalName();
} else {
_producerClassName = producerClass;
}
initializeProducer(props);
_produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory());
_handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory());
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
Map<String, String> tags = new HashMap<>();
tags.put("name", _name);
_sensors =
new ProduceMetrics(metrics, tags, latencyPercentileGranularityMs, latencyPercentileMaxMs, _partitionNum,
treatZeroThroughputAsUnavailable);
}
public JenkinsSourceTask() {
this.time = new SystemTime();
}