下面列出了怎么用org.apache.kafka.common.utils.Time的API类实例代码及写法,或者点击链接到github查看源代码。
static Worker createWorker(String workerId,
Time time,
Plugins plugins,
WorkerConfig config,
OffsetBackingStore offsetBackingStore,
Object connectorClientConfigOverridePolicy) throws ConnectException {
if (CTR_WORKER_22 == null) {
return new Worker(workerId, time, plugins, config, offsetBackingStore, (ConnectorClientConfigOverridePolicy)connectorClientConfigOverridePolicy);
}
try {
return (Worker)CTR_WORKER_22.newInstance(workerId, time, plugins, config, offsetBackingStore);
} catch (Throwable t) {
throw new ConnectException(t);
}
}
public KafkaStore(
String bootstrapServers,
Serializer<K, V> serializer,
String topic,
Collection<StoreUpdateObserver<K, V>> observers,
Time time,
Map<String, Object> additionalProducerProps,
Map<String, Object> additionalConsumerProps) {
this.serializer = serializer;
this.topic = topic;
this.observers = new ArrayList<>(observers);
localStore = new HashMap<>();
kafkaLog = createKafkaLog(bootstrapServers, topic, time, additionalProducerProps, additionalConsumerProps);
kafkaLog.start();
}
@SuppressWarnings({"rawtypes", "unchecked"})
private MiniKafkaCluster(List<String> brokerIds)
throws IOException, InterruptedException {
this.zkServer = new EmbeddedZooKeeper();
this.tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "mini-kafka-cluster");
this.kafkaServer = new ArrayList<>();
int port = 0;
for (String id : brokerIds) {
port = getAvailablePort();
KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port));
Seq seq =
scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq));
}
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + port);
adminClient = AdminClient.create(props);
}
void maybeElectLeader() throws Exception {
if (!_preferredLeaderElectionRequested) {
return;
}
try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(), com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS,
com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener", null)) {
if (!zkClient.reassignPartitionsInProgress()) {
List<TopicPartitionInfo> partitionInfoList = _adminClient
.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
LOGGER.info(
"MultiClusterTopicManagementService will trigger requested preferred leader election for the"
+ " topic {} in cluster.", _topic);
triggerPreferredLeaderElection(partitionInfoList, _topic);
_preferredLeaderElectionRequested = false;
}
}
}
GroupMetadataManager(OffsetConfig offsetConfig,
ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder,
ReaderBuilder<ByteBuffer> metadataTopicConsumerBuilder,
ScheduledExecutorService scheduler,
Time time,
Function<String, Integer> partitioner) {
this.offsetConfig = offsetConfig;
this.compressionType = offsetConfig.offsetsTopicCompressionType();
this.groupMetadataCache = new ConcurrentHashMap<>();
this.groupMetadataTopicPartitionCount = offsetConfig.offsetsTopicNumPartitions();
this.metadataTopicProducerBuilder = metadataTopicProducerBuilder;
this.metadataTopicReaderBuilder = metadataTopicConsumerBuilder;
this.scheduler = scheduler;
this.time = time;
this.partitioner = partitioner;
}
@Test
public void testRequestExpiry() throws Exception {
long expiration = 20L;
long start = Time.SYSTEM.hiResClockMs();
MockDelayedOperation r1 = new MockDelayedOperation(expiration);
MockDelayedOperation r2 = new MockDelayedOperation(200000L);
assertFalse(
"r1 not satisfied and hence watched",
purgatory.tryCompleteElseWatch(r1, Lists.newArrayList("test1")));
assertFalse(
"r2 not satisfied and hence watched",
purgatory.tryCompleteElseWatch(r2, Lists.newArrayList("test2")));
r1.awaitExpiration();
long elapsed = Time.SYSTEM.hiResClockMs() - start;
assertTrue
("r1 completed due to expiration",
r1.isCompleted());
assertFalse("r2 hasn't completed", r2.isCompleted());
assertTrue(
"Time for expiration $elapsed should at least " + expiration,
elapsed >= expiration);
}
private ByteBuffer newMemoryRecordsBuffer(List<SimpleRecord> records,
long producerId,
short producerEpoch,
boolean isTxnOffsetCommit) {
TimestampType timestampType = TimestampType.CREATE_TIME;
long timestamp = Time.SYSTEM.milliseconds();
ByteBuffer buffer = ByteBuffer.allocate(
AbstractRecords.estimateSizeInBytes(
RecordBatch.CURRENT_MAGIC_VALUE, offsetConfig.offsetsTopicCompressionType(), records
)
);
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer, RecordBatch.CURRENT_MAGIC_VALUE, offsetConfig.offsetsTopicCompressionType(),
timestampType, 0L, timestamp,
producerId,
producerEpoch,
0,
isTxnOffsetCommit,
RecordBatch.NO_PARTITION_LEADER_EPOCH
);
records.forEach(builder::append);
return builder.build().buffer();
}
private int completeTransactionalOffsetCommit(ByteBuffer buffer,
long producerId,
short producerEpoch,
long baseOffset,
boolean isCommit) {
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
TimestampType.LOG_APPEND_TIME, baseOffset, Time.SYSTEM.milliseconds(),
producerId, producerEpoch, 0, true, true,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
ControlRecordType controlRecordType;
if (isCommit) {
controlRecordType = ControlRecordType.COMMIT;
} else {
controlRecordType = ControlRecordType.ABORT;
}
builder.appendEndTxnMarker(Time.SYSTEM.milliseconds(), new EndTransactionMarker(controlRecordType, 0));
builder.build();
return 1;
}
/**
* The constructor of The Execution task manager.
*
* @param adminClient The adminClient use to query logdir information of replicas.
* @param dropwizardMetricRegistry The metric registry.
* @param time The time object to get the time.
* @param config config object that holds all Kafka Cruise control related configs
*/
public ExecutionTaskManager(AdminClient adminClient,
MetricRegistry dropwizardMetricRegistry,
Time time,
KafkaCruiseControlConfig config) {
_inProgressInterBrokerReplicaMovementsByBrokerId = new HashMap<>();
_inProgressIntraBrokerReplicaMovementsByBrokerId = new HashMap<>();
_inProgressPartitionsForInterBrokerMovement = new HashSet<>();
_executionTaskTracker = new ExecutionTaskTracker(dropwizardMetricRegistry, time);
_executionTaskPlanner = new ExecutionTaskPlanner(adminClient, config);
_defaultInterBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
_defaultIntraBrokerPartitionMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG);
_defaultLeadershipMovementConcurrency = config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG);
_maxNumClusterMovementConcurrency = config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG);
_brokersToSkipConcurrencyCheck = new HashSet<>();
_isKafkaAssignerMode = false;
_requestedInterBrokerPartitionMovementConcurrency = null;
_requestedIntraBrokerPartitionMovementConcurrency = null;
_requestedLeadershipMovementConcurrency = null;
}
@Override
public NetworkClient createNetworkClient(long connectionMaxIdleMS,
Metrics metrics,
Time time,
String metricGrpPrefix,
ChannelBuilder channelBuilder,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
boolean discoverBrokerVersions,
ApiVersions apiVersions) {
return new NetworkClient(new Selector(connectionMaxIdleMS, metrics, time, metricGrpPrefix, channelBuilder, new LogContext()),
metadata, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, discoverBrokerVersions, apiVersions, new LogContext());
}
private List<String> getTopicList(List<String> ignoreTopicList) throws Exception {
List<String> ret = new ArrayList<String>();
int sessionTimeout = 5000;
int connectionTimeout = 10000;
ZooKeeperClient zookeeperClient = new ZooKeeperClient(zookeeperConnect, sessionTimeout, connectionTimeout,
1, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty());
try (KafkaZkClient kafkaZkClient = new KafkaZkClient(zookeeperClient, true, Time.SYSTEM)) {
Iterator<String> iter = kafkaZkClient.getAllTopicsInCluster().iterator();
while (iter.hasNext()) {
String topic = iter.next();
if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) {
ret.add(topic);
}
}
}
return ret;
}
@Test
public void testLoadFailedBrokersFromZK() throws Exception {
Time mockTime = getMockTime();
Queue<Anomaly> anomalies = new PriorityBlockingQueue<>(ANOMALY_DETECTOR_INITIAL_QUEUE_SIZE, anomalyComparator());
BrokerFailureDetector detector = createBrokerFailureDetector(anomalies, mockTime);
try {
detector.startDetection();
int brokerId = 0;
killBroker(brokerId);
long start = System.currentTimeMillis();
while (anomalies.isEmpty() && System.currentTimeMillis() < start + 30000) {
// Wait for the anomalies to be drained.
}
assertEquals(Collections.singletonMap(brokerId, 100L), detector.failedBrokers());
// shutdown, advance the clock and create a new detector.
detector.shutdown();
mockTime.sleep(100L);
detector = createBrokerFailureDetector(anomalies, mockTime);
// start the newly created detector and the broker down time should remain previous time.
detector.startDetection();
assertEquals(Collections.singletonMap(brokerId, 100L), detector.failedBrokers());
} finally {
detector.shutdown();
}
}
@Test
void shouldKeepMetersWhenMetricsDoNotChange() {
//Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
kafkaMetrics = new KafkaMetrics(supplier);
MeterRegistry registry = new SimpleMeterRegistry();
kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
kafkaMetrics.checkAndBindMetrics(registry);
assertThat(registry.getMeters()).hasSize(1);
}
public ConnectEmbedded(Properties workerConfig, Properties... connectorConfigs) throws Exception {
Time time = new SystemTime();
DistributedConfig config = new DistributedConfig(Utils.propsToStringMap(workerConfig));
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
//not sure if this is going to work but because we don't have advertised url we can get at least a fairly random
String workerId = UUID.randomUUID().toString();
worker = new Worker(workerId, time, config, offsetBackingStore);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
configBackingStore.configure(config);
//advertisedUrl = "" as we don't have the rest server - hopefully this will not break anything
herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore, "");
this.connectorConfigs = connectorConfigs;
shutdownHook = new ShutdownHook();
}
private Keeper buildKeeper(WorkerConfig config, Worker worker, Time time, String restUrl, String bootMode) {
Keeper keeper;
if (BootMode.DISTRIBUTED.equals(bootMode)) {
keeper = new WorkerKeeper(
config,
time,
worker,
new TaskStatusManager(DataLinkFactory.getObject(TaskStatusService.class)),
new TaskConfigManager(config.getString(WorkerConfig.GROUP_ID_CONFIG), DataLinkFactory.getObject(TaskConfigService.class)),
restUrl
);
} else if (BootMode.STANDALONE.equals(bootMode)) {
keeper = new StandaloneWorkerKeeper(
config,
worker,
time,
new TaskConfigManager(config.getString(WorkerConfig.GROUP_ID_CONFIG), DataLinkFactory.getObject(TaskConfigService.class))
);
} else {
throw new DatalinkException("invalid boot mode : " + bootMode);
}
return keeper;
}
/**
* Creates a new network client with the given properties.
*
* @return A new network client with the given properties.
*/
NetworkClient createNetworkClient(long connectionMaxIdleMS,
Metrics metrics,
Time time,
String metricGrpPrefix,
ChannelBuilder channelBuilder,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
boolean discoverBrokerVersions,
ApiVersions apiVersions);
private KafkaZkClient createZkClient() {
// The calling method should make sure to close connection
return new KafkaZkClient(
new ZooKeeperClient(
kafkaZookeeper.getZookeeperConnectionString(),
zookeeperSettings.getZkSessionTimeoutMs(),
zookeeperSettings.getZkConnectionTimeoutMs(),
zookeeperSettings.getMaxInFlightRequests(),
Time.SYSTEM,
ZookeeperSettings.METRIC_GROUP,
ZookeeperSettings.METRIC_TYPE
),
false,
Time.SYSTEM
);
}
/**
* Package private constructor for unit tests w/o static state initialization.
*/
KafkaCruiseControl(KafkaCruiseControlConfig config,
Time time,
AnomalyDetector anomalyDetector,
Executor executor,
LoadMonitor loadMonitor,
ExecutorService goalOptimizerExecutor,
GoalOptimizer goalOptimizer) {
_config = config;
_time = time;
_anomalyDetector = anomalyDetector;
_executor = executor;
_loadMonitor = loadMonitor;
_goalOptimizerExecutor = goalOptimizerExecutor;
_goalOptimizer = goalOptimizer;
}
/**
* here does not seem to be a public interface for embedding a Kafka connect runtime,
* therefore, this code is modeled from the behavior taken from
* https://github.com/apache/kafka/blob/2.1/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
* and performs the initialization in a roughly similar manner.
*
*/
private void init() {
LOG.info("Started worked initialization");
Time time = Time.SYSTEM;
// Initializes the system runtime information and logs some of the information
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
Properties props = kafkaConnectPropertyFactory.getProperties();
Map<String, String> standAloneProperties = Utils.propsToStringMap(props);
// Not needed, but we need this one to initialize the worker
Plugins plugins = new Plugins(standAloneProperties);
StandaloneConfig config = new StandaloneConfig(standAloneProperties);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
AllConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
RestServer rest = new RestServer(config);
rest.initializeServer();
/*
According to the Kafka source code "... Worker runs a (dynamic) set of tasks
in a set of threads, doing the work of actually moving data to/from Kafka ..."
*/
Worker worker = new Worker(bootstrapServer, time, plugins, config, new FileOffsetBackingStore(), allConnectorClientConfigOverridePolicy);
/*
From Kafka source code: " ... The herder interface tracks and manages workers
and connectors ..."
*/
herder = new StandaloneHerder(worker, kafkaClusterId, allConnectorClientConfigOverridePolicy);
connect = new Connect(herder, rest);
LOG.info("Finished initializing the worker");
}
UserTaskManager(long sessionExpiryMs,
int maxActiveUserTasks,
long completedUserTaskRetentionTimeMs,
int maxCachedCompletedUserTasks,
Time time) {
this(sessionExpiryMs, maxActiveUserTasks, completedUserTaskRetentionTimeMs, maxCachedCompletedUserTasks, time, new UUIDGenerator());
}
@Before
public void setup() {
this.timer = SystemTimer.builder()
.executorName("test")
.tickMs(1)
.wheelSize(3)
.startMs(Time.SYSTEM.hiResClockMs())
.build();
}
BootstrapTask(long startMs,
long endMs,
boolean clearMetrics,
MetadataClient metadataClient,
KafkaPartitionMetricSampleAggregator metricSampleAggregator,
LoadMonitorTaskRunner loadMonitorTaskRunner,
MetricFetcherManager metricFetcherManager,
SampleStore sampleStore,
int configuredNumSnapshots,
long configuredSnapshotWindowMs,
long samplingIntervalMs,
Time time) {
if (startMs < 0 || endMs < 0 || endMs <= startMs) {
throw new IllegalArgumentException(String.format("Invalid bootstrap time range [%d, %d]. The bootstrap end "
+ "time must be non negative and the end time "
+ "must be greater than start time.", startMs, endMs));
}
_mode = BootstrapMode.RANGE;
_startMs = startMs;
_endMs = endMs;
_clearMetrics = clearMetrics;
_metadataClient = metadataClient;
_metricSampleAggregator = metricSampleAggregator;
_loadMonitorTaskRunner = loadMonitorTaskRunner;
_metricFetcherManager = metricFetcherManager;
_sampleStore = sampleStore;
_configuredNumSnapshots = configuredNumSnapshots;
_configuredSnapshotWindowMs = configuredSnapshotWindowMs;
_samplingIntervalMs = samplingIntervalMs;
_time = time;
_bootstrappedRangeStartMs = startMs;
_bootstrappedRangeEndMs = startMs;
}
SourceRecordDequeImpl(Time time,
int maximumCapacity,
int batchSize,
int emptyWaitMs, int maximumCapacityWaitMs, int maximumCapacityTimeoutMs, RateLimiter writeRateLimit) {
super();
this.time = time;
this.batchSize = batchSize;
this.maximumCapacity = maximumCapacity;
this.emptyWaitMs = emptyWaitMs;
this.maximumCapacityWaitMs = maximumCapacityWaitMs;
this.maximumCapacityTimeoutMs = maximumCapacityTimeoutMs;
this.writeRateLimit = writeRateLimit;
}
public static void createTopic(String kafkaTopic, int numOfPartitions, String zkStr, String replicatorFactor) {
// TopicCommand.main() will call System.exit() finally, which will break maven-surefire-plugin
try {
String[] args = new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", replicatorFactor,
"--partitions", String.valueOf(numOfPartitions), "--topic", kafkaTopic};
KafkaZkClient zkClient = KafkaZkClient
.apply(zkStr, false, 3000, 3000, Integer.MAX_VALUE, Time.SYSTEM, "kafka.server",
"SessionExpireListener");
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);
TopicCommand.createTopic(zkClient, opts);
} catch (TopicExistsException e) {
// Catch TopicExistsException otherwise it will break maven-surefire-plugin
System.out.println("Topic already existed");
}
}
void start() {
final Map<String, String> workerProps = new HashMap<>();
workerProps.put("bootstrap.servers", bootstrapServers);
workerProps.put("offset.flush.interval.ms", Integer.toString(offsetFlushInterval));
// These don't matter much (each connector sets its own converters), but need to be filled with valid classes.
workerProps.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter.schemas.enable", "false");
// Don't need it since we'll memory MemoryOffsetBackingStore.
workerProps.put("offset.storage.file.filename", "");
workerProps.put("plugin.path", pluginDir.getPath());
final Time time = Time.SYSTEM;
final String workerId = "test-worker";
final Plugins plugins = new Plugins(workerProps);
final StandaloneConfig config = new StandaloneConfig(workerProps);
final Worker worker = new Worker(
workerId, time, plugins, config, new MemoryOffsetBackingStore());
herder = new StandaloneHerder(worker);
final RestServer rest = new RestServer(config);
connect = new Connect(herder, rest);
connect.start();
}
@SuppressWarnings("unchecked")
ConnectStandalone(final Properties workerProperties) {
Time time = Time.SYSTEM;
LOGGER.info("Kafka Connect standalone worker initializing ...");
long initStart = time.hiResClockMs();
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();
Map<String, String> workerProps = (Map) workerProperties;
LOGGER.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
LOGGER.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
this.herder = new StandaloneHerder(worker, kafkaClusterId);
connectionString = advertisedUrl.toString() + herder.kafkaClusterId();
this.connect = new Connect(herder, rest);
LOGGER.info(
"Kafka Connect standalone worker initialization took {}ms",
time.hiResClockMs() - initStart);
}
BootstrapTask(boolean clearMetrics,
MetadataClient metadataClient,
KafkaPartitionMetricSampleAggregator metricSampleAggregator,
LoadMonitorTaskRunner loadMonitorTaskRunner,
MetricFetcherManager metricFetcherManager,
SampleStore sampleStore,
int configuredNumSnapshots,
long configuredSnapshotWindowMs,
long samplingIntervalMs,
Time time) {
_mode = BootstrapMode.RECENT;
_startMs = -1L;
_endMs = -1L;
_clearMetrics = clearMetrics;
_metadataClient = metadataClient;
_metricSampleAggregator = metricSampleAggregator;
_loadMonitorTaskRunner = loadMonitorTaskRunner;
_metricFetcherManager = metricFetcherManager;
_sampleStore = sampleStore;
_configuredNumSnapshots = configuredNumSnapshots;
_configuredSnapshotWindowMs = configuredSnapshotWindowMs;
_samplingIntervalMs = samplingIntervalMs;
_time = time;
long now = _time.milliseconds();
_bootstrappedRangeStartMs = now;
_bootstrappedRangeEndMs = now;
}
public Worker(String workerId, Time time, WorkerConfig config, TaskPositionManager taskPositionManager, TaskSyncStatusManager taskSyncStatusManager,
ProbeManager probeManager) {
this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("Task-Container"));
this.workerId = workerId;
this.time = time;
this.config = config;
this.taskPositionManager = taskPositionManager;
this.taskSyncStatusManager = taskSyncStatusManager;
this.probeManager = probeManager;
}
public StandaloneWorkerKeeper(WorkerConfig config,
Worker worker,
Time time,
TaskConfigManager taskConfigManager
) {
this.workerConfig = config;
this.workerId = worker.workerId();
this.worker = worker;
this.time = time;
this.taskConfigManager = taskConfigManager;
this.stopping = new AtomicBoolean(false);
this.configState = ClusterConfigState.EMPTY;
this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
taskConfigManager.setUpdateListener(new ConfigUpdateListener());
}
@Test
public void oldEnough() throws IOException {
long timestamp = 1559653835123L;
this.inputFile.setLastModified(timestamp);
timestamp += 5000L;
Time time = time(timestamp);
InputFileDequeue.MinimumFileAgePredicate predicate = new InputFileDequeue.MinimumFileAgePredicate(
1000,
time
);
assertTrue(predicate.test(this.inputFile), "File should be old enough");
}