The examples below show how to use the org.apache.kafka.common.utils.LogContext API class, or you can follow the links to view the source code on GitHub.
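As a quick orientation before the longer examples: a LogContext holds a fixed prefix, and its logger(...) method returns an SLF4J Logger that prepends that prefix to every message, which is how the Kafka clients tag all log lines belonging to one client instance. A minimal sketch of that pattern (the class name MyClient and the prefix string are placeholders, not taken from any example below):

import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class MyClient {
    // Every message logged through this context is prefixed with "[MyClient clientId=c1] ".
    private static final LogContext LOG_CONTEXT = new LogContext("[MyClient clientId=c1] ");
    private static final Logger LOG = LOG_CONTEXT.logger(MyClient.class);

    public void start() {
        LOG.info("starting up"); // logs: [MyClient clientId=c1] starting up
    }
}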
@Override
public NetworkClient createNetworkClient(long connectionMaxIdleMS,
                                         Metrics metrics,
                                         Time time,
                                         String metricGrpPrefix,
                                         ChannelBuilder channelBuilder,
                                         Metadata metadata,
                                         String clientId,
                                         int maxInFlightRequestsPerConnection,
                                         long reconnectBackoffMs,
                                         long reconnectBackoffMax,
                                         int socketSendBuffer,
                                         int socketReceiveBuffer,
                                         int defaultRequestTimeoutMs,
                                         boolean discoverBrokerVersions,
                                         ApiVersions apiVersions) {
    return new NetworkClient(new Selector(connectionMaxIdleMS, metrics, time, metricGrpPrefix, channelBuilder, new LogContext()),
                             metadata, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
                             reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs,
                             ClientDnsLookup.DEFAULT, time, discoverBrokerVersions, apiVersions, new LogContext());
}
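Note that this factory creates two independent LogContext instances, one for the Selector and one for the NetworkClient. That is harmless here because the no-arg constructor carries no prefix, but the Kafka clients themselves normally create a single context up front and share it. A sketch of that variant, using the same arguments as the method above:

    LogContext logContext = new LogContext();
    Selector selector = new Selector(connectionMaxIdleMS, metrics, time, metricGrpPrefix, channelBuilder, logContext);
    return new NetworkClient(selector, metadata, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
                             reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs,
                             ClientDnsLookup.DEFAULT, time, discoverBrokerVersions, apiVersions, logContext);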
/**
 * Construct a load monitor.
 *
 * @param config The load monitor configuration.
 * @param time The time object.
 * @param executor The proposal executor.
 * @param dropwizardMetricRegistry The sensor registry for Cruise Control.
 * @param metricDef The metric definitions.
 */
public LoadMonitor(KafkaCruiseControlConfig config,
                   Time time,
                   Executor executor,
                   MetricRegistry dropwizardMetricRegistry,
                   MetricDef metricDef) {
    this(config,
         new MetadataClient(config,
                            new Metadata(METADATA_REFRESH_BACKOFF,
                                         config.getLong(MonitorConfig.METADATA_MAX_AGE_CONFIG),
                                         new LogContext(),
                                         new ClusterResourceListeners()),
                            METADATA_TTL,
                            time),
         KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config)),
         time,
         executor,
         dropwizardMetricRegistry,
         metricDef);
}
public KafkaNodeClient(int id, String host, int port) {
    node = new Node(id, host, port);
    // Build a minimal security/SASL configuration for the channel builder.
    LogContext logContext = new LogContext("ctx");
    ConfigDef defConf = new ConfigDef();
    defConf.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING,
        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM,
        CommonClientConfigs.SECURITY_PROTOCOL_DOC);
    defConf.define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM,
        ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC);
    metrics = new Metrics(Time.SYSTEM);
    AbstractConfig config = new AbstractConfig(defConf, new Properties());
    channelBuilder = ClientUtils.createChannelBuilder(config);
    selector = new Selector(1000L, metrics, Time.SYSTEM, "cc", channelBuilder, logContext);
    client = new NetworkClient(selector, new Metadata(0, Long.MAX_VALUE, false),
        CLIENT_ID, 10, 1000L, 1000L, 1, 1024, 1000, Time.SYSTEM, true, new ApiVersions(),
        null, logContext); // null: no throttle-time metrics sensor
}
/**
 * Initialize the coordination manager.
 */
public KarelDbCoordinator(
    LogContext logContext,
    ConsumerNetworkClient client,
    String groupId,
    int rebalanceTimeoutMs,
    int sessionTimeoutMs,
    int heartbeatIntervalMs,
    Metrics metrics,
    String metricGrpPrefix,
    Time time,
    long retryBackoffMs,
    KarelDbIdentity identity,
    KarelDbRebalanceListener listener) {
    super(
        new GroupRebalanceConfig(
            sessionTimeoutMs,
            rebalanceTimeoutMs,
            heartbeatIntervalMs,
            groupId,
            Optional.empty(), // no group.instance.id, i.e. no static membership
            retryBackoffMs,
            true              // leave the group on close
        ),
        logContext,
        client,
        metrics,
        metricGrpPrefix,
        time
    );
    this.identity = identity;
    this.assignmentSnapshot = null;
    this.listener = listener;
}
@Test
public void testSamplingError() {
    KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
    Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
                                     METADATA_EXPIRY_MS,
                                     new LogContext(),
                                     new ClusterResourceListeners());
    MetadataClient metadataClient = new MetadataClient(config, metadata, -1L, TIME);
    MockPartitionMetricSampleAggregator mockMetricSampleAggregator =
        new MockPartitionMetricSampleAggregator(config, metadata);
    KafkaBrokerMetricSampleAggregator mockBrokerMetricSampleAggregator =
        EasyMock.mock(KafkaBrokerMetricSampleAggregator.class);
    MetricRegistry dropwizardMetricRegistry = new MetricRegistry();
    MetricSampler sampler = new MockSampler(0);
    MetricFetcherManager fetcherManager =
        new MetricFetcherManager(config, mockMetricSampleAggregator, mockBrokerMetricSampleAggregator, metadataClient,
                                 METRIC_DEF, TIME, dropwizardMetricRegistry, null, sampler);
    LoadMonitorTaskRunner loadMonitorTaskRunner =
        new LoadMonitorTaskRunner(config, fetcherManager, mockMetricSampleAggregator, mockBrokerMetricSampleAggregator,
                                  metadataClient, null, TIME);
    while (metadata.fetch().topics().size() < 100) {
        metadataClient.refreshMetadata();
    }
    loadMonitorTaskRunner.start(true);
    int numSamples = 0;
    long startMs = System.currentTimeMillis();
    BlockingQueue<PartitionMetricSample> sampleQueue = mockMetricSampleAggregator.metricSampleQueue();
    // Poll for up to 10 seconds; the bound of 10x the expected count keeps the loop finite.
    while (numSamples < (NUM_PARTITIONS * NUM_TOPICS) * 10 && System.currentTimeMillis() < startMs + 10000) {
        PartitionMetricSample sample = sampleQueue.poll();
        if (sample != null) {
            numSamples++;
        }
    }
    int expectedNumSamples = NUM_TOPICS * NUM_PARTITIONS;
    assertEquals("Saw " + numSamples + " samples but expected " + expectedNumSamples + " samples",
                 expectedNumSamples, numSamples);
    fetcherManager.shutdown();
}
public KarelDbLeaderElector(KarelDbConfig config, KarelDbEngine engine) throws KarelDbElectionException {
    try {
        this.engine = engine;
        this.clientId = "kdb-" + KDB_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.myIdentity = findIdentity(
            config.getList(KarelDbConfig.LISTENERS_CONFIG),
            config.getBoolean(KarelDbConfig.LEADER_ELIGIBILITY_CONFIG));
        Map<String, String> metricsTags = new LinkedHashMap<>();
        metricsTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().tags(metricsTags);
        List<MetricsReporter> reporters = Collections.singletonList(new JmxReporter(JMX_PREFIX));
        Time time = Time.SYSTEM;
        ClientConfig clientConfig = new ClientConfig(config.originalsWithPrefix("kafkacache."), false);
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.retryBackoffMs = clientConfig.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
        String groupId = config.getString(KarelDbConfig.CLUSTER_GROUP_ID_CONFIG);
        LogContext logContext = new LogContext("[KarelDB clientId=" + clientId + ", groupId="
            + groupId + "] ");
        this.metadata = new Metadata(
            retryBackoffMs,
            clientConfig.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
            logContext,
            new ClusterResourceListeners()
        );
        List<String> bootstrapServers
            = config.getList(KarelDbConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(bootstrapServers,
            clientConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
        this.metadata.bootstrap(addresses);
        String metricGrpPrefix = "kareldb";
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(clientConfig, time);
        long maxIdleMs = clientConfig.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG);
        NetworkClient netClient = new NetworkClient(
            new Selector(maxIdleMs, metrics, time, metricGrpPrefix, channelBuilder, logContext),
            this.metadata,
            clientId,
            100, // a fixed, sufficiently large value will suffice for max in-flight requests
            clientConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
            clientConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            clientConfig.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
            clientConfig.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
            clientConfig.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
            ClientDnsLookup.forConfig(clientConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
            time,
            true,
            new ApiVersions(),
            logContext);
        this.client = new ConsumerNetworkClient(
            logContext,
            netClient,
            metadata,
            time,
            retryBackoffMs,
            clientConfig.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
            Integer.MAX_VALUE
        );
        this.coordinator = new KarelDbCoordinator(
            logContext,
            this.client,
            groupId,
            300000, // default MAX_POLL_INTERVAL_MS_CONFIG
            10000,  // default SESSION_TIMEOUT_MS_CONFIG
            3000,   // default HEARTBEAT_INTERVAL_MS_CONFIG
            metrics,
            metricGrpPrefix,
            time,
            retryBackoffMs,
            myIdentity,
            this
        );
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        initTimeout = config.getInt(KarelDbConfig.KAFKACACHE_INIT_TIMEOUT_CONFIG);
        LOG.debug("Group member created");
    } catch (Throwable t) {
        // Call close methods if internal objects are already constructed,
        // to prevent a resource leak; see KAFKA-2121.
        stop(true);
        // Now propagate the exception.
        throw new KarelDbElectionException("Failed to construct kafka consumer", t);
    }
}
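Worth noting in this example: the same prefixed LogContext ("[KarelDB clientId=..., groupId=...] ") is threaded through the Metadata, Selector, NetworkClient, ConsumerNetworkClient, and KarelDbCoordinator, so every log line produced by the whole client stack carries one tag and can be correlated when several group members run in the same JVM.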
@Before
public void setup() {
    this.time = new MockTime();
    this.metadata = new Metadata(0, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners());
    this.client = new MockClient(time, new MockClient.MockMetadataUpdater() {
        @Override
        public List<Node> fetchNodes() {
            return cluster.nodes();
        }

        @Override
        public boolean isUpdateNeeded() {
            return false;
        }

        @Override
        public void update(Time time, MockClient.MetadataUpdate update) {
            throw new UnsupportedOperationException();
        }
    });
    LogContext logContext = new LogContext();
    this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE);
    this.metrics = new Metrics(time);
    this.rebalanceListener = new MockRebalanceListener();
    this.coordinator = new KarelDbCoordinator(
        logContext,
        consumerClient,
        groupId,
        rebalanceTimeoutMs,
        sessionTimeoutMs,
        heartbeatIntervalMs,
        metrics,
        "kdb-" + groupId,
        time,
        retryBackoffMs,
        LEADER_INFO,
        rebalanceListener
    );
}
/**
 * The executor class that executes the proposals generated by the optimizer.
 * Package-private for unit tests.
 *
 * @param config The configurations for Cruise Control.
 */
Executor(KafkaCruiseControlConfig config,
         Time time,
         MetricRegistry dropwizardMetricRegistry,
         MetadataClient metadataClient,
         long demotionHistoryRetentionTimeMs,
         long removalHistoryRetentionTimeMs,
         ExecutorNotifier executorNotifier,
         UserTaskManager userTaskManager,
         AnomalyDetector anomalyDetector) {
    String zkUrl = config.getString(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
    _numExecutionStopped = new AtomicInteger(0);
    _numExecutionStoppedByUser = new AtomicInteger(0);
    _executionStoppedByUser = new AtomicBoolean(false);
    _ongoingExecutionIsBeingModified = new AtomicBoolean(false);
    _numExecutionStartedInKafkaAssignerMode = new AtomicInteger(0);
    _numExecutionStartedInNonKafkaAssignerMode = new AtomicInteger(0);
    _isKafkaAssignerMode = false;
    _config = config;
    // Register gauge sensors.
    registerGaugeSensors(dropwizardMetricRegistry);
    _time = time;
    boolean zkSecurityEnabled = config.getBoolean(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG);
    _kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zkUrl, ZK_EXECUTOR_METRIC_GROUP, ZK_EXECUTOR_METRIC_TYPE,
                                                                 zkSecurityEnabled);
    _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(config));
    _executionTaskManager = new ExecutionTaskManager(_adminClient, dropwizardMetricRegistry, time, config);
    // Fall back to a fresh MetadataClient (with a default LogContext) when none is injected.
    _metadataClient = metadataClient != null ? metadataClient
                                             : new MetadataClient(config,
                                                                  new Metadata(METADATA_REFRESH_BACKOFF,
                                                                               METADATA_EXPIRY_MS,
                                                                               new LogContext(),
                                                                               new ClusterResourceListeners()),
                                                                  -1L,
                                                                  time);
    _defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
    _leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
    _requestedExecutionProgressCheckIntervalMs = null;
    _proposalExecutor =
        Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG));
    _latestDemoteStartTimeMsByBrokerId = new ConcurrentHashMap<>();
    _latestRemoveStartTimeMsByBrokerId = new ConcurrentHashMap<>();
    _executorState = ExecutorState.noTaskInProgress(recentlyDemotedBrokers(), recentlyRemovedBrokers());
    _stopSignal = new AtomicInteger(NO_STOP_EXECUTION);
    _hasOngoingExecution = false;
    _uuid = null;
    _reasonSupplier = null;
    _executorNotifier = executorNotifier != null ? executorNotifier
                                                 : config.getConfiguredInstance(ExecutorConfig.EXECUTOR_NOTIFIER_CLASS_CONFIG,
                                                                                ExecutorNotifier.class);
    _userTaskManager = userTaskManager;
    _anomalyDetector = anomalyDetector;
    _demotionHistoryRetentionTimeMs = demotionHistoryRetentionTimeMs;
    _removalHistoryRetentionTimeMs = removalHistoryRetentionTimeMs;
    _executionHistoryScannerExecutor = Executors.newSingleThreadScheduledExecutor(
        new KafkaCruiseControlThreadFactory("ExecutionHistoryScanner", true, null));
    _executionHistoryScannerExecutor.scheduleAtFixedRate(new ExecutionHistoryScanner(),
                                                         EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS,
                                                         EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS,
                                                         TimeUnit.SECONDS);
}
@Test
public void testSimpleFetch() throws InterruptedException {
    KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
    Metadata metadata = new Metadata(METADATA_REFRESH_BACKOFF,
                                     METADATA_EXPIRY_MS,
                                     new LogContext(),
                                     new ClusterResourceListeners());
    MetadataClient metadataClient = new MetadataClient(config, metadata, -1L, TIME);
    MockPartitionMetricSampleAggregator mockPartitionMetricSampleAggregator =
        new MockPartitionMetricSampleAggregator(config, metadata);
    KafkaBrokerMetricSampleAggregator mockBrokerMetricSampleAggregator =
        EasyMock.mock(KafkaBrokerMetricSampleAggregator.class);
    MetricRegistry dropwizardMetricRegistry = new MetricRegistry();
    MetricSampler sampler = new MockSampler(0);
    MetricFetcherManager fetcherManager =
        new MetricFetcherManager(config, mockPartitionMetricSampleAggregator, mockBrokerMetricSampleAggregator,
                                 metadataClient, METRIC_DEF, TIME, dropwizardMetricRegistry, null, sampler);
    LoadMonitorTaskRunner loadMonitorTaskRunner =
        new LoadMonitorTaskRunner(config, fetcherManager, mockPartitionMetricSampleAggregator,
                                  mockBrokerMetricSampleAggregator, metadataClient, null, TIME);
    while (metadata.fetch().topics().size() < NUM_TOPICS) {
        Thread.sleep(10);
        metadataClient.refreshMetadata();
    }
    loadMonitorTaskRunner.start(true);
    Set<TopicPartition> partitionsToSample = new HashSet<>(NUM_TOPICS * NUM_PARTITIONS);
    for (int i = 0; i < NUM_TOPICS; i++) {
        for (int j = 0; j < NUM_PARTITIONS; j++) {
            partitionsToSample.add(new TopicPartition("topic-" + i, j));
        }
    }
    long startMs = System.currentTimeMillis();
    BlockingQueue<PartitionMetricSample> sampleQueue = mockPartitionMetricSampleAggregator.metricSampleQueue();
    // Each partition should be sampled exactly once within the 10-second budget.
    while (!partitionsToSample.isEmpty() && System.currentTimeMillis() < startMs + 10000) {
        PartitionMetricSample sample = sampleQueue.poll();
        if (sample != null) {
            assertTrue("The topic partition should have been sampled and sampled only once.",
                       partitionsToSample.contains(sample.entity().tp()));
            partitionsToSample.remove(sample.entity().tp());
        }
    }
    assertTrue("Did not see samples for partitions " + Arrays.toString(partitionsToSample.toArray()),
               partitionsToSample.isEmpty());
    fetcherManager.shutdown();
    assertTrue(sampleQueue.isEmpty());
}