下面列出了 java.util.concurrent.ArrayBlockingQueue#take() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Demonstrates {@link ArrayBlockingQueue#take()}: fills a three-slot queue with 1, 2 and 3,
 * then removes the head twice, printing the queue contents after every step.
 *
 * @param args command-line arguments (not used)
 * @throws InterruptedException if the thread is interrupted while waiting in {@code take()}
 */
public static void main(String[] args) throws InterruptedException {
    final ArrayBlockingQueue<Integer> numbers = new ArrayBlockingQueue<>(3);
    for (int value = 1; value <= 3; value++) {
        numbers.add(value);
    }
    System.out.println("After addding numbers Queue: " + numbers);

    // First removal: take() returns the oldest element of the queue.
    final int firstHead = numbers.take();
    System.out.println("Head of queue removed is " + firstHead);
    System.out.println("After removing head,Queue: " + numbers);

    // Second removal.
    final int secondHead = numbers.take();
    System.out.println("Head of queue removed is " + secondHead);
    System.out.println("After removing head Queue: " + numbers);
}
/**
 * Checks that the receive queue delivers every entry of {@code sentData}, in order, once per
 * iteration, that the runner counted the matching number of successful transfers, and that
 * nothing else is left on the queue afterwards.
 *
 * @param recvQueue queue of received messages, each a list of bytes
 * @param sentData the strings that were sent in each iteration
 * @param iterations how many times {@code sentData} was sent
 * @throws Exception if waiting on the queue is interrupted
 */
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
    // Each sent FlowFile must arrive on the queue with exactly the bytes that were sent.
    int roundsLeft = iterations;
    while (roundsLeft-- > 0) {
        for (final String expectedText : sentData) {
            final List<Byte> received = recvQueue.take();
            assertNotNull(received);
            final Byte[] boxed = received.toArray(new Byte[received.size()]);
            assertArrayEquals(expectedText.getBytes(), ArrayUtils.toPrimitive(boxed));
        }
    }

    final int expectedTransfers = sentData.length * iterations;
    runner.assertTransferCount(PutTCP.REL_SUCCESS, expectedTransfers);
    runner.clearTransferState();

    // Anything still queued at this point is unexpected extra data.
    assertNull(recvQueue.poll());
}
/**
 * Asserts that all sent data was received: for each of {@code iterations} rounds, one message
 * per element of {@code sentData} is taken from the queue and compared byte-for-byte. The
 * runner's success-transfer count is then verified and the queue is checked to be empty.
 *
 * @param recvQueue queue of received messages, each a list of bytes
 * @param sentData the strings that were sent in each round
 * @param iterations number of rounds in which {@code sentData} was sent
 * @throws Exception if waiting on the queue is interrupted
 */
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
    // Walk the rounds and compare each received message with the string that was sent.
    for (int round = iterations; round > 0; round--) {
        for (int index = 0; index < sentData.length; index++) {
            final List<Byte> payload = recvQueue.take();
            assertNotNull(payload);
            final Byte[] payloadBytes = new Byte[payload.size()];
            final byte[] actual = ArrayUtils.toPrimitive(payload.toArray(payloadBytes));
            assertArrayEquals(sentData[index].getBytes(), actual);
        }
    }

    runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
    runner.clearTransferState();

    // The queue must be drained; poll() returns null when nothing is left.
    assertNull(recvQueue.poll());
}
/**
 * Tests that the slot pool creates a correct report of the slots allocated on a
 * {@link TaskExecutor}.
 *
 * <p>Two slots are offered by one task manager: one under the allocation id of the pending
 * slot request and one under a fresh allocation id. The report is expected to carry the job
 * id and an {@code AllocatedSlotInfo} for each of the two slots.
 */
@Test
public void testCreateAllocatedSlotReport() throws Exception {
    try (SlotPoolImpl slotPool = new SlotPoolImpl(jobId)) {
        // Collects the allocation id of the slot request passed to the request-slot consumer.
        final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
        resourceManagerGateway.setRequestSlotConsumer(
            slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));

        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

        final SlotRequestId slotRequestId = new SlotRequestId();
        final CompletableFuture<LogicalSlot> slotRequestFuture = allocateSlot(scheduler, slotRequestId);

        final List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(2);
        final List<SlotOffer> slotOffers = new ArrayList<>(2);

        // Slot 0 is offered under the allocation id of the pending request.
        final AllocationID allocatedId = allocationIds.take();
        slotOffers.add(new SlotOffer(allocatedId, 0, ResourceProfile.UNKNOWN));
        allocatedSlotInfos.add(new AllocatedSlotInfo(0, allocatedId));

        // Slot 1 is offered under a fresh allocation id.
        final AllocationID availableId = new AllocationID();
        slotOffers.add(new SlotOffer(availableId, 1, ResourceProfile.UNKNOWN));
        allocatedSlotInfos.add(new AllocatedSlotInfo(1, availableId));

        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

        // wait for the completion of slot future
        slotRequestFuture.get();

        final AllocatedSlotReport slotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
        // Actual value first, expected value inside the matcher, so a failure message
        // labels the two sides correctly.
        assertThat(slotReport.getJobId(), is(jobId));
        assertThat(slotReport.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(allocatedSlotInfos)));
    }
}
/**
 * Tests that the slot pool creates a correct report of the slots allocated on a
 * {@link TaskExecutor}.
 *
 * <p>Two slots are offered by one task manager: one under the allocation id of the pending
 * slot request and one under a fresh allocation id. The report is expected to carry the job
 * id and an {@code AllocatedSlotInfo} for each of the two slots.
 */
@Test
public void testCreateAllocatedSlotReport() throws Exception {
    try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
        // Collects the allocation id of the slot request passed to the request-slot consumer.
        final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
        resourceManagerGateway.setRequestSlotConsumer(
            slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));

        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

        final SlotRequestId slotRequestId = new SlotRequestId();
        final CompletableFuture<LogicalSlot> slotRequestFuture = allocateSlot(scheduler, slotRequestId);

        final List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(2);
        final List<SlotOffer> slotOffers = new ArrayList<>(2);

        // Slot 0 is offered under the allocation id of the pending request.
        final AllocationID allocatedId = allocationIds.take();
        slotOffers.add(new SlotOffer(allocatedId, 0, ResourceProfile.UNKNOWN));
        allocatedSlotInfos.add(new AllocatedSlotInfo(0, allocatedId));

        // Slot 1 is offered under a fresh allocation id.
        final AllocationID availableId = new AllocationID();
        slotOffers.add(new SlotOffer(availableId, 1, ResourceProfile.UNKNOWN));
        allocatedSlotInfos.add(new AllocatedSlotInfo(1, availableId));

        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

        // wait for the completion of slot future
        slotRequestFuture.get();

        final AllocatedSlotReport slotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
        // Actual value first, expected value inside the matcher, so a failure message
        // labels the two sides correctly.
        assertThat(slotReport.getJobId(), is(jobId));
        assertThat(slotReport.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(allocatedSlotInfos)));
    }
}
/**
 * Takes the next line from the queue and splits it on whitespace.
 * The line is echoed to stdout when {@code DEBUG} is set.
 *
 * <p>Blocks until a line is available. The split pattern is the single-character regex
 * {@code \s}, so a run of consecutive whitespace characters produces empty strings in the
 * result.
 *
 * @param queue a queue of strings
 * @return the words split from the line
 */
private static String[] getSplitLine(ArrayBlockingQueue<String> queue) {
    try {
        String line = queue.take();
        String[] split = line.split("\\s");
        if (DEBUG) {
            System.out.printf(" Child Output: %s%n", line);
        }
        return split;
    } catch (InterruptedException ie) {
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        Assert.fail("interrupted", ie);
        // Presumably never reached because Assert.fail throws; kept to satisfy the compiler.
        return null;
    }
}
/**
 * Tests that the slot pool creates a correct report of the slots allocated on a
 * {@link TaskExecutor}.
 *
 * <p>Two slots are offered by one task manager: one under the allocation id of the pending
 * slot request and one under a fresh allocation id. The report is expected to carry the job
 * id and an {@code AllocatedSlotInfo} for each of the two slots.
 */
@Test
public void testCreateAllocatedSlotReport() throws Exception {
    try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
        // Collects the allocation id of the slot request passed to the request-slot consumer.
        final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
        resourceManagerGateway.setRequestSlotConsumer(
            slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));

        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

        final SlotRequestId slotRequestId = new SlotRequestId();
        final CompletableFuture<LogicalSlot> slotRequestFuture = allocateSlot(scheduler, slotRequestId);

        final List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(2);
        final List<SlotOffer> slotOffers = new ArrayList<>(2);

        // Slot 0 is offered under the allocation id of the pending request.
        final AllocationID allocatedId = allocationIds.take();
        slotOffers.add(new SlotOffer(allocatedId, 0, ResourceProfile.ANY));
        allocatedSlotInfos.add(new AllocatedSlotInfo(0, allocatedId));

        // Slot 1 is offered under a fresh allocation id.
        final AllocationID availableId = new AllocationID();
        slotOffers.add(new SlotOffer(availableId, 1, ResourceProfile.ANY));
        allocatedSlotInfos.add(new AllocatedSlotInfo(1, availableId));

        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

        // wait for the completion of slot future
        slotRequestFuture.get();

        final AllocatedSlotReport slotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
        // Actual value first, expected value inside the matcher, so a failure message
        // labels the two sides correctly.
        assertThat(slotReport.getJobId(), is(jobId));
        assertThat(slotReport.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(allocatedSlotInfos)));
    }
}
/**
 * Verifies that a slot offered under the allocation id of a released request is used
 * directly to fulfill another, still pending slot request.
 *
 * <p>It also verifies that the allocation originally requested for the fulfilled request is
 * canceled.
 *
 * <p>See FLINK-8089, FLINK-8934
 */
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
    try (SlotPoolImpl slotPool = new SlotPoolImpl(jobId)) {
        // Allocation ids seen by the request-slot consumer, in arrival order.
        final ArrayBlockingQueue<AllocationID> requestedIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(
            (SlotRequest request) -> requestedIds.offer(request.getAllocationId()));

        // Allocation ids seen by the cancel-slot consumer, in arrival order.
        final ArrayBlockingQueue<AllocationID> canceledIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setCancelSlotConsumer(canceledIds::offer);

        final SlotRequestId firstRequestId = new SlotRequestId();
        final SlotRequestId secondRequestId = new SlotRequestId();

        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

        final ScheduledUnit unit = new ScheduledUnit(new JobVertexID(), null, null);

        final CompletableFuture<LogicalSlot> firstSlotFuture =
            scheduler.allocateSlot(firstRequestId, unit, SlotProfile.noRequirements(), true, timeout);
        // Block until the first request has reached the request-slot consumer.
        final AllocationID firstAllocationId = requestedIds.take();

        final CompletableFuture<LogicalSlot> secondSlotFuture =
            scheduler.allocateSlot(secondRequestId, unit, SlotProfile.noRequirements(), true, timeout);
        // Block until the second request has reached the request-slot consumer.
        final AllocationID secondAllocationId = requestedIds.take();

        slotPool.releaseSlot(firstRequestId, null);

        try {
            // The released request's future is expected to complete exceptionally.
            firstSlotFuture.get();
            fail("The first slot future should have failed because it was cancelled.");
        } catch (ExecutionException expected) {
            final Throwable cause = ExceptionUtils.stripExecutionException(expected);
            assertTrue(cause instanceof FlinkException);
        }

        assertEquals(firstAllocationId, canceledIds.take());

        final SlotOffer offer = new SlotOffer(firstAllocationId, 0, ResourceProfile.UNKNOWN);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

        // The offered slot must end up fulfilling the second request ...
        assertEquals(firstAllocationId, secondSlotFuture.get().getAllocationId());

        // ... and the allocation requested for the second request must have been canceled.
        assertEquals(secondAllocationId, canceledIds.take());
    }
}
/**
 * Verifies that a slot offered under the allocation id of a released request is used
 * directly to fulfill another, still pending slot request.
 *
 * <p>It also verifies that the allocation originally requested for the fulfilled request is
 * canceled.
 *
 * <p>See FLINK-8089, FLINK-8934
 */
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
    try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
        // Allocation ids seen by the request-slot consumer, in arrival order.
        final ArrayBlockingQueue<AllocationID> requestedIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(
            (SlotRequest request) -> requestedIds.offer(request.getAllocationId()));

        // Allocation ids seen by the cancel-slot consumer, in arrival order.
        final ArrayBlockingQueue<AllocationID> canceledIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setCancelSlotConsumer(canceledIds::offer);

        final SlotRequestId firstRequestId = new SlotRequestId();
        final SlotRequestId secondRequestId = new SlotRequestId();

        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

        final ScheduledUnit unit = new ScheduledUnit(new JobVertexID(), null, null);

        final CompletableFuture<LogicalSlot> firstSlotFuture =
            scheduler.allocateSlot(firstRequestId, unit, SlotProfile.noRequirements(), true, timeout);
        // Block until the first request has reached the request-slot consumer.
        final AllocationID firstAllocationId = requestedIds.take();

        final CompletableFuture<LogicalSlot> secondSlotFuture =
            scheduler.allocateSlot(secondRequestId, unit, SlotProfile.noRequirements(), true, timeout);
        // Block until the second request has reached the request-slot consumer.
        final AllocationID secondAllocationId = requestedIds.take();

        slotPool.releaseSlot(firstRequestId, null);

        try {
            // The released request's future is expected to complete exceptionally.
            firstSlotFuture.get();
            fail("The first slot future should have failed because it was cancelled.");
        } catch (ExecutionException expected) {
            final Throwable cause = ExceptionUtils.stripExecutionException(expected);
            assertTrue(cause instanceof FlinkException);
        }

        assertEquals(firstAllocationId, canceledIds.take());

        final SlotOffer offer = new SlotOffer(firstAllocationId, 0, ResourceProfile.UNKNOWN);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

        // The offered slot must end up fulfilling the second request ...
        assertEquals(firstAllocationId, secondSlotFuture.get().getAllocationId());

        // ... and the allocation requested for the second request must have been canceled.
        assertEquals(secondAllocationId, canceledIds.take());
    }
}
/**
 * Runs a Loader synchronously and returns the result of the load. The loader will
 * be started, stopped, and destroyed by this method so it cannot be reused.
 *
 * <p>The calling thread blocks until the loader delivers its result. If the calling thread is
 * interrupted while waiting, its interrupt status is restored and a {@link RuntimeException}
 * wrapping the {@link InterruptedException} is thrown.
 *
 * @param loader The loader to run synchronously
 * @return The result from the loader
 */
@SuppressLint("HandlerLeak")
public <T> T getLoaderResultSynchronously(final Loader<T> loader) {
    // The test thread blocks on this queue until the loader puts its result in
    final ArrayBlockingQueue<T> queue = new ArrayBlockingQueue<T>(1);

    // This callback runs on the "main" thread and unblocks the test thread
    // when it puts the result into the blocking queue
    final OnLoadCompleteListener<T> listener = new OnLoadCompleteListener<T>() {
        @Override
        public void onLoadComplete(Loader<T> completedLoader, T data) {
            // Shut the loader down
            completedLoader.unregisterListener(this);
            completedLoader.stopLoading();
            completedLoader.reset();
            // Store the result, unblocking the test thread.
            // NOTE(review): ArrayBlockingQueue rejects null elements, so a loader that
            // delivers null data would throw here instead of unblocking the waiter.
            queue.add(data);
        }
    };

    // This handler runs on the "main" thread of the process since AsyncTask
    // is documented as needing to run on the main thread and many Loaders use
    // AsyncTask
    final Handler mainThreadHandler = new Handler(Looper.getMainLooper()) {
        @Override
        public void handleMessage(Message msg) {
            loader.registerListener(0, listener);
            loader.startLoading();
        }
    };

    // Ask the main thread to start the loading process
    mainThreadHandler.sendEmptyMessage(0);

    // Block on the queue waiting for the result of the load to be inserted.
    // A single take() is enough: it either returns the result or is interrupted.
    try {
        return queue.take();
    } catch (InterruptedException e) {
        // Restore the interrupt status before converting to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException("waiting thread interrupted", e);
    }
}
/**
 * Verifies that a slot offered under the allocation id of a released request is used
 * directly to fulfill another, still pending slot request.
 *
 * <p>It also verifies that the allocation originally requested for the fulfilled request is
 * canceled.
 *
 * <p>See FLINK-8089, FLINK-8934
 */
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
    try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
        // Allocation ids seen by the request-slot consumer, in arrival order.
        final ArrayBlockingQueue<AllocationID> requestedIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(
            (SlotRequest request) -> requestedIds.offer(request.getAllocationId()));

        // Allocation ids seen by the cancel-slot consumer, in arrival order.
        final ArrayBlockingQueue<AllocationID> canceledIds = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setCancelSlotConsumer(canceledIds::offer);

        final SlotRequestId firstRequestId = new SlotRequestId();
        final SlotRequestId secondRequestId = new SlotRequestId();

        setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
        final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

        final ScheduledUnit unit = new ScheduledUnit(new JobVertexID(), null, null);

        final CompletableFuture<LogicalSlot> firstSlotFuture =
            scheduler.allocateSlot(firstRequestId, unit, SlotProfile.noRequirements(), timeout);
        // Block until the first request has reached the request-slot consumer.
        final AllocationID firstAllocationId = requestedIds.take();

        final CompletableFuture<LogicalSlot> secondSlotFuture =
            scheduler.allocateSlot(secondRequestId, unit, SlotProfile.noRequirements(), timeout);
        // Block until the second request has reached the request-slot consumer.
        final AllocationID secondAllocationId = requestedIds.take();

        slotPool.releaseSlot(firstRequestId, null);

        try {
            // The released request's future is expected to complete exceptionally.
            firstSlotFuture.get();
            fail("The first slot future should have failed because it was cancelled.");
        } catch (ExecutionException expected) {
            final Throwable cause = ExceptionUtils.stripExecutionException(expected);
            assertTrue(cause instanceof FlinkException);
        }

        assertEquals(firstAllocationId, canceledIds.take());

        final SlotOffer offer = new SlotOffer(firstAllocationId, 0, ResourceProfile.ANY);
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

        // The offered slot must end up fulfilling the second request ...
        assertEquals(firstAllocationId, secondSlotFuture.get().getAllocationId());

        // ... and the allocation requested for the second request must have been canceled.
        assertEquals(secondAllocationId, canceledIds.take());
    }
}
/**
 * Tests that a slot request is retried if it fails on the task manager side.
 *
 * <p>The fake task executor gateway answers the first slot request with a future that the
 * test completes exceptionally and the second with a future that the test acknowledges. The
 * slot chosen for the second attempt is expected to be allocated for the request; if it
 * differs from the slot of the first attempt, that first slot is expected to be free again.
 */
@Test
public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
    final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
    final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();

    final JobID jobId = new JobID();
    final AllocationID allocationId = new AllocationID();
    final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
    final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");

    // One future per expected requestSlot call, handed out in order.
    final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
    final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
    final Iterator<CompletableFuture<Acknowledge>> slotRequestFutureIterator = Arrays.asList(slotRequestFuture1, slotRequestFuture2).iterator();

    // Records the slot id of every slot request the gateway receives.
    final ArrayBlockingQueue<SlotID> slotIds = new ArrayBlockingQueue<>(2);
    final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
        .setRequestSlotFunction(FunctionUtils.uncheckedFunction(
            requestSlotParameters -> {
                slotIds.put(requestSlotParameters.f0);
                return slotRequestFutureIterator.next();
            }))
        .createTestingTaskExecutorGateway();

    final ResourceID resourceId = ResourceID.generate();
    final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);

    final SlotID slotId1 = new SlotID(resourceId, 0);
    final SlotID slotId2 = new SlotID(resourceId, 1);
    final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
    final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
    final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));

    try (SlotManagerImpl slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
        slotManager.registerTaskManager(taskManagerConnection, slotReport);
        slotManager.registerSlotRequest(slotRequest);

        final SlotID firstSlotId = slotIds.take();
        assertThat(slotIds, is(empty()));

        TaskManagerSlot failedSlot = slotManager.getSlot(firstSlotId);

        // let the first attempt fail --> this should trigger a second attempt
        slotRequestFuture1.completeExceptionally(new SlotAllocationException("Test exception."));

        // the second attempt succeeds
        slotRequestFuture2.complete(Acknowledge.get());

        final SlotID secondSlotId = slotIds.take();
        assertThat(slotIds, is(empty()));

        TaskManagerSlot slot = slotManager.getSlot(secondSlotId);

        // assertEquals reports the actual state on failure, unlike assertTrue(a == b).
        assertEquals(TaskManagerSlot.State.ALLOCATED, slot.getState());
        assertEquals(allocationId, slot.getAllocationId());

        // Only check the first slot when the retry picked a different one.
        if (!failedSlot.getSlotId().equals(slot.getSlotId())) {
            assertEquals(TaskManagerSlot.State.FREE, failedSlot.getState());
        }
    }
}
/**
 * Tests that pending slot requests are rejected if a slot report with a different allocation
 * is received.
 *
 * <p>The test registers a task manager with two slots and one slot request, waits for the
 * slot manager to request a slot from the task executor, and then reports that slot with a
 * different job id and allocation id. The slot manager is expected to request the other slot.
 */
@Test
public void testSlotReportWhileActiveSlotRequest() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
// Futures handed out by the fake task executor, one per slot request: the first is never
// completed by this test, the second is already acknowledged.
final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
final Iterator<CompletableFuture<Acknowledge>> slotRequestFutureIterator = Arrays.asList(
slotRequestFuture1,
CompletableFuture.completedFuture(Acknowledge.get())).iterator();
// Records the slot id of every slot request the gateway receives.
final ArrayBlockingQueue<SlotID> slotIds = new ArrayBlockingQueue<>(2);
final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(FunctionUtils.uncheckedFunction(
requestSlotParameters -> {
slotIds.put(requestSlotParameters.f0);
return slotRequestFutureIterator.next();
}))
.createTestingTaskExecutorGateway();
final ResourceID resourceId = ResourceID.generate();
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
// Initial report: two slots with the requested resource profile.
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
final ScheduledExecutor mainThreadExecutor = TestingUtils.defaultScheduledExecutor();
final SlotManagerImpl slotManager = createSlotManagerBuilder()
.setScheduledExecutor(mainThreadExecutor)
.build();
try {
slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
// Register the task manager via mainThreadExecutor, then register the slot request as a
// dependent stage; a ResourceManagerException is rethrown unchecked so it fails the future.
CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
() -> {
slotManager.registerTaskManager(taskManagerConnection, slotReport);
return null;
},
mainThreadExecutor)
.thenAccept(
(Object value) -> {
try {
slotManager.registerSlotRequest(slotRequest);
} catch (ResourceManagerException e) {
throw new RuntimeException("Could not register slots.", e);
}
});
// check that no exception has been thrown
registrationFuture.get();
// The slot requested from the task executor for the slot request; the other one is
// treated as the free slot below.
final SlotID requestedSlotId = slotIds.take();
final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
// New report: the requested slot now carries a fresh job id and allocation id, i.e. not
// the allocation of the pending slot request.
final SlotStatus newSlotStatus1 = new SlotStatus(requestedSlotId, resourceProfile, new JobID(), new AllocationID());
final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
CompletableFuture<Boolean> reportSlotStatusFuture = CompletableFuture.supplyAsync(
// this should update the slot with the pending slot request triggering the reassignment of it
() -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
mainThreadExecutor);
assertTrue(reportSlotStatusFuture.get());
// The second slot request must target the slot that was still free.
final SlotID requestedSlotId2 = slotIds.take();
assertEquals(freeSlotId, requestedSlotId2);
} finally {
// Close the slot manager via mainThreadExecutor; the returned future is not awaited.
CompletableFuture.runAsync(
ThrowingRunnable.unchecked(slotManager::close),
mainThreadExecutor);
}
}