java.util.concurrent.ArrayBlockingQueue#take() 源码实例 Demo

下面列出了 java.util.concurrent.ArrayBlockingQueue#take() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Demonstrates {@link ArrayBlockingQueue#take()} on a queue that is filled to capacity.
 *
 * <p>Three integers are added to a queue of capacity 3, then the head is taken twice. After
 * each removal the removed element and the remaining queue contents are printed.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while waiting in {@code take()}
 */
public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
    queue.add(1);
    queue.add(2);
    queue.add(3);
    System.out.println("After adding numbers Queue: " + queue);

    // The queue is non-empty for both calls, so take() returns immediately.
    int head = queue.take();
    System.out.println("Head of queue removed is " + head);
    System.out.println("After removing head, Queue: " + queue);

    head = queue.take();
    System.out.println("Head of queue removed is " + head);
    System.out.println("After removing head, Queue: " + queue);
}
 
源代码2 项目: localization_nifi   文件: TestPutTCPCommon.java
/**
 * Verifies that every payload in {@code sentData} was received {@code iterations} times, in
 * order, and that nothing else is left in the receive queue.
 *
 * @param recvQueue  queue holding the received messages as boxed byte lists
 * @param sentData   the payloads that were sent on each iteration
 * @param iterations how many times the whole of {@code sentData} was sent
 * @throws Exception if waiting on the queue is interrupted
 */
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
    // Walk the expected payloads round by round; take() blocks until the next message arrives.
    for (int round = 0; round < iterations; round++) {
        for (int idx = 0; idx < sentData.length; idx++) {
            final List<Byte> received = recvQueue.take();
            assertNotNull(received);
            final byte[] expected = sentData[idx].getBytes();
            final Byte[] boxed = received.toArray(new Byte[received.size()]);
            assertArrayEquals(expected, ArrayUtils.toPrimitive(boxed));
        }
    }

    // One successful transfer is expected per payload per round.
    runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
    runner.clearTransferState();

    // Anything still queued would be unexpected extra data.
    assertNull(recvQueue.poll());
}
 
源代码3 项目: nifi   文件: TestPutTCPCommon.java
/**
 * Checks that the receive queue yields each sent payload, in order, once per iteration, and
 * that it is empty afterwards.
 *
 * @param recvQueue  queue of received messages, each a list of boxed bytes
 * @param sentData   payloads sent during a single iteration
 * @param iterations number of times {@code sentData} was sent
 * @throws Exception if the blocking read from the queue is interrupted
 */
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
    for (int pass = 0; pass < iterations; pass++) {
        for (final String payload : sentData) {
            // Blocks until the next received message is available.
            final List<Byte> received = recvQueue.take();
            assertNotNull(received);

            final byte[] expectedBytes = payload.getBytes();
            final Byte[] receivedBoxed = received.toArray(new Byte[received.size()]);
            final byte[] receivedBytes = ArrayUtils.toPrimitive(receivedBoxed);
            assertArrayEquals(expectedBytes, receivedBytes);
        }
    }

    // Every payload of every pass should have been routed to success.
    runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
    runner.clearTransferState();

    // The queue must not contain any message beyond the ones consumed above.
    assertNull(recvQueue.poll());
}
 
源代码4 项目: Flink-CEPplus   文件: SlotPoolImplTest.java
/**
 * Tests the allocated slot report that the slot pool creates for a task manager.
 *
 * <p>Two slots are offered: one for the allocation id of the pending slot request and one
 * for a fresh allocation id. The report must carry the job id and both slot infos.
 */
@Test
public void testCreateAllocatedSlotReport() throws Exception {

	try (SlotPoolImpl slotPool = new SlotPoolImpl(jobId)) {

		// capture the allocation id of the request that reaches the resource manager gateway
		final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(1);
		resourceManagerGateway.setRequestSlotConsumer(
				request -> requestedAllocations.offer(request.getAllocationId()));

		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
		final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

		final CompletableFuture<LogicalSlot> pendingSlot = allocateSlot(scheduler, new SlotRequestId());

		final List<SlotOffer> offers = new ArrayList<>(2);
		final List<AllocatedSlotInfo> expectedInfos = new ArrayList<>(2);

		// slot 0 answers the pending request; take() blocks until the request was seen
		final AllocationID requestedId = requestedAllocations.take();
		offers.add(new SlotOffer(requestedId, 0, ResourceProfile.UNKNOWN));
		expectedInfos.add(new AllocatedSlotInfo(0, requestedId));

		// slot 1 is an additional offer with a new allocation id
		final AllocationID spareId = new AllocationID();
		offers.add(new SlotOffer(spareId, 1, ResourceProfile.UNKNOWN));
		expectedInfos.add(new AllocatedSlotInfo(1, spareId));

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
		slotPool.offerSlots(taskManagerLocation, taskManagerGateway, offers);

		// the slot future must complete before the report is created
		pendingSlot.get();

		final AllocatedSlotReport report = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
		assertThat(jobId, is(report.getJobId()));
		assertThat(report.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(expectedInfos)));
	}
}
 
源代码5 项目: flink   文件: SlotPoolImplTest.java
/**
 * Tests the allocated slot report that the slot pool creates for a task manager.
 *
 * <p>One offered slot uses the allocation id of the pending slot request, the other a newly
 * created allocation id. The report is expected to contain the job id and both slot infos.
 */
@Test
public void testCreateAllocatedSlotReport() throws Exception {

	try (SlotPoolImpl slotPool = createSlotPoolImpl()) {

		// records the allocation id of the slot request sent to the resource manager gateway
		final ArrayBlockingQueue<AllocationID> seenAllocationIds = new ArrayBlockingQueue<>(1);
		resourceManagerGateway.setRequestSlotConsumer(
				req -> seenAllocationIds.offer(req.getAllocationId()));

		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
		final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

		final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(scheduler, new SlotRequestId());

		final List<SlotOffer> offeredSlots = new ArrayList<>(2);
		final List<AllocatedSlotInfo> expectedSlotInfos = new ArrayList<>(2);

		// first slot: matches the pending request (blocks until the request arrived)
		final AllocationID pendingId = seenAllocationIds.take();
		offeredSlots.add(new SlotOffer(pendingId, 0, ResourceProfile.UNKNOWN));
		expectedSlotInfos.add(new AllocatedSlotInfo(0, pendingId));

		// second slot: offered under a fresh allocation id
		final AllocationID extraId = new AllocationID();
		offeredSlots.add(new SlotOffer(extraId, 1, ResourceProfile.UNKNOWN));
		expectedSlotInfos.add(new AllocatedSlotInfo(1, extraId));

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
		slotPool.offerSlots(taskManagerLocation, taskManagerGateway, offeredSlots);

		// wait until the slot request has been fulfilled
		slotFuture.get();

		final AllocatedSlotReport allocatedSlotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
		assertThat(jobId, is(allocatedSlotReport.getJobId()));
		assertThat(allocatedSlotReport.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(expectedSlotInfos)));
	}
}
 
源代码6 项目: openjdk-jdk9   文件: OnExitTest.java
/**
 * Takes the next line from the queue, blocking until one is available, and splits it
 * at each whitespace character.
 * The line is echoed to stdout when {@code DEBUG} is set.
 * @param queue a queue of strings
 * @return the pieces of the line; an interrupt while waiting fails the test instead
 */
private static String[] getSplitLine(ArrayBlockingQueue<String> queue) {
    final String text;
    try {
        text = queue.take();
    } catch (InterruptedException ie) {
        Assert.fail("interrupted", ie);
        return null;
    }
    if (DEBUG) {
        System.out.printf("  Child Output: %s%n", text);
    }
    return text.split("\\s");
}
 
源代码7 项目: flink   文件: SlotPoolImplTest.java
/**
 * Tests the allocated slot report that the slot pool creates for a task manager.
 *
 * <p>Offers one slot for the pending request's allocation id and one for a new allocation
 * id, then checks that the report contains the job id and both slot infos.
 */
@Test
public void testCreateAllocatedSlotReport() throws Exception {

	try (SlotPoolImpl slotPool = createSlotPoolImpl()) {

		// the gateway callback hands over the allocation id of the incoming slot request
		final ArrayBlockingQueue<AllocationID> requestedIds = new ArrayBlockingQueue<>(1);
		resourceManagerGateway.setRequestSlotConsumer(
				incoming -> requestedIds.offer(incoming.getAllocationId()));

		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
		final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

		final CompletableFuture<LogicalSlot> logicalSlotFuture = allocateSlot(scheduler, new SlotRequestId());

		final List<SlotOffer> offers = new ArrayList<>(2);
		final List<AllocatedSlotInfo> expected = new ArrayList<>(2);

		// index 0: the slot for the request that is currently pending
		final AllocationID pendingAllocation = requestedIds.take();
		offers.add(new SlotOffer(pendingAllocation, 0, ResourceProfile.ANY));
		expected.add(new AllocatedSlotInfo(0, pendingAllocation));

		// index 1: an extra slot under a newly generated allocation id
		final AllocationID extraAllocation = new AllocationID();
		offers.add(new SlotOffer(extraAllocation, 1, ResourceProfile.ANY));
		expected.add(new AllocatedSlotInfo(1, extraAllocation));

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
		slotPool.offerSlots(taskManagerLocation, taskManagerGateway, offers);

		// block until the requested slot has been assigned
		logicalSlotFuture.get();

		final AllocatedSlotReport report = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
		assertThat(jobId, is(report.getJobId()));
		assertThat(report.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(expected)));
	}
}
 
源代码8 项目: Flink-CEPplus   文件: SlotPoolImplTest.java
/**
 * Tests that a slot offered for an already released request is used to fulfill
 * another pending slot request.
 *
 * <p>It also checks that the resource manager gateway is asked to cancel both
 * the released and the superseded allocation.
 *
 * <p>See FLINK-8089, FLINK-8934
 */
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {

	try (SlotPoolImpl slotPool = new SlotPoolImpl(jobId)) {
		// queues that record what the resource manager gateway is asked to request / cancel
		final ArrayBlockingQueue<AllocationID> requested = new ArrayBlockingQueue<>(2);
		final ArrayBlockingQueue<AllocationID> canceled = new ArrayBlockingQueue<>(2);
		resourceManagerGateway.setRequestSlotConsumer(
			request -> requested.offer(request.getAllocationId()));
		resourceManagerGateway.setCancelSlotConsumer(
			allocationId -> canceled.offer(allocationId));

		final SlotRequestId firstRequestId = new SlotRequestId();
		final SlotRequestId secondRequestId = new SlotRequestId();
		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
		final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

		final ScheduledUnit unit = new ScheduledUnit(new JobVertexID(), null, null);

		final CompletableFuture<LogicalSlot> firstFuture = scheduler.allocateSlot(
			firstRequestId, unit, SlotProfile.noRequirements(), true, timeout);

		// blocks until the first request reached the gateway
		final AllocationID firstAllocation = requested.take();

		final CompletableFuture<LogicalSlot> secondFuture = scheduler.allocateSlot(
			secondRequestId, unit, SlotProfile.noRequirements(), true, timeout);

		// blocks until the second request reached the gateway
		final AllocationID secondAllocation = requested.take();

		slotPool.releaseSlot(firstRequestId, null);

		try {
			// releasing the request must complete its future exceptionally
			firstFuture.get();
			fail("The first slot future should have failed because it was cancelled.");
		} catch (ExecutionException expected) {
			// the stripped cause is expected to be a FlinkException
			assertTrue(ExceptionUtils.stripExecutionException(expected) instanceof FlinkException);
		}

		assertEquals(firstAllocation, canceled.take());

		// offer a slot under the allocation id of the released request
		final SlotOffer offer = new SlotOffer(firstAllocation, 0, ResourceProfile.UNKNOWN);

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());

		assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, offer));

		// the offered slot now serves the second request
		assertEquals(firstAllocation, secondFuture.get().getAllocationId());

		// and the second request's own allocation is canceled at the gateway
		assertEquals(secondAllocation, canceled.take());
	}
}
 
源代码9 项目: flink   文件: SlotPoolImplTest.java
/**
 * Tests that a slot offered for an already released request is used to fulfill
 * another pending slot request.
 *
 * <p>It also checks that the resource manager gateway is asked to cancel both
 * the released and the superseded allocation.
 *
 * <p>See FLINK-8089, FLINK-8934
 */
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {

	try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
		// record every allocation id the gateway is asked to request or to cancel
		final ArrayBlockingQueue<AllocationID> requestedIds = new ArrayBlockingQueue<>(2);
		final ArrayBlockingQueue<AllocationID> canceledIds = new ArrayBlockingQueue<>(2);
		resourceManagerGateway.setRequestSlotConsumer(
			req -> requestedIds.offer(req.getAllocationId()));
		resourceManagerGateway.setCancelSlotConsumer(
			id -> canceledIds.offer(id));

		final SlotRequestId requestA = new SlotRequestId();
		final SlotRequestId requestB = new SlotRequestId();
		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
		final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

		final ScheduledUnit task = new ScheduledUnit(new JobVertexID(), null, null);

		final CompletableFuture<LogicalSlot> futureA = scheduler.allocateSlot(
			requestA, task, SlotProfile.noRequirements(), true, timeout);

		// wait until request A was forwarded to the gateway
		final AllocationID allocationA = requestedIds.take();

		final CompletableFuture<LogicalSlot> futureB = scheduler.allocateSlot(
			requestB, task, SlotProfile.noRequirements(), true, timeout);

		// wait until request B was forwarded to the gateway
		final AllocationID allocationB = requestedIds.take();

		slotPool.releaseSlot(requestA, null);

		try {
			// the released request's future has to fail
			futureA.get();
			fail("The first slot future should have failed because it was cancelled.");
		} catch (ExecutionException failure) {
			// after stripping, the cause should be a FlinkException
			assertTrue(ExceptionUtils.stripExecutionException(failure) instanceof FlinkException);
		}

		assertEquals(allocationA, canceledIds.take());

		// the offer reuses the allocation id of the released request
		final SlotOffer offerForA = new SlotOffer(allocationA, 0, ResourceProfile.UNKNOWN);

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());

		assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, offerForA));

		// request B is fulfilled with the slot offered for allocation A
		assertEquals(allocationA, futureB.get().getAllocationId());

		// allocation B is no longer needed and must have been canceled
		assertEquals(allocationB, canceledIds.take());
	}
}
 
源代码10 项目: arca-android   文件: SupportLoaderTestCase.java
/**
 * Runs a Loader to completion and hands back what it loaded. The calling thread blocks
 * until the result is available; the loader is unregistered, stopped and reset once it
 * completes, so it cannot be reused afterwards.
 *
 * @param loader The loader to run synchronously
 * @return The result from the loader
 */
@SuppressLint("HandlerLeak")
public <T> T getLoaderResultSynchronously(final Loader<T> loader) {
    // Single-slot hand-off between the "main" thread and the blocked calling thread.
    final ArrayBlockingQueue<T> handOff = new ArrayBlockingQueue<T>(1);

    // Invoked when the load finishes: tears the loader down and publishes the result.
    final OnLoadCompleteListener<T> onComplete = new OnLoadCompleteListener<T>() {
        @Override
        public void onLoadComplete(Loader<T> finished, T loaded) {
            finished.unregisterListener(this);
            finished.stopLoading();
            finished.reset();

            // Publishing the result wakes up the thread waiting in take() below.
            handOff.add(loaded);
        }
    };

    // The load is started from the main looper because AsyncTask is documented as
    // needing the main thread and many Loaders are built on AsyncTask.
    final Handler starter = new Handler(Looper.getMainLooper()) {
        @Override
        public void handleMessage(Message ignored) {
            loader.registerListener(0, onComplete);
            loader.startLoading();
        }
    };

    // Kick off the load on the main thread.
    starter.sendEmptyMessage(0);

    // Wait for the listener to publish the result.
    try {
        return handOff.take();
    } catch (InterruptedException e) {
        throw new RuntimeException("waiting thread interrupted", e);
    }
}
 
源代码11 项目: flink   文件: SlotPoolImplTest.java
/**
 * Tests that a slot offered for an already released request is used to fulfill
 * another pending slot request.
 *
 * <p>It also checks that the resource manager gateway is asked to cancel both
 * the released and the superseded allocation.
 *
 * <p>See FLINK-8089, FLINK-8934
 */
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {

	try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
		// track the allocation ids the gateway is asked to request and to cancel
		final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(2);
		final ArrayBlockingQueue<AllocationID> canceledRequests = new ArrayBlockingQueue<>(2);
		resourceManagerGateway.setRequestSlotConsumer(
			request -> requestedAllocations.offer(request.getAllocationId()));
		resourceManagerGateway.setCancelSlotConsumer(
			allocation -> canceledRequests.offer(allocation));

		final SlotRequestId releasedRequestId = new SlotRequestId();
		final SlotRequestId survivingRequestId = new SlotRequestId();
		setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
		final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

		final ScheduledUnit unit = new ScheduledUnit(new JobVertexID(), null, null);

		final CompletableFuture<LogicalSlot> releasedFuture = scheduler.allocateSlot(
			releasedRequestId, unit, SlotProfile.noRequirements(), timeout);

		// the first request must have arrived at the gateway before continuing
		final AllocationID releasedAllocation = requestedAllocations.take();

		final CompletableFuture<LogicalSlot> survivingFuture = scheduler.allocateSlot(
			survivingRequestId, unit, SlotProfile.noRequirements(), timeout);

		// same for the second request
		final AllocationID survivingAllocation = requestedAllocations.take();

		slotPool.releaseSlot(releasedRequestId, null);

		try {
			// the future of the released request is expected to fail
			releasedFuture.get();
			fail("The first slot future should have failed because it was cancelled.");
		} catch (ExecutionException executionException) {
			// its stripped cause should be a FlinkException
			assertTrue(ExceptionUtils.stripExecutionException(executionException) instanceof FlinkException);
		}

		assertEquals(releasedAllocation, canceledRequests.take());

		// offer a slot carrying the allocation id of the released request
		final SlotOffer lateOffer = new SlotOffer(releasedAllocation, 0, ResourceProfile.ANY);

		slotPool.registerTaskManager(taskManagerLocation.getResourceID());

		assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, lateOffer));

		// the remaining request gets the offered slot
		assertEquals(releasedAllocation, survivingFuture.get().getAllocationId());

		// so its own allocation must have been canceled at the gateway
		assertEquals(survivingAllocation, canceledRequests.take());
	}
}
 
源代码12 项目: flink   文件: SlotManagerImplTest.java
/**
 * Tests that a slot request is retried when the first attempt fails on the task manager side.
 *
 * <p>The task executor gateway answers the first request with a future that is completed
 * exceptionally and the second one with a future that is acknowledged.
 */
@Test
public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
	final ResourceManagerId rmId = ResourceManagerId.generate();
	final ResourceActions rmActions = new TestingResourceActionsBuilder().build();

	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile profile = ResourceProfile.fromResources(42.0, 1337);
	final SlotRequest request = new SlotRequest(jobId, allocationId, profile, "foobar");

	// answers handed out by the gateway, one per request attempt
	final CompletableFuture<Acknowledge> firstAnswer = new CompletableFuture<>();
	final CompletableFuture<Acknowledge> secondAnswer = new CompletableFuture<>();
	final Iterator<CompletableFuture<Acknowledge>> answers = Arrays.asList(firstAnswer, secondAnswer).iterator();

	// slot ids for which the gateway received a request, in arrival order
	final ArrayBlockingQueue<SlotID> requestedSlots = new ArrayBlockingQueue<>(2);

	final TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(FunctionUtils.uncheckedFunction(
			parameters -> {
				requestedSlots.put(parameters.f0);
				return answers.next();
			}))
		.createTestingTaskExecutorGateway();

	final ResourceID resourceId = ResourceID.generate();
	final TaskExecutorConnection connection = new TaskExecutorConnection(resourceId, gateway);

	// the task manager reports two slots with the requested profile
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final SlotReport slotReport = new SlotReport(Arrays.asList(
		new SlotStatus(slotId1, profile),
		new SlotStatus(slotId2, profile)));

	try (SlotManagerImpl slotManager = createSlotManager(rmId, rmActions)) {

		slotManager.registerTaskManager(connection, slotReport);

		slotManager.registerSlotRequest(request);

		// first attempt: exactly one slot has been requested so far
		final SlotID firstAttempt = requestedSlots.take();
		assertThat(requestedSlots, is(empty()));

		final TaskManagerSlot failedSlot = slotManager.getSlot(firstAttempt);

		// failing the first attempt should trigger a second one
		firstAnswer.completeExceptionally(new SlotAllocationException("Test exception."));

		// which is acknowledged
		secondAnswer.complete(Acknowledge.get());

		final SlotID secondAttempt = requestedSlots.take();
		assertThat(requestedSlots, is(empty()));

		final TaskManagerSlot allocatedSlot = slotManager.getSlot(secondAttempt);

		assertTrue(allocatedSlot.getState() == TaskManagerSlot.State.ALLOCATED);
		assertEquals(allocationId, allocatedSlot.getAllocationId());

		// if the retry picked the other slot, the failed one must be free again
		final boolean sameSlotReused = failedSlot.getSlotId().equals(allocatedSlot.getSlotId());
		if (!sameSlotReused) {
			assertTrue(failedSlot.getState() == TaskManagerSlot.State.FREE);
		}
	}
}
 
源代码13 项目: flink   文件: SlotManagerImplTest.java
/**
 * Tests that a pending slot request is reassigned when a slot report arrives that shows the
 * requested slot as allocated under a different allocation id.
 *
 * <p>After the report, the request is expected to be sent for the other (free) slot.
 */
@Test
public void testSlotReportWhileActiveSlotRequest() throws Exception {
	final ResourceManagerId rmId = ResourceManagerId.generate();
	final ResourceActions rmActions = new TestingResourceActionsBuilder().build();

	final JobID jobId = new JobID();
	final AllocationID allocationId = new AllocationID();
	final ResourceProfile profile = ResourceProfile.fromResources(42.0, 1337);
	final SlotRequest request = new SlotRequest(jobId, allocationId, profile, "foobar");

	// the first attempt stays pending, the second one is acknowledged right away
	final CompletableFuture<Acknowledge> pendingAnswer = new CompletableFuture<>();
	final Iterator<CompletableFuture<Acknowledge>> answers = Arrays.asList(
		pendingAnswer,
		CompletableFuture.completedFuture(Acknowledge.get())).iterator();

	// slot ids for which the gateway received a request, in arrival order
	final ArrayBlockingQueue<SlotID> requestedSlots = new ArrayBlockingQueue<>(2);

	final TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
		.setRequestSlotFunction(FunctionUtils.uncheckedFunction(
			parameters -> {
				requestedSlots.put(parameters.f0);
				return answers.next();
			}))
		.createTestingTaskExecutorGateway();

	final ResourceID resourceId = ResourceID.generate();
	final TaskExecutorConnection connection = new TaskExecutorConnection(resourceId, gateway);

	// initially both slots are reported without an allocation
	final SlotID slotId1 = new SlotID(resourceId, 0);
	final SlotID slotId2 = new SlotID(resourceId, 1);
	final SlotReport initialReport = new SlotReport(Arrays.asList(
		new SlotStatus(slotId1, profile),
		new SlotStatus(slotId2, profile)));

	final ScheduledExecutor mainThreadExecutor = TestingUtils.defaultScheduledExecutor();

	final SlotManagerImpl slotManager = createSlotManagerBuilder()
		.setScheduledExecutor(mainThreadExecutor)
		.build();

	try {

		slotManager.start(rmId, mainThreadExecutor, rmActions);

		// register the task manager on the main thread executor, then the slot request
		final CompletableFuture<Void> registration = CompletableFuture.supplyAsync(
			() -> {
				slotManager.registerTaskManager(connection, initialReport);

				return null;
			},
			mainThreadExecutor)
		.thenAccept(
			(Object unused) -> {
				try {
					slotManager.registerSlotRequest(request);
				} catch (ResourceManagerException e) {
					throw new RuntimeException("Could not register slots.", e);
				}
			});

		// surfaces any exception thrown during registration
		registration.get();

		// find out which slot was picked and which one is still free
		final SlotID pickedSlot = requestedSlots.take();
		final SlotID otherSlot;
		if (pickedSlot.equals(slotId1)) {
			otherSlot = slotId2;
		} else {
			otherSlot = slotId1;
		}

		// report the picked slot as allocated for a different job and allocation id
		final SlotReport conflictingReport = new SlotReport(Arrays.asList(
			new SlotStatus(pickedSlot, profile, new JobID(), new AllocationID()),
			new SlotStatus(otherSlot, profile)));

		// handling the report should move the pending request to another slot
		final CompletableFuture<Boolean> reportHandled = CompletableFuture.supplyAsync(
			() -> slotManager.reportSlotStatus(connection.getInstanceID(), conflictingReport),
			mainThreadExecutor);

		assertTrue(reportHandled.get());

		final SlotID retriedSlot = requestedSlots.take();

		assertEquals(otherSlot, retriedSlot);
	} finally {
		// close the slot manager on the executor it was started with
		CompletableFuture.runAsync(
			ThrowingRunnable.unchecked(slotManager::close),
			mainThreadExecutor);
	}
}