下面列出了 java.util.concurrent.ScheduledExecutorService#submit() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Checks that a caller waiting in {@code Future.get()} sees a
 * {@link CancellationException} once another thread cancels the future.
 */
@Test
public void futureCancelTest() throws InterruptedException, ExecutionException {
  ScheduledExecutorService scheduler = makeScheduler(2);
  BlockingTestRunnable blockingTask = new BlockingTestRunnable();
  try {
    final Future<?> pending = scheduler.submit(blockingTask);

    // Cancel from a separate thread after DELAY_TIME so that get() below is already waiting.
    Runnable delayedCancel = new Runnable() {
      @Override
      public void run() {
        TestUtils.sleep(DELAY_TIME);
        pending.cancel(true);
      }
    };
    Thread canceller = new Thread(delayedCancel);
    canceller.start();

    boolean cancellationSeen = false;
    try {
      pending.get();
    } catch (CancellationException expected) {
      cancellationSeen = true;
    }
    if (!cancellationSeen) {
      fail("exception should have been thrown");
    }
  } finally {
    // Always release the submitted runnable and stop the scheduler.
    blockingTask.unblock();
    scheduler.shutdownNow();
  }
}
/**
 * Schedules the given dataflow graphs and runs one {@link CDFWExecutorTask} per scheduled graph.
 *
 * <p>The method asks a {@link CDFWScheduler} for the schedule map of the CDFW, submits one task
 * per map entry to a thread pool sized to the number of entries, and then waits up to one second
 * for the submitted tasks to complete.
 *
 * @param graph the dataflow graphs to schedule and execute
 * @throws RuntimeException if the driver state is neither {@code JOB_FINISHED} nor
 *     {@code INITIALIZE}
 * @throws Twister2RuntimeException if the calling thread is interrupted while waiting
 */
public void executeCDFW(DataFlowGraph... graph) {
  if (!(driverState == DriverState.JOB_FINISHED || driverState == DriverState.INITIALIZE)) {
    throw new RuntimeException("Invalid state to execute a job: " + driverState);
  }

  CDFWScheduler cdfwScheduler = new CDFWScheduler(this.executionEnv.getWorkerInfoList());
  Map<DataFlowGraph, Set<Integer>> scheduleGraphMap = cdfwScheduler.schedule(graph);

  ScheduledExecutorService executor = Executors.newScheduledThreadPool(scheduleGraphMap.size());
  try {
    for (Map.Entry<DataFlowGraph, Set<Integer>> entry : scheduleGraphMap.entrySet()) {
      CDFWExecutorTask cdfwSchedulerTask = new CDFWExecutorTask(entry.getKey(), entry.getValue());
      executor.submit(cdfwSchedulerTask);
    }
  } finally {
    // shutdown() must precede awaitTermination(): an executor that was never shut down cannot
    // terminate, so the wait would always run for the full timeout. Already submitted tasks
    // still run after shutdown().
    executor.shutdown();
  }

  try {
    executor.awaitTermination(1, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new Twister2RuntimeException(e);
  }
}
/**
 * Verifies that a dynamic proxy for {@link ScheduledExecutorService} routes a call to
 * {@code submit()} (declared in the super-interface {@code ExecutorService}) to its
 * {@link InvocationHandler}.
 */
public void testInvocationOfSuperInterfaceMethod() throws Exception {
  // Single-element array so the anonymous handler can record the observation.
  final boolean[] submitSeen = new boolean[1];

  InvocationHandler recorder = new InvocationHandler() {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
      String invokedName = method.getName();
      if (invokedName.equals("submit")) {
        submitSeen[0] = true;
      }
      return null;
    }
  };

  Class<?>[] proxiedInterfaces = new Class<?>[] { ScheduledExecutorService.class };
  Object proxyInstance = Proxy.newProxyInstance(
      ProxyTest.class.getClassLoader(), proxiedInterfaces, recorder);
  ScheduledExecutorService service = (ScheduledExecutorService) proxyInstance;

  // submit() is defined in ExecutorService, the super-interface of ScheduledExecutorService.
  Runnable noOp = new Runnable() {
    @Override
    public void run() {}
  };
  service.submit(noOp);

  assertTrue("proxied submit method not invoked", submitSeen[0]);
}
/**
 * Registers this operation as an in-progress rebalance and submits it to the resource
 * manager's executor. The resulting future is stored in {@code future} while holding
 * {@code futureLock}.
 */
public void start() {
  final InternalResourceManager resourceManager = this.cache.getResourceManager();
  ScheduledExecutorService executor = resourceManager.getExecutor();

  final Callable<RebalanceResults> rebalanceTask = new Callable<RebalanceResults>() {
    public RebalanceResults call() {
      // These two checks run before the try block, so the finally below does not cover them.
      SystemFailure.checkFailure();
      cache.getCancelCriterion().checkCancelInProgress(null);
      try {
        return RebalanceOperationImpl.this.call();
      } catch (RuntimeException unexpected) {
        cache.getLogger().fine("Unexpected exception in rebalancing", unexpected);
        throw unexpected;
      } finally {
        resourceManager.removeInProgressRebalance(RebalanceOperationImpl.this);
      }
    }
  };

  synchronized (this.futureLock) {
    resourceManager.addInProgressRebalance(this);
    future = executor.submit(rebalanceTask);
  }
}
/**
 * Starts the application's background tasks on the shared executor service.
 */
private void startThread() {
  ScheduledExecutorService pool = ThreadManager.getExecutorService();

  // Start the proxy server.
  ProxyServer proxyServer = new ProxyServer(AppConfig.get().getListenPort(), this.shell);
  pool.submit(proxyServer);

  // Task that updates the screen asynchronously (every second).
  AsyncExecApplicationMain screenUpdater = new AsyncExecApplicationMain(this);
  pool.scheduleAtFixedRate(screenUpdater, 0, 1, TimeUnit.SECONDS);

  // Task that writes log output asynchronously (500 ms after each run finishes).
  AsyncExecConsole consoleWriter = new AsyncExecConsole(this.console);
  pool.scheduleWithFixedDelay(consoleWriter, 0, 500, TimeUnit.MILLISECONDS);

  // Task that plays sounds (500 ms after each run finishes).
  PlayerThread soundPlayer = new PlayerThread();
  pool.scheduleWithFixedDelay(soundPlayer, 0, 500, TimeUnit.MILLISECONDS);

  // Check for updates, when enabled in the configuration.
  if (AppConfig.get().isCheckUpdate()) {
    AsyncExecUpdateCheck updateCheck = new AsyncExecUpdateCheck(this.shell);
    pool.submit(updateCheck);
  }
}
/**
 * Marks this rebalance as in progress and hands it to the resource manager's executor;
 * the returned future is assigned to {@code future} under {@code futureLock}.
 */
public void start() {
  final InternalResourceManager mgr = this.cache.getResourceManager();
  ScheduledExecutorService scheduledExecutor = mgr.getExecutor();
  synchronized (this.futureLock) {
    mgr.addInProgressRebalance(this);
    future = scheduledExecutor.submit(new Callable<RebalanceResults>() {
      public RebalanceResults call() {
        // Failure and cancellation checks happen before any rebalancing work starts.
        SystemFailure.checkFailure();
        cache.getCancelCriterion().checkCancelInProgress(null);

        RebalanceResults results;
        try {
          results = RebalanceOperationImpl.this.call();
        } catch (RuntimeException failure) {
          cache.getLogger().fine("Unexpected exception in rebalancing", failure);
          throw failure;
        } finally {
          // Deregister whether the rebalance succeeded or failed.
          mgr.removeInProgressRebalance(RebalanceOperationImpl.this);
        }
        return results;
      }
    });
  }
}
/**
 * Wraps {@code task} in a {@link SchedulerTask} and runs it on {@code exec}: immediately via
 * {@code submit} when {@code delay} is not positive, otherwise via {@code schedule} after the
 * given delay.
 *
 * @param exec the executor that runs the task
 * @param task the work to run; it is first passed through {@code onSchedule}
 * @param parent optional parent disposable handed to the {@link SchedulerTask}
 * @param delay the delay before execution; values {@code <= 0} mean "run now"
 * @param unit the unit of {@code delay}
 * @return the {@link SchedulerTask}, with the executor's future attached
 */
static Disposable directSchedule(ScheduledExecutorService exec,
    Runnable task,
    @Nullable Disposable parent,
    long delay,
    TimeUnit unit) {
  task = onSchedule(task);
  SchedulerTask scheduled = new SchedulerTask(task, parent);

  final boolean runImmediately = delay <= 0L;
  Future<?> handle;
  if (runImmediately) {
    handle = exec.submit((Callable<?>) scheduled);
  } else {
    handle = exec.schedule((Callable<?>) scheduled, delay, unit);
  }

  scheduled.setFuture(handle);
  return scheduled;
}
/**
 * Starts a processor node backed by {@code ProcessorThatThrowsExceptionOnScheduled} and, after
 * one second, expects each of the processor's scheduled/unscheduled/stopped counters to be 1.
 * The scheduling callback's {@code trigger()} must never be reached.
 */
@Test(timeout = 10000)
public void testStart() throws InterruptedException {
  System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessorNode.class.getResource("/conf/nifi.properties").getFile());

  final ProcessorThatThrowsExceptionOnScheduled failingProcessor = new ProcessorThatThrowsExceptionOnScheduled();
  final String nodeId = UUID.randomUUID().toString();
  failingProcessor.initialize(new StandardProcessorInitializationContext(nodeId, null, null, null, null));

  final StandardProcessorNode node = new StandardProcessorNode(failingProcessor, nodeId, createValidationContextFactory(), null, null,
      NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class));
  final ScheduledExecutorService flowEngine = new FlowEngine(2, "TestClasspathResources", true);
  final StandardProcessContext context = new StandardProcessContext(node, null, null, null, null);

  final SchedulingAgentCallback callback = new SchedulingAgentCallback() {
    @Override
    public void trigger() {
      // Reaching trigger() would mean the start sequence completed, which this test rules out.
      Assert.fail("Should not have completed");
    }

    @Override
    public Future<?> invokeMonitoringTask(final Callable<?> task) {
      return flowEngine.submit(task);
    }

    @Override
    public void postMonitor() {
    }
  };

  node.start(flowEngine, 20000L, context, callback);

  // Give the asynchronous start attempt time to run before inspecting the counters.
  Thread.sleep(1000L);
  assertEquals(1, failingProcessor.onScheduledCount);
  assertEquals(1, failingProcessor.onUnscheduledCount);
  assertEquals(1, failingProcessor.onStoppedCount);
}
/**
 * Expects {@code Future.get()} to throw {@link ExecutionException} for a submitted
 * {@code TestRuntimeFailureRunnable}.
 */
@Test (expected = ExecutionException.class)
public void futureGetExecutionFail() throws InterruptedException, ExecutionException {
  ScheduledExecutorService scheduler = makeScheduler(1);
  try {
    // get() is expected to throw, so the fail() below should be unreachable.
    scheduler.submit(new TestRuntimeFailureRunnable()).get();
    fail("Exception should have been thrown");
  } finally {
    scheduler.shutdownNow();
  }
}
/**
 * Submits a {@code TestCallable} and checks that the future yields the very same object
 * (reference equality) that the callable reports as its returned result.
 */
@Test
public void submitCallableTest() throws InterruptedException, ExecutionException {
  ScheduledExecutorService scheduler = makeScheduler(1);
  try {
    TestCallable callable = new TestCallable(0);
    Future<Object> pending = scheduler.submit(callable);
    Object produced = pending.get();
    assertTrue(produced == callable.getReturnedResult());
  } finally {
    scheduler.shutdownNow();
  }
}
/**
* This test ensures that the flush() on a segment is released only after sealed segment callback is invoked.
* The callback implemented in EventStreamWriter appends this segment to its sealedSegmentQueue.
*/
@Test(timeout = 10000)
public void testFlushIsBlockedUntilCallBackInvoked() throws Exception {
// Segment sealed callback will finish execution only when the latch is released;
ReusableLatch latch = new ReusableLatch(false);
final Consumer<Segment> segmentSealedCallback = segment -> Exceptions.handleInterrupted(() -> latch.await());
// Wire a mocked ClientConnection into the mock connection factory; the InOrder verifier
// checks the order of the sends made on that connection.
UUID cid = UUID.randomUUID();
PravegaNodeUri uri = new PravegaNodeUri("endpoint", SERVICE_PORT);
MockConnectionFactoryImpl cf = new MockConnectionFactoryImpl();
cf.setExecutor(executorService());
MockController controller = new MockController(uri.getEndpoint(), uri.getPort(), cf, true);
ClientConnection connection = mock(ClientConnection.class);
cf.provideConnection(uri, connection);
InOrder order = Mockito.inOrder(connection);
// Create the output stream with the blocking sealed-segment callback, reconnect, verify the
// SetupAppend was sent, and feed the matching AppendSetup reply to the processor.
SegmentOutputStreamImpl output = new SegmentOutputStreamImpl(SEGMENT, true, controller, cf, cid, segmentSealedCallback,
RETRY_SCHEDULE, DelegationTokenProviderFactory.createWithEmptyToken());
output.reconnect();
order.verify(connection).send(new SetupAppend(output.getRequestId(), cid, SEGMENT, ""));
cf.getProcessor(uri).appendSetup(new AppendSetup(output.getRequestId(), SEGMENT, cid, 0));
// Write one event: the Append must have been sent and its ack future must not be done yet.
ByteBuffer data = getBuffer("test");
CompletableFuture<Void> ack = new CompletableFuture<>();
output.write(PendingEvent.withoutHeader(null, data, ack));
order.verify(connection).send(new Append(SEGMENT, cid, 1, 1, Unpooled.wrappedBuffer(data), null, output.getRequestId()));
assertEquals(false, ack.isDone());
// NOTE(review): @Cleanup is presumably Lombok's annotation, calling executor.shutdownNow()
// when the variable goes out of scope - confirm against the project's imports.
@Cleanup("shutdownNow")
ScheduledExecutorService executor = ExecutorServiceHelpers.newScheduledThreadPool(1, "netty-callback");
//simulate a SegmentIsSealed WireCommand from SegmentStore.
executor.submit(() -> cf.getProcessor(uri).segmentIsSealed(new WireCommands.SegmentIsSealed(output.getRequestId(), SEGMENT, "SomeException", 1)));
// NOTE(review): assertBlocks presumably asserts that the first lambda (a flush() that throws
// SegmentSealedException) stays blocked until the second lambda releases the latch - confirm
// against AssertExtensions.
AssertExtensions.assertBlocks(() -> {
AssertExtensions.assertThrows(SegmentSealedException.class, () -> output.flush());
}, () -> latch.release());
// A further flush() after the latch has been released must still throw SegmentSealedException.
AssertExtensions.assertThrows(SegmentSealedException.class, () -> output.flush());
}
/**
 * Queues a PUT payload together with its callback and, when the queue was empty beforehand
 * and no command job is currently running, submits a job that executes the queued commands.
 *
 * @param payloadCallbackPair object which holds the payload and the callback used to process
 *     the PUT request
 * @param scheduler scheduler to be used for sending commands
 */
public void asyncPut(PayloadCallbackPair payloadCallbackPair, ScheduledExecutorService scheduler) {
  synchronized (this.commandsQueue) {
    // Capture emptiness before enqueueing: only the first element of a batch may start a job.
    final boolean queueWasEmpty = this.commandsQueue.isEmpty();
    this.commandsQueue.offer(payloadCallbackPair);
    if (!queueWasEmpty) {
      return;
    }
    if (job == null || job.isDone()) {
      job = scheduler.submit(() -> executeCommands());
    }
  }
}
/**
 * Expects a timed {@code Future.get(1 ms)} on a {@code TestCallable(100)} to throw
 * {@link TimeoutException}.
 */
@Test (expected = TimeoutException.class)
public void futureGetTimeoutFail() throws InterruptedException, ExecutionException, TimeoutException {
  ScheduledExecutorService scheduler = makeScheduler(1);
  try {
    TestCallable slowCallable = new TestCallable(100);
    Future<Object> pending = scheduler.submit(slowCallable);
    // The 1 ms wait is expected to time out, making the fail() below unreachable.
    pending.get(1, TimeUnit.MILLISECONDS);
    fail("Exception should have been thrown");
  } finally {
    scheduler.shutdownNow();
  }
}
/**
 * Checks that {@code submit(Runnable, result)} returns a future whose value is the exact
 * result object that was passed in (reference equality).
 */
@Test
public void submitWithResultTest() throws InterruptedException, ExecutionException {
  ScheduledExecutorService scheduler = makeScheduler(1);
  try {
    Object marker = new Object();
    Future<Object> pending = scheduler.submit(DoNothingRunnable.instance(), marker);
    Object produced = pending.get();
    assertTrue(produced == marker);
  } finally {
    scheduler.shutdownNow();
  }
}
/**
 * Wraps {@code task} in a {@link WorkerTask} registered with {@code tasks} and runs it on
 * {@code exec}: via {@code submit} when {@code delay} is not positive, otherwise via
 * {@code schedule} after the given delay.
 *
 * @param exec the executor that runs the task
 * @param tasks the composite the new {@link WorkerTask} is added to
 * @param task the work to run; it is first passed through {@code onSchedule}
 * @param delay the delay before execution; values {@code <= 0} mean "run now"
 * @param unit the unit of {@code delay}
 * @return the {@link WorkerTask}, with the executor's future attached
 * @throws RejectedExecutionException when the executor rejects the task (the worker task is
 *     disposed first); a failed {@code tasks.add} throws whatever
 *     {@code Exceptions.failWithRejected()} returns
 */
static Disposable workerSchedule(ScheduledExecutorService exec,
    Disposable.Composite tasks,
    Runnable task,
    long delay,
    TimeUnit unit) {
  task = onSchedule(task);

  WorkerTask workerTask = new WorkerTask(task, tasks);
  boolean registered = tasks.add(workerTask);
  if (!registered) {
    throw Exceptions.failWithRejected();
  }

  try {
    Future<?> handle;
    if (delay > 0L) {
      handle = exec.schedule((Callable<?>) workerTask, delay, unit);
    } else {
      handle = exec.submit((Callable<?>) workerTask);
    }
    workerTask.setFuture(handle);
  } catch (RejectedExecutionException rejected) {
    // Dispose the task before propagating the rejection to the caller.
    workerTask.dispose();
    throw rejected;
  }
  return workerTask;
}
/**
 * Submits a {@code RunnableWhichThrows} and expects the cause of the resulting
 * {@link ExecutionException} to be a {@code SafeScheduledExecutorServiceRethrowsException}.
 *
 * <p>The executor is shut down in a {@code finally} block so its worker thread does not
 * outlive the test.
 */
@Test(expected = SafeScheduledExecutorServiceRethrowsException.class)
public void testSubmitReturnFutureThrowsException() throws Throwable {
  ScheduledExecutorService executorService = new SafeScheduledExecutorService(1, "test");
  try {
    Future<?> future = executorService.submit(new RunnableWhichThrows());
    try {
      future.get();
    } catch (ExecutionException e) {
      // Unwrap so the @Test(expected = ...) check sees the original exception type.
      throw e.getCause();
    }
  } finally {
    executorService.shutdownNow();
  }
}
/**
 * Initializes an {@code SdcIpcWithDiskBufferSource} with TLS disabled, pings it over HTTP from
 * a background thread, then sends two records (boolean {@code true} and {@code false}) and
 * checks that {@code runProduce} emits both on the "lane" output in that order without errors.
 *
 * <p>The background executor is shut down in the {@code finally} block so its non-daemon
 * worker thread does not outlive the test.
 */
@Test
public void testReceiveRecords() throws Exception {
  final String hostname = TLSTestUtils.getHostname();
  File testDir = new File("target", UUID.randomUUID().toString()).getAbsoluteFile();
  File keyStore = new File(testDir, "keystore.jks");
  final File trustStore = new File(testDir, "truststore.jks");

  final Configs configs = new Configs();
  configs.appId = () -> "appId";
  configs.tlsConfigBean.tlsEnabled = false;
  configs.tlsConfigBean.keyStoreFilePath = keyStore.toString();
  configs.tlsConfigBean.keyStorePassword = () -> "keystore";
  configs.port = randomPort;
  configs.maxWaitTimeSecs = 5;
  configs.maxRecordSize = 10000;

  Source source = new SdcIpcWithDiskBufferSource(configs, 900, 100, 1000);
  final SourceRunner runner = new SourceRunner.Builder(SdcIpcWithDiskBufferDSource.class, source).addOutputLane("lane").build();
  final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  try {
    runner.runInit();

    //valid init
    Future<Boolean> future = executor.submit(new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        HttpURLConnection conn = getConnection(Constants.IPC_PATH, configs.appId, runner.getContext(),
            hostname + ":" + configs.port, false,
            trustStore.toString(), "truststore");
        conn.setRequestMethod("GET");
        conn.setDefaultUseCaches(false);
        conn.setDoOutput(false);
        return conn.getResponseCode() == HttpURLConnection.HTTP_OK &&
            Constants.X_SDC_PING_VALUE.equals(conn.getHeaderField(Constants.X_SDC_PING_HEADER));
      }
    });
    Assert.assertTrue(future.get(5, TimeUnit.SECONDS));

    //valid IPC
    future = executor.submit(new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Record r1 = RecordCreator.create();
        r1.set(Field.create(true));
        Record r2 = RecordCreator.create();
        r2.set(Field.create(false));
        List<Record> records = ImmutableList.of(r1, r2);
        return sendRecords(configs.appId, runner.getContext(), hostname + ":" + configs.port, false,
            trustStore.toString(), "truststore", false, records);
      }
    });

    // The records are sent from the background thread while this thread runs the source.
    StageRunner.Output output = runner.runProduce(null, 10);
    Assert.assertNotNull(output.getNewOffset());
    Assert.assertEquals(2, output.getRecords().get("lane").size());
    Assert.assertTrue(runner.getErrorRecords().isEmpty());
    Assert.assertTrue(runner.getErrors().isEmpty());
    Assert.assertTrue(output.getRecords().get("lane").get(0).get().getValueAsBoolean());
    Assert.assertFalse(output.getRecords().get("lane").get(1).get().getValueAsBoolean());
    Assert.assertTrue(future.get(5, TimeUnit.SECONDS));
  } finally {
    // Stop the client thread first, then tear down the source under test.
    executor.shutdownNow();
    runner.runDestroy();
  }
}
/**
 * Exercises a fixed-size connection pool of two: acquisitions fail with "Connection refused"
 * before the echo server starts, two connections can then be acquired, and a further
 * acquisition only completes once an earlier connection is released (at least 500 ms later,
 * as scheduled below).
 *
 * <p>The Netty event loop group created for the test is shut down in the {@code finally}
 * block so its threads do not outlive the test.
 */
@Test
public void fixedPoolTwoAcquire()
    throws ExecutionException, InterruptedException, IOException {
  final ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
  int echoServerPort = SocketUtils.findAvailableTcpPort();
  TcpClientTests.EchoServer echoServer = new TcpClientTests.EchoServer(echoServerPort);
  final EventLoopGroup group = new NioEventLoopGroup(2);

  java.util.concurrent.Future<?> f1 = null;
  java.util.concurrent.Future<?> f2 = null;
  ScheduledFuture<?> sf = null;

  try {
    final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", echoServerPort);
    ConnectionProvider pool = ConnectionProvider.create("fixedPoolTwoAcquire", 2);

    Supplier<? extends SocketAddress> remoteAddress = () -> address;
    ConnectionObserver observer = ConnectionObserver.emptyListener();
    ClientTransportConfig<?> config =
        new ClientTransportConfigImpl(group, pool, Collections.emptyMap(), remoteAddress);

    //fail a couple
    StepVerifier.create(pool.acquire(config, observer, remoteAddress, config.resolver()))
                .verifyErrorMatches(msg -> msg.getMessage().contains("Connection refused"));
    StepVerifier.create(pool.acquire(config, observer, remoteAddress, config.resolver()))
                .verifyErrorMatches(msg -> msg.getMessage().contains("Connection refused"));

    //start the echo server
    f1 = service.submit(echoServer);
    Thread.sleep(100);

    //acquire 2
    final PooledConnection c1 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver())
                                                       .block(Duration.ofSeconds(30));
    assertThat(c1).isNotNull();
    final PooledConnection c2 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver())
                                                       .block(Duration.ofSeconds(30));
    assertThat(c2).isNotNull();

    //make room for 1 more
    c2.disposeNow();

    final PooledConnection c3 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver())
                                                       .block(Duration.ofSeconds(30));
    assertThat(c3).isNotNull();

    //next one will block until a previous one is released
    long start = System.currentTimeMillis();
    sf = service.schedule(() -> c1.onStateChange(c1, ConnectionObserver.State.DISCONNECTING), 500, TimeUnit.MILLISECONDS);

    final PooledConnection c4 = (PooledConnection) pool.acquire(config, observer, remoteAddress, config.resolver())
                                                       .block(Duration.ofSeconds(30));
    assertThat(c4).isNotNull();

    long end = System.currentTimeMillis();

    assertThat(end - start)
        .as("channel4 acquire blocked until channel1 released")
        .isGreaterThanOrEqualTo(500);

    c3.onStateChange(c3, ConnectionObserver.State.DISCONNECTING);

    c4.onStateChange(c4, ConnectionObserver.State.DISCONNECTING);

    assertThat(c1).isEqualTo(c4);

    assertThat(c1.pool).isEqualTo(c2.pool)
                       .isEqualTo(c3.pool)
                       .isEqualTo(c4.pool);

    InstrumentedPool<PooledConnection> defaultPool = c1.pool;

    // Poll on a worker thread until the pool reports no acquired connections.
    CountDownLatch latch = new CountDownLatch(1);
    f2 = service.submit(() -> {
      while (defaultPool.metrics().acquiredSize() > 0) {
        LockSupport.parkNanos(100);
      }
      latch.countDown();
    });

    assertThat(latch.await(5, TimeUnit.SECONDS))
        .as("activeConnections fully released")
        .isTrue();
  }
  finally {
    service.shutdownNow();
    // Release the event loop threads; done before the calls below that may throw.
    group.shutdownGracefully();
    echoServer.close();
    assertThat(f1).isNotNull();
    assertThat(f1.get()).isNull();
    assertThat(f2).isNotNull();
    assertThat(f2.get()).isNull();
    assertNotNull(sf);
    assertThat(sf.get()).isNull();
  }
}
/**
 * Starts twelve named consumer instances on a shared 16-thread scheduled pool. Each instance
 * repeatedly (with a one-second fixed delay) tries to lock the partitions of the order-placed
 * event type and, when it obtains a lock, listens on the event stream with that lock and
 * unlocks the partitions afterwards.
 *
 * <p>After submitting the instances the calling thread sleeps for 60 seconds and then waits
 * on the pool's termination; the pool is never shut down in this method.
 *
 * @param objectMapper mapper passed to the stream builder
 * @param listener listener receiving {@code SalesOrderPlaced} events
 * @throws IOException declared by the method; the visible body reaches I/O through
 *     {@code getPartitions} and the {@code IORunnable} lambdas
 */
private static void multiInstanceListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
  final HikariConfig poolConfig = new HikariConfig();
  poolConfig.setJdbcUrl(JDBC_URL);
  poolConfig.setUsername(JDBC_USERNAME);
  poolConfig.setPassword(JDBC_PASSWORD);
  final DataSource dataSource = new HikariDataSource(poolConfig);

  final ZignAccessTokenProvider tokenProvider = new ZignAccessTokenProvider();
  final AtomicInteger instanceCounter = new AtomicInteger();
  final ScheduledExecutorService workers = Executors.newScheduledThreadPool(16);

  for (int n = 0; n < 12; n++) {
    final String instanceName = "consumer-" + instanceCounter.getAndIncrement();
    final JdbcPartitionManager partitionManager = new JdbcPartitionManager(dataSource, "fahrschein-demo");
    final JdbcCursorManager cursorManager = new JdbcCursorManager(dataSource, "fahrschein-demo");

    final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
        .withAccessTokenProvider(tokenProvider)
        .withCursorManager(cursorManager)
        .build();

    final List<Partition> partitions = nakadiClient.getPartitions(SALES_ORDER_SERVICE_ORDER_PLACED);

    // One polling round: try to take the partition lock and, if successful, listen with it.
    final IORunnable lockAndListen = () -> {
      final Optional<Lock> maybeLock = partitionManager.lockPartitions(SALES_ORDER_SERVICE_ORDER_PLACED, partitions, instanceName);
      if (!maybeLock.isPresent()) {
        return;
      }
      final Lock lock = maybeLock.get();
      try {
        nakadiClient.stream(SALES_ORDER_SERVICE_ORDER_PLACED)
            .withLock(lock)
            .withObjectMapper(objectMapper)
            .withStreamParameters(new StreamParameters().withStreamLimit(10))
            .withBackoffStrategy(new NoBackoffStrategy())
            .listen(SalesOrderPlaced.class, listener);
      } finally {
        partitionManager.unlockPartitions(lock);
      }
    };

    // The periodic schedule is registered from a pool thread rather than from the caller.
    final IORunnable startPolling = () -> {
      workers.scheduleWithFixedDelay(lockAndListen.unchecked(), 0, 1, TimeUnit.SECONDS);
    };
    workers.submit(startPolling.unchecked());
  }

  try {
    Thread.sleep(60L * 1000);
    workers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException interrupted) {
    Thread.currentThread().interrupt();
  }
}
/**
 * Checks that cancelling a subscriber of a {@code PublisherFuture} does not cancel or
 * complete the wrapped future, and that the subscriber has seen no signals.
 */
@Test(timeout = 2000)
public void noCrossCancel() throws Exception {
  ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
  try {
    // The source future completes after 1500 ms, well after the checks below.
    Future<Integer> f = exec.schedule(() -> 1, 1500, TimeUnit.MILLISECONDS);
    TestSubscriber<Integer> ts = new TestSubscriber<>();

    // Subscribe on a pool thread.
    exec.submit(() -> new PublisherFuture<>(f).subscribe(ts));

    Thread.sleep(500);
    ts.cancel();
    Thread.sleep(100);

    boolean futureDone = f.isDone();
    Assert.assertFalse("Future done?", futureDone);
    boolean futureCancelled = f.isCancelled();
    Assert.assertFalse("Future cancelled?", futureCancelled);

    ts.assertNoValues()
      .assertNoError()
      .assertNotComplete();
  } finally {
    exec.shutdown();
  }
}