下面列出了java.util.concurrent.BlockingQueue#take ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
*/
@GwtIncompatible // concurrency
public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
@Test
public void testActionPropertyBadEval() throws Exception {
ITool tool = new FastTool("", "BasicMultiTrace", "MCActionPropBadEval", new SimpleFilenameToStream());
StateVec initStates = tool.getInitStates();
ILiveCheck liveCheck = new NoOpLiveCheck(tool, "BasicMultiTrace");
BlockingQueue<SimulationWorkerResult> resultQueue = new LinkedBlockingQueue<>();
SimulationWorker worker = new SimulationWorker(0, tool, initStates, resultQueue, 0, 100, 100, false, null,
liveCheck, new LongAdder(), new LongAdder());
worker.start();
SimulationWorkerResult res = resultQueue.take();
assertTrue(res.isError());
SimulationWorkerError err = res.error();
assertEquals(EC.TLC_ACTION_PROPERTY_EVALUATION_FAILED, err.errorCode);
worker.join();
assertFalse(worker.isAlive());
}
@Test
public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException {
String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+ "prefix time: <http://www.w3.org/2006/time#> " // n
+ "select ?id (count(?obs) as ?total) where {" // n
+ "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+ "?obs <uri:hasTime> ?time. " // n
+ "?obs <uri:hasId> ?id } group by ?id"; // n
BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications);
PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
CreateFluoPcj pcj = new CreateFluoPcj();
String id = null;
try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId();
provider.processRegisteredNotifications(coord, fluo.newSnapshot());
}
TimestampedNotification notification = notifications.take();
Assert.assertEquals(5000, notification.getInitialDelay());
Assert.assertEquals(15000, notification.getPeriod());
Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit());
Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId());
}
AckClosure removeAck(String guid) {
AckClosure ackClosure;
BlockingQueue<PubAck> pac;
TimerTask timerTask = null;
this.lock();
try {
ackClosure = pubAckMap.get(guid);
if (ackClosure != null) {
timerTask = ackClosure.ackTask;
pubAckMap.remove(guid);
}
pac = pubAckChan;
} finally {
this.unlock();
}
// Cancel timer if needed
if (timerTask != null) {
timerTask.cancel();
}
// Remove from channel to unblock async publish
if (ackClosure != null && pac.size() > 0) {
try {
// remove from queue to unblock publish
pac.take();
} catch (InterruptedException e) {
// TODO: Ignore, but re-evaluate this
}
}
return ackClosure;
}
@ParameterizedTest
@ArgumentsSource(ParametersProvider.class)
void testHelloServiceAsync(
ClientOptions clientOptions, SerializationFormat format, SessionProtocol protocol)
throws Exception {
final HelloService.AsyncIface client =
Clients.builder(uri(Handlers.HELLO, format, protocol))
.options(clientOptions)
.build(Handlers.HELLO.asyncIface());
final int testCount = 10;
final BlockingQueue<AbstractMap.SimpleEntry<Integer, ?>> resultQueue =
new LinkedBlockingDeque<>(testCount);
for (int i = 0; i < testCount; i++) {
final int num = i;
client.hello("kukuman" + num, new AsyncMethodCallback<String>() {
@Override
public void onComplete(String response) {
assertThat(resultQueue.add(new AbstractMap.SimpleEntry<>(num, response))).isTrue();
}
@Override
public void onError(Exception exception) {
assertThat(resultQueue.add(new AbstractMap.SimpleEntry<>(num, exception))).isTrue();
}
});
}
for (int i = 0; i < testCount; i++) {
final AbstractMap.SimpleEntry<Integer, ?> pair = resultQueue.take();
assertThat(pair.getValue()).isEqualTo("Hello, kukuman" + pair.getKey() + '!');
}
}
static String takeQ(BlockingQueue<String> q) {
String r = null;
try {
r = q.take();
} catch (InterruptedException e) {}
return r;
}
@Test
public void testIfPublisherIgnoresStaleEventsAndContinuesWithNextFlowWhenDeliveryQueueIsNotEmpty()
throws InterruptedException {
FanOutRecordsPublisher fanOutRecordsPublisher = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);
FanOutRecordsPublisher.RecordFlow recordFlow =
new FanOutRecordsPublisher.RecordFlow(fanOutRecordsPublisher, Instant.now(), "shard-001");
final int[] totalRecordsRetrieved = { 0 };
BlockingQueue<BatchUniqueIdentifier> ackQueue = new LinkedBlockingQueue<>();
fanOutRecordsPublisher.subscribe(new Subscriber<RecordsRetrieved>() {
@Override public void onSubscribe(Subscription subscription) {}
@Override public void onNext(RecordsRetrieved recordsRetrieved) {
totalRecordsRetrieved[0]++;
// Enqueue the ack for bursty delivery
ackQueue.add(recordsRetrieved.batchUniqueIdentifier());
// Send stale event periodically
}
@Override public void onError(Throwable throwable) {}
@Override public void onComplete() {}
});
IntStream.rangeClosed(1, 10).forEach(i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(
new FanOutRecordsPublisher.FanoutRecordsRetrieved(ProcessRecordsInput.builder().build(), i + "", recordFlow.getSubscribeToShardId()),
recordFlow));
BatchUniqueIdentifier batchUniqueIdentifierQueued;
int count = 0;
// Now that we allowed upto 10 elements queued up, send a pair of good and stale ack to verify records
// delivered as expected.
while(count++ < 10 && (batchUniqueIdentifierQueued = ackQueue.take()) != null) {
final BatchUniqueIdentifier batchUniqueIdentifierFinal = batchUniqueIdentifierQueued;
fanOutRecordsPublisher
.evictAckedEventAndScheduleNextEvent(() -> batchUniqueIdentifierFinal);
fanOutRecordsPublisher.evictAckedEventAndScheduleNextEvent(
() -> new BatchUniqueIdentifier("some_uuid_str", "some_old_flow"));
}
assertEquals(10, totalRecordsRetrieved[0]);
}
@Test(timeout = TIMEOUT)
public void performWithDataByCustomInterface() throws URISyntaxException, InterruptedException, IOException {
final BlockingQueue<String> events = new LinkedBlockingQueue<String>();
final MockWebServer mockWebServer = new MockWebServer();
final MockResponse response = new MockResponse();
response.withWebSocketUpgrade(new DefaultWebSocketListener() {
@Override
public void onMessage(BufferedSource payload, WebSocket.PayloadType type) throws IOException {
events.offer(payload.readUtf8());
payload.close();
}
});
mockWebServer.enqueue(response);
mockWebServer.start();
final Consumer consumer = new Consumer(mockWebServer.url("/").uri());
final Subscription subscription = consumer.getSubscriptions().create(new Channel("CommentsChannel"), CustomSubscription.class);
consumer.connect();
events.take(); // { command: subscribe }
final JsonObject data = new JsonObject();
data.addProperty("foo", "bar");
subscription.perform("follow", data);
final JsonObject expected = new JsonObject();
expected.addProperty("command", "message");
expected.addProperty("identifier", subscription.getIdentifier());
expected.addProperty("data", data.toString());
assertThat(events.take(), is(expected.toString()));
mockWebServer.shutdown();
}
public void testNoninterruptible() throws InterruptedException {
BlockingQueue<Object> q = new LinkedBlockingQueue<Object>();
Thread t = tryMatchInThread(INPUT, BACKTRACKER, q);
Thread.sleep(1000);
t.interrupt();
Object result = q.take();
assertTrue("mismatch uncompleted",Boolean.FALSE.equals(result));
}
@Test
public void testJavaErrorResponse() throws Exception {
BlockingQueue<StreamObserver<BeamFnApi.InstructionRequest>> outboundServerObservers =
new LinkedBlockingQueue<>();
BlockingQueue<Throwable> error = new LinkedBlockingQueue<>();
CallStreamObserver<BeamFnApi.InstructionResponse> inboundServerObserver =
TestStreams.<BeamFnApi.InstructionResponse>withOnNext(
response -> fail(String.format("Unexpected Response %s", response)))
.withOnError(error::add)
.build();
Endpoints.ApiServiceDescriptor apiServiceDescriptor =
Endpoints.ApiServiceDescriptor.newBuilder()
.setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
.build();
Server server =
InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
.addService(
new BeamFnControlGrpc.BeamFnControlImplBase() {
@Override
public StreamObserver<BeamFnApi.InstructionResponse> control(
StreamObserver<BeamFnApi.InstructionRequest> outboundObserver) {
Uninterruptibles.putUninterruptibly(outboundServerObservers, outboundObserver);
return inboundServerObserver;
}
})
.build();
server.start();
try {
EnumMap<
BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>>
handlers = new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);
handlers.put(
BeamFnApi.InstructionRequest.RequestCase.REGISTER,
value -> {
throw new Error("Test Error");
});
BeamFnControlClient client =
new BeamFnControlClient(
"",
apiServiceDescriptor,
InProcessManagedChannelFactory.create(),
OutboundObserverFactory.trivial(),
handlers);
// Get the connected client and attempt to send and receive an instruction
StreamObserver<BeamFnApi.InstructionRequest> outboundServerObserver =
outboundServerObservers.take();
ExecutorService executor = Executors.newCachedThreadPool();
Future<Void> future =
executor.submit(
() -> {
client.processInstructionRequests(executor);
return null;
});
// Ensure that all exceptions are caught and translated to failures
outboundServerObserver.onNext(
InstructionRequest.newBuilder()
.setInstructionId("0")
.setRegister(RegisterRequest.getDefaultInstance())
.build());
// There should be an error reported to the StreamObserver.
assertThat(error.take(), not(nullValue()));
// Ensure that the client shuts down when an Error is thrown from the harness
try {
future.get();
throw new IllegalStateException("The future should have terminated with an error");
} catch (ExecutionException errorWrapper) {
assertThat(errorWrapper.getCause().getMessage(), containsString("Test Error"));
}
} finally {
server.shutdownNow();
}
}
/**
* Creates a randomized activity and randomized date range.
* <p></p>
* The activity feed is separated into three chunks,
* |. . . data too recent to be in date range . . .||. . . data in date range. . .||. . . data too old to be in date range|
* [index 0, ............................................................................................., index length-1]
* <p></p>
* Inside of those chunks data has no order, but the list is ordered by those three chunks.
* <p></p>
* The test will check to see if the num of data in the date range make onto the output queue.
*/
@Test
@Repeat(iterations = 3)
public void testWithBeforeAndAfterDates() throws InterruptedException {
//initialize counts assuming no date ranges will be used
int numActivities = randomIntBetween(0, 1000);
int numActivitiesInDateRange = numActivities;
int numberOutOfRange = 0;
int numBeforeRange = 0;
int numAfterRange = 0;
//determine if date ranges will be used
DateTime beforeDate = null;
DateTime afterDate = null;
if (randomInt() % 2 == 0) {
beforeDate = DateTime.now().minusDays(randomIntBetween(1, 5));
}
if (randomInt() % 2 == 0) {
if (beforeDate == null) {
afterDate = DateTime.now().minusDays(randomIntBetween(1, 10));
} else {
afterDate = beforeDate.minusDays(randomIntBetween(1, 10));
}
}
//update counts if date ranges are going to be used.
if (beforeDate != null || afterDate != null) { //assign amount to be in range
numActivitiesInDateRange = randomIntBetween(0, numActivities);
numberOutOfRange = numActivities - numActivitiesInDateRange;
}
if (beforeDate == null && afterDate != null) { //assign all out of range to be before the start of the range
numBeforeRange = numberOutOfRange;
} else if (beforeDate != null && afterDate == null) { //assign all out of range to be after the start of the range
numAfterRange = numberOutOfRange;
} else if (beforeDate != null && afterDate != null) { //assign half before range and half after the range
numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2);
numBeforeRange = numberOutOfRange / 2;
}
Plus plus = createMockPlus(numBeforeRange, numAfterRange, numActivitiesInDateRange, afterDate, beforeDate);
BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1);
BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
UserInfo userInfo = new UserInfo();
userInfo.setUserId("A");
userInfo.setAfterDate(afterDate);
userInfo.setBeforeDate(beforeDate);
GPlusUserActivityCollector collector = new GPlusUserActivityCollector(plus, datums, strategy, userInfo);
collector.run();
Assert.assertEquals(numActivitiesInDateRange, datums.size());
while (!datums.isEmpty()) {
StreamsDatum datum = datums.take();
Assert.assertNotNull(datum);
Assert.assertNotNull(datum.getDocument());
Assert.assertTrue(datum.getDocument() instanceof String);
Assert.assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); //only in range documents are on the out going queue.
}
}
/**
* Tests queued slot scheduling with multiple slot sharing groups.
*/
@Test
public void testQueuedMultipleSlotSharingGroups() throws Exception {
final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(4);
final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final SlotSharingGroupId slotSharingGroupId1 = new SlotSharingGroupId();
final SlotSharingGroupId slotSharingGroupId2 = new SlotSharingGroupId();
final JobVertexID jobVertexId1 = new JobVertexID();
final JobVertexID jobVertexId2 = new JobVertexID();
final JobVertexID jobVertexId3 = new JobVertexID();
final JobVertexID jobVertexId4 = new JobVertexID();
final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId1,
slotSharingGroupId1,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId2,
slotSharingGroupId1,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId3,
slotSharingGroupId2,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
CompletableFuture<LogicalSlot> logicalSlotFuture4 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId4,
slotSharingGroupId2,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
assertFalse(logicalSlotFuture1.isDone());
assertFalse(logicalSlotFuture2.isDone());
assertFalse(logicalSlotFuture3.isDone());
assertFalse(logicalSlotFuture4.isDone());
// we expect two slot requests
final AllocationID allocationId1 = allocationIds.take();
final AllocationID allocationId2 = allocationIds.take();
boolean offerFuture1 = slotPool.offerSlot(
taskManagerLocation,
new SimpleAckingTaskManagerGateway(),
new SlotOffer(
allocationId1,
0,
ResourceProfile.UNKNOWN));
boolean offerFuture2 = slotPool.offerSlot(
taskManagerLocation,
new SimpleAckingTaskManagerGateway(),
new SlotOffer(
allocationId2,
0,
ResourceProfile.UNKNOWN));
assertTrue(offerFuture1);
assertTrue(offerFuture2);
LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
LogicalSlot logicalSlot4 = logicalSlotFuture4.get();
assertEquals(logicalSlot1.getTaskManagerLocation(), logicalSlot2.getTaskManagerLocation());
assertEquals(logicalSlot3.getTaskManagerLocation(), logicalSlot4.getTaskManagerLocation());
assertEquals(allocationId1, logicalSlot1.getAllocationId());
assertEquals(allocationId2, logicalSlot3.getAllocationId());
}
private static int batchModeRun(TelnetClient telnet, List<String> commands, final int executionTimeout)
throws IOException, InterruptedException {
if (commands.size() == 0) {
return STATUS_OK;
}
long startTime = System.currentTimeMillis();
final InputStream inputStream = telnet.getInputStream();
final OutputStream outputStream = telnet.getOutputStream();
final BlockingQueue<String> receviedPromptQueue = new LinkedBlockingQueue<String>(1);
Thread printResultThread = new Thread(new Runnable() {
@Override
public void run() {
try {
StringBuilder line = new StringBuilder();
BufferedReader in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
int b = -1;
while (true) {
b = in.read();
if (b == -1) {
break;
}
line.appendCodePoint(b);
// 检查到有 [[email protected] 时,意味着可以执行下一个命令了
int index = line.indexOf(PROMPT);
if (index > 0) {
line.delete(0, index + PROMPT.length());
receviedPromptQueue.put("");
}
System.out.print(Character.toChars(b));
}
} catch (Exception e) {
// ignore
}
}
});
printResultThread.start();
// send commands to arthas server
for (String command : commands) {
if (command.trim().isEmpty()) {
continue;
}
// try poll prompt and check timeout
while (receviedPromptQueue.poll(100, TimeUnit.MILLISECONDS) == null) {
if (executionTimeout > 0) {
long now = System.currentTimeMillis();
if (now - startTime > executionTimeout) {
return STATUS_EXEC_TIMEOUT;
}
}
}
// send command to server
outputStream.write((command + " | plaintext\n").getBytes());
outputStream.flush();
}
// 读到最后一个命令执行后的 prompt ,可以直接发 quit命令了。
receviedPromptQueue.take();
outputStream.write("quit\n".getBytes());
outputStream.flush();
System.out.println();
return STATUS_OK;
}
@Test
void longPollingDisabledOnStop() throws Exception {
final BlockingQueue<RequestLog> healthCheckRequestLogs = new LinkedTransferQueue<>();
this.healthCheckRequestLogs = healthCheckRequestLogs;
final Endpoint endpoint = Endpoint.of("127.0.0.1", server.httpPort());
try (HealthCheckedEndpointGroup endpointGroup = build(
HealthCheckedEndpointGroup.builder(endpoint, HEALTH_CHECK_PATH))) {
// Check the initial state (healthy).
assertThat(endpointGroup.endpoints()).containsExactly(endpoint);
// Drop the first request.
healthCheckRequestLogs.take();
// Stop the server.
server.stop();
waitForGroup(endpointGroup, null);
// Must receive the '503 Service Unavailable' response with long polling disabled,
// so that the next health check respects the backoff.
for (;;) {
final ResponseHeaders stoppingResponseHeaders = healthCheckRequestLogs.take().responseHeaders();
if (stoppingResponseHeaders.status() == HttpStatus.OK) {
// It is possible to get '200 OK' if the server sent a response before the shutdown.
// Just try again so that another health check request is sent.
continue;
}
assertThat(stoppingResponseHeaders.status()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);
assertThat(stoppingResponseHeaders.getLong("armeria-lphc")).isNull();
break;
}
// Check the next check respected backoff, because there's no point of
// sending a request immediately only to get a 'connection refused' error.
final Stopwatch stopwatch = Stopwatch.createStarted();
healthCheckRequestLogs.take();
assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS))
.isGreaterThan(RETRY_INTERVAL.toMillis() * 4 / 5);
}
}
@SuppressWarnings("unchecked")
private void verifyNextElement(BlockingQueue<Object> output, long expectedElement) throws InterruptedException {
Object next = output.take();
assertTrue("next element is not an event", next instanceof StreamRecord);
assertEquals("wrong event", expectedElement, ((StreamRecord<Long>) next).getValue().longValue());
}
/**
* Tests queued slot scheduling with multiple slot sharing groups.
*/
@Test
public void testQueuedMultipleSlotSharingGroups() throws Exception {
final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(4);
final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final SlotSharingGroupId slotSharingGroupId1 = new SlotSharingGroupId();
final SlotSharingGroupId slotSharingGroupId2 = new SlotSharingGroupId();
final JobVertexID jobVertexId1 = new JobVertexID();
final JobVertexID jobVertexId2 = new JobVertexID();
final JobVertexID jobVertexId3 = new JobVertexID();
final JobVertexID jobVertexId4 = new JobVertexID();
final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId1,
slotSharingGroupId1,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId2,
slotSharingGroupId1,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId3,
slotSharingGroupId2,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
CompletableFuture<LogicalSlot> logicalSlotFuture4 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId4,
slotSharingGroupId2,
null),
true,
SlotProfile.noRequirements(),
TestingUtils.infiniteTime());
assertFalse(logicalSlotFuture1.isDone());
assertFalse(logicalSlotFuture2.isDone());
assertFalse(logicalSlotFuture3.isDone());
assertFalse(logicalSlotFuture4.isDone());
// we expect two slot requests
final AllocationID allocationId1 = allocationIds.take();
final AllocationID allocationId2 = allocationIds.take();
boolean offerFuture1 = slotPool.offerSlot(
taskManagerLocation,
new SimpleAckingTaskManagerGateway(),
new SlotOffer(
allocationId1,
0,
ResourceProfile.UNKNOWN));
boolean offerFuture2 = slotPool.offerSlot(
taskManagerLocation,
new SimpleAckingTaskManagerGateway(),
new SlotOffer(
allocationId2,
0,
ResourceProfile.UNKNOWN));
assertTrue(offerFuture1);
assertTrue(offerFuture2);
LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
LogicalSlot logicalSlot4 = logicalSlotFuture4.get();
assertEquals(logicalSlot1.getTaskManagerLocation(), logicalSlot2.getTaskManagerLocation());
assertEquals(logicalSlot3.getTaskManagerLocation(), logicalSlot4.getTaskManagerLocation());
assertEquals(allocationId1, logicalSlot1.getAllocationId());
assertEquals(allocationId2, logicalSlot3.getAllocationId());
}
@Test
public void testCloseWindowAllAggregatorsEvent() throws StageException, InterruptedException {
AggregationDProcessor processor = new AggregationDProcessor();
processor.config = new AggregationConfigBean();
processor.config.windowType = WindowType.ROLLING;
processor.config.timeWindow = TimeWindow.TW_1D;
processor.config.timeZoneID = "UTC";
processor.config.timeWindowsToRemember = 1;
processor.config.aggregatorConfigs = new ArrayList<>();
processor.config.allAggregatorsEvent = true;
processor.config.perAggregatorEvents = false;
ProcessorRunner runner =
new ProcessorRunner.Builder(AggregationDProcessor.class, processor).addOutputLane("a").build();
Processor.Context context = (Processor.Context) runner.getContext();
processor.config.init(context);
processor.config.aggregatorConfigs = getAggregationConfigs();
BlockingQueue<EventRecord> queue = new ArrayBlockingQueue<>(10);
AggregationEvaluators evaluators = new AggregationEvaluators(context, processor.config, queue);
evaluators.init();
AggregationEvaluator evaluator = evaluators.getEvaluators().get(0);
Record record = RecordCreator.create();
evaluators.evaluate(record);
Assert.assertEquals(
1L,
((Map)((Map)evaluator.getMetric().getGaugeData().get(0)).get("value")).values().iterator().next()
);
evaluators.closeWindow();
Assert.assertEquals(
0L,
((Map)((Map)evaluator.getMetric().getGaugeData().get(0)).get("value")).values().iterator().next()
);
Assert.assertEquals(1, queue.size());
EventRecord event = queue.take();
Assert.assertEquals(
WindowType.ROLLING + AggregationEvaluators.ALL_AGGREGATORS_EVENT,
event.getEventType()
);
evaluators.destroy();
}
/**
* Tests that pending request is removed if task executor reports a slot with its allocation id.
*/
@Test
public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(),
new TestingResourceActionsBuilder().build())) {
final JobID jobID = new JobID();
final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest1);
final BlockingQueue<Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple6 -> {
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple6);
try {
return responseQueue.take();
} catch (InterruptedException ignored) {
return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
}
})
.createTestingTaskExecutorGateway();
final ResourceID taskExecutorResourceId = ResourceID.generate();
final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
final SlotReport slotReport = new SlotReport(createEmptySlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.ANY));
final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
responseQueue.offer(firstManualSlotRequestResponse);
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
responseQueue.offer(secondManualSlotRequestResponse);
final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest2);
// fail first request
firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
final Tuple6<SlotID, JobID, AllocationID, ResourceProfile, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
// fail second request
secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
assertThat(secondRequest.f0, equalTo(firstRequest.f0));
secondManualSlotRequestResponse.complete(Acknowledge.get());
final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
assertThat(slotManager.getNumberRegisteredSlots(), is(1));
}
}
/**
* Tests that pending request is removed if task executor reports a slot with its allocation id.
*/
@Test
public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
try (final SlotManagerImpl slotManager = createSlotManager(ResourceManagerId.generate(),
new TestingResourceActionsBuilder().build())) {
final JobID jobID = new JobID();
final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest1);
final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
try {
return responseQueue.take();
} catch (InterruptedException ignored) {
return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
}
})
.createTestingTaskExecutorGateway();
final ResourceID taskExecutorResourceId = ResourceID.generate();
final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
responseQueue.offer(firstManualSlotRequestResponse);
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
responseQueue.offer(secondManualSlotRequestResponse);
final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest2);
// fail first request
firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
// fail second request
secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
assertThat(secondRequest.f0, equalTo(firstRequest.f0));
secondManualSlotRequestResponse.complete(Acknowledge.get());
final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
assertThat(slotManager.getNumberRegisteredSlots(), is(1));
}
}
private void verifyWatermark(BlockingQueue<Object> output, Watermark expectedWatermark) throws InterruptedException {
Object next = output.take();
assertTrue("next element is not a watermark", next instanceof Watermark);
assertEquals("wrong watermark", expectedWatermark, next);
}