下面列出了 java.util.concurrent.BlockingQueue#offer() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Verifies that {@code remove(Object)} drops exactly the requested elements
 * and that the remaining elements are still polled in insertion order.
 */
@Test
public void testRemoveObj() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new DisruptorBlockingQueue<Integer>(cap);
    // Fill the queue with 0..cap-1.
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }
    // Remove every even value.
    for (int i = 0; i < cap; i += 2) {
        dbq.remove(Integer.valueOf(i));
    }
    // assertEquals takes (expected, actual); keeping that order makes a
    // failure message report the right value as "expected".
    Assert.assertEquals(cap / 2, dbq.size());
    // Only the odd values should remain, in their original order.
    for (int i = 1; i < cap; i += 2) {
        Assert.assertEquals(Integer.valueOf(i), dbq.poll());
    }
}
/**
 * Verifies that {@code clear()} empties a full queue: previously contained
 * elements are gone, the size is zero and {@code poll()} returns {@code null}.
 */
@Test
public void testClear() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new DisruptorBlockingQueue<Integer>(cap);
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }
    // Typed (non-raw) set avoids the unchecked-conversion warning.
    Set<Integer> si = new HashSet<Integer>(cap);
    // A sample of the values that were offered above.
    for (int i = 0; i < cap / 10; i++) {
        si.add(Integer.valueOf(i));
    }
    Assert.assertTrue(dbq.containsAll(si));
    dbq.clear();
    Assert.assertFalse(dbq.containsAll(si));
    Assert.assertEquals(0, dbq.size());
    Assert.assertTrue(dbq.isEmpty());
    Assert.assertNull(dbq.poll());
}
/**
 * Runs {@code tryMatch(input, regex)} on a newly started thread and reports the
 * outcome through {@code atFinish}: the boolean match result on normal
 * completion, or the caught {@link Exception} if the match attempt threw one.
 *
 * @param input    the character sequence to match against
 * @param regex    the regular expression to try
 * @param atFinish queue that receives either the result or the exception
 * @return the already-started worker thread
 */
public Thread tryMatchInThread(final CharSequence input, final String regex, final BlockingQueue<Object> atFinish) {
    Thread matcher = new Thread() {
        @Override
        public void run() {
            Object outcome;
            try {
                boolean matched = tryMatch(input, regex);
                outcome = matched;
            } catch (Exception failure) {
                // Hand the failure to the waiting side instead of the result.
                outcome = failure;
            }
            atFinish.offer(outcome);
        }
    };
    matcher.start();
    return matcher;
}
/**
 * Checks {@code peek()}: it returns {@code null} (rather than throwing) on an
 * empty queue, keeps returning the head while elements are appended, and
 * follows the head as elements are polled off.
 */
@Test
public void testPeek() {
    final int capacity = 10;
    final BlockingQueue<Integer> queue = new MPMCBlockingQueue<>(capacity);

    // Empty queue: peek must yield null, not NoSuchElementException.
    try {
        Assert.assertNull(queue.peek());
    } catch (NoSuchElementException unexpected) {
        Assert.fail();
    }

    // While filling, the head stays at the first element offered.
    for (int added = 0; added < capacity; added++) {
        queue.offer(Integer.valueOf(added));
        Assert.assertEquals(Integer.valueOf(0), queue.peek());
    }

    // While draining, peek reports each value just before it is polled.
    int expected = 0;
    while (expected < capacity) {
        Assert.assertEquals(Integer.valueOf(expected), queue.peek());
        queue.poll();
        expected++;
    }
}
/**
 * Verifies that {@code clear()} empties a full queue: previously contained
 * elements are gone, the size is zero and {@code poll()} returns {@code null}.
 */
@Test
public void testClear() {
    final int cap = 100;
    final BlockingQueue<Integer> dbq = new PushPullBlockingQueue<Integer>(cap);
    for (int i = 0; i < cap; i++) {
        dbq.offer(Integer.valueOf(i));
    }
    // Typed (non-raw) set avoids the unchecked-conversion warning.
    Set<Integer> si = new HashSet<Integer>(cap);
    // A sample of the values that were offered above.
    for (int i = 0; i < cap / 10; i++) {
        si.add(Integer.valueOf(i));
    }
    Assert.assertTrue(dbq.containsAll(si));
    dbq.clear();
    Assert.assertFalse(dbq.containsAll(si));
    Assert.assertEquals(0, dbq.size());
    Assert.assertTrue(dbq.isEmpty());
    Assert.assertNull(dbq.poll());
}
/**
 * Routes a call to one of the write, read or scan queues and tries to enqueue it.
 * The queue list is indexed as write queues first, then read queues, then scan
 * queues; the matching balancer picks the queue inside each group.
 *
 * @param callTask the call to enqueue
 * @return {@code true} if the call was queued; {@code false} if the selected
 *         queue already holds {@code currentQueueLimit} entries or rejects the offer
 */
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
    final RpcCall rpcCall = callTask.getRpcCall();

    final int targetIndex;
    if (isWriteRequest(rpcCall.getHeader(), rpcCall.getParam())) {
        targetIndex = writeBalancer.getNextQueue();
    } else if (numScanQueues > 0 && isScanRequest(rpcCall.getHeader(), rpcCall.getParam())) {
        // Scan queues sit after all write and read queues.
        targetIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
    } else {
        // Everything else is treated as a read.
        targetIndex = numWriteQueues + readBalancer.getNextQueue();
    }

    final BlockingQueue<CallRunner> target = queues.get(targetIndex);
    // Refuse once the soft limit is reached, otherwise let the queue decide.
    return target.size() < currentQueueLimit && target.offer(callTask);
}
/**
 * Blocking retrieval of the object to share, without removing it: the object
 * is taken from the queue for {@code key} and immediately offered back.
 *
 * @param key identifies the shared queue to read from
 * @return the shared object
 * @throws RuntimeException if the object cannot be put back, or if the calling
 *         thread is interrupted while waiting (the interrupt flag is restored
 *         before the exception is thrown)
 */
public V get(String key) {
    try {
        BlockingQueue<V> queue = retrieveSharedQueue(key);
        V objToShare = queue.take();
        // Put the object straight back so it stays available.
        if (!queue.offer(objToShare)) {
            throw new RuntimeException("Error: Concurrent modification of the broker slot for key '" + key + "'.");
        }
        return objToShare;
    } catch (InterruptedException e) {
        // Preserve the interruption for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Wraps the received object in a {@link TaskMessage} for the given target and
 * offers it to that target's outgoing queue.
 *
 * @param target index used to look up the outgoing queue
 * @param object payload to wrap
 * @return the result of {@code offer}: {@code true} if the queue accepted the message
 */
@Override
public boolean receive(int target, Object object) {
    final BlockingQueue<IMessage> targetQueue = outMessages.get(target);
    final TaskMessage wrapped = new TaskMessage<>(object, inEdge, target);
    return targetQueue.offer(wrapped);
}
/**
 * Calls a series of library methods and discards every return value.
 * NOTE(review): the {@code @ExpectWarning("RV")} annotation suggests this is a
 * static-analysis fixture that must keep ignoring these results - confirm
 * before "fixing" any of the calls below.
 */
@ExpectWarning("RV")
public static void main(String args[]) throws Exception {
String str = " ttesting ";
// String is immutable: each call returns a new value that is dropped here,
// so str itself never changes.
str.trim();
str.toLowerCase();
str.toUpperCase();
str.replace(" ", "");
str.replace(' ', '.');
str.substring(0, 10);
str.equals("testing");
Semaphore s = new Semaphore(17, true);
// tryAcquire reports success through its boolean result, which is ignored.
s.tryAcquire();
s.tryAcquire(12, TimeUnit.MILLISECONDS);
BlockingQueue<Object> q = new LinkedBlockingQueue<Object>();
// The accepted flag from offer and the element from poll are ignored as well.
q.offer(new Object());
q.offer(new Object(), 12, TimeUnit.MILLISECONDS);
q.poll(12, TimeUnit.MILLISECONDS);
q.poll();
Lock l = new ReentrantLock();
Condition c = l.newCondition();
// The condition may only be awaited while its lock is held.
l.lock();
try {
// Timed waits: the remaining-time / timed-out results are not inspected.
c.awaitNanos(12);
c.awaitUntil(new Date());
c.await(12, TimeUnit.NANOSECONDS);
} finally {
l.unlock();
}
q.poll();
}
/**
 * Offers elements until the queue refuses one, then checks that a queue
 * requested with capacity 10 ended up holding 16 elements.
 */
@Test
public void testPushPullBlockingQueueTestC1() {
    final int requestedCapacity = 10;
    final BlockingQueue<Integer> queue = new PushPullBlockingQueue<Integer>(requestedCapacity);

    // Keep offering until the first rejection.
    boolean accepted;
    do {
        accepted = queue.offer(Integer.valueOf(0));
    } while (accepted);

    Assert.assertEquals(16, queue.size());
}
/**
 * Creates the executor and, unless {@code q} is a {@link SynchronousQueue},
 * starts a thread that moves tasks from {@code q} into the executor's own queue.
 *
 * <p>For function execution ({@code forFnExec}) a hand-off that does not succeed
 * within the retry interval (system property {@code gemfire.RETRY_INTERVAL},
 * default 5000 ms) is resubmitted through {@code submit}; otherwise the thread
 * blocks on {@code put} until the task is accepted.
 */
public FunctionExecutionPooledExecutor(BlockingQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, final boolean forFnExec) {
    this(initQ(q), maxPoolSize, stats, tf, msTimeout, initREH(q,forFnExec));
    final int retryIntervalMs = Integer.getInteger("gemfire.RETRY_INTERVAL", 5000).intValue();
    if (q instanceof SynchronousQueue) {
        // No buffer queue and no consumer thread are set up in this case.
        return;
    }

    this.bufferQueue = q;
    final BlockingQueue<Runnable> source = q;
    final BlockingQueue<Runnable> sink = getQueue();
    Runnable pump = new Runnable() {
        @Override
        public void run() {
            try {
                while (true) {
                    SystemFailure.checkFailure();
                    Runnable next = source.take();
                    if (!forFnExec) {
                        sink.put(next);
                        continue;
                    }
                    boolean handedOff = sink.offer(next, retryIntervalMs, TimeUnit.MILLISECONDS);
                    if (!handedOff) {
                        submit(next);
                    }
                }
            } catch (InterruptedException ie) {
                // this thread is being shutdown so just fall out of run()
                Thread.currentThread().interrupt();
            }
        }
    };
    this.bufferConsumer = tf.newThread(pump);
    this.bufferConsumer.start();
}
/**
 * Wraps the received object in a {@link TaskMessage} and offers it to the
 * outgoing queue of the given target.
 *
 * <p>Previously this method returned {@code true} on every path, so a message
 * rejected by {@code offer} was reported as delivered and silently lost. The
 * result of {@code offer} is now returned.
 *
 * @param target index used to look up the outgoing queue
 * @param object payload to wrap
 * @return {@code false} if the target's queue rejected the message;
 *         {@code true} otherwise
 */
@Override
public boolean receive(int target, Object object) {
    TaskMessage msg = new TaskMessage<>(object, inEdge, target);
    BlockingQueue<IMessage> messages = outMessages.get(target);
    if (messages == null) {
        // No queue for this target: keep the original behaviour of reporting
        // success. NOTE(review): confirm that dropping the message is intended.
        return true;
    }
    return messages.offer(msg);
}
/**
 * Long-running check that repeatedly fills and drains a small queue so that
 * the number of operations spans the integer range. Runs only when
 * {@code ALLOW_LONG_RUN} is set; otherwise it just prints a notice.
 */
@Test
public void textIntMaxValue() {
    // the blocking queue depends on sequence numbers that are integers
    // be sure the blocking queue operates normally over
    // a range spanning integer values
    if (ALLOW_LONG_RUN) {
        final int cap = 3;
        final BlockingQueue<Integer> dbq = new MPMCBlockingQueue<>(cap);
        long nIter = 0;
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            // Fill: size grows by one per offer.
            for (int a = 0; a < cap; a++) {
                // assertEquals takes (expected, actual); the order matters
                // for the failure message.
                Assert.assertEquals(a, dbq.size());
                dbq.offer(Integer.valueOf(a));
                nIter++;
            }
            // Drain: values come back in insertion order.
            for (int a = 0; a < cap; a++) {
                Assert.assertEquals(cap - a, dbq.size());
                Assert.assertEquals("At i="+i, Integer.valueOf(a), dbq.poll());
            }
            if (nIter % Integer.MAX_VALUE == 0) System.out.println(nIter+"times MAX_VALUE");
        }
    } else {
        System.out.println("max value test not executed");
    }
}
/**
 * Inserts the element, starting at the sub-queue for the priority level chosen
 * by the scheduler.
 *
 * Put and offer share the first steps (ask the scheduler for a priority level,
 * pick the sub-queue for that level, delegate to it) but treat overflow
 * differently:
 * - Put falls through to the next level each time a sub-queue is full, and
 *   blocks on the last sub-queue if every level rejected the element.
 * - Offer only ever tries the one sub-queue.
 */
@Override
public void put(E e) throws InterruptedException {
    int level = scheduler.getPriorityLevel(e);
    final int levelCount = this.queues.size();

    while (!this.queues.get(level).offer(e)) {
        // Record the overflow for the level that just rejected the element.
        this.overflowedCalls.get(level).getAndIncrement();
        level++;
        if (level == levelCount) {
            // Every level was full: block on the last queue until there is room.
            // Delete this line to drop the call
            this.queues.get(level - 1).put(e);
            break;
        }
    }

    signalNotEmpty();
}
/**
 * Offers the element to the single sub-queue matching its scheduler-assigned
 * priority level; no other level is tried if that sub-queue rejects it.
 *
 * @return {@code true} if the sub-queue accepted the element
 */
@Override
public boolean offer(E e) {
    final int level = scheduler.getPriorityLevel(e);
    final boolean accepted = this.queues.get(level).offer(e);
    // NOTE(review): the signal is sent even when the element was rejected.
    signalNotEmpty();
    return accepted;
}
/**
 * Wraps the received object in a {@link TaskMessage} and offers it to the
 * outgoing queue of the given target.
 *
 * <p>Previously this method returned {@code true} on every path, so a message
 * rejected by {@code offer} was reported as delivered and silently lost. The
 * result of {@code offer} is now returned.
 *
 * @param target index used to look up the outgoing queue
 * @param object payload to wrap
 * @return {@code false} if the target's queue rejected the message;
 *         {@code true} otherwise
 */
@Override
public boolean receive(int target, Object object) {
    TaskMessage msg = new TaskMessage<>(object, inEdge, target);
    BlockingQueue<IMessage> messages = outMessages.get(target);
    if (messages == null) {
        // No queue for this target: keep the original behaviour of reporting
        // success. NOTE(review): confirm that dropping the message is intended.
        return true;
    }
    return messages.offer(msg);
}
/**
 * Checks that the {@link TaskExecutor} registers a second time when the answer
 * to its first slot report is a failure.
 */
@Test
public void testInitialSlotReportFailure() throws Exception {
    final TaskSlotTable<Task> slotTable = TaskSlotUtils.createTaskSlotTable(1);
    final UnresolvedTaskManagerLocation location = new LocalUnresolvedTaskManagerLocation();
    final TaskManagerServices services = new TaskManagerServicesBuilder()
        .setTaskSlotTable(slotTable)
        .setUnresolvedTaskManagerLocation(location)
        .build();

    final TaskExecutor taskExecutor = createTaskExecutor(services);
    taskExecutor.start();

    try {
        final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();

        // Each slot report is answered with the next future from this queue.
        final BlockingQueue<CompletableFuture<Acknowledge>> slotReportResponses = new ArrayBlockingQueue<>(2);
        rmGateway.setSendSlotReportFunction(ignoredSlotReport -> {
            try {
                return slotReportResponses.take();
            } catch (InterruptedException interrupted) {
                return FutureUtils.completedExceptionally(interrupted);
            }
        });

        final CompletableFuture<RegistrationResponse> registrationResponse = CompletableFuture.completedFuture(
            new TaskExecutorRegistrationSuccess(
                new InstanceID(),
                rmGateway.getOwnResourceId(),
                new ClusterInformation("foobar", 1234)));

        // Counts registration attempts; two are expected.
        final CountDownLatch registrationAttempts = new CountDownLatch(2);
        rmGateway.setRegisterTaskExecutorFunction(ignoredRegistration -> {
            registrationAttempts.countDown();
            return registrationResponse;
        });

        // First slot report fails, second one is acknowledged.
        slotReportResponses.offer(FutureUtils.completedExceptionally(new FlinkException("Test exception")));
        slotReportResponses.offer(CompletableFuture.completedFuture(Acknowledge.get()));

        rpc.registerGateway(rmGateway.getAddress(), rmGateway);
        resourceManagerLeaderRetriever.notifyListener(rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());

        // wait for the second registration attempt
        registrationAttempts.await();
    } finally {
        RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    }
}
/**
 * Checks that the {@link TaskExecutor} registers a second time when the answer
 * to its first slot report is a failure.
 */
@Test
public void testInitialSlotReportFailure() throws Exception {
    final TaskSlotTable slotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
    final TaskManagerLocation location = new LocalTaskManagerLocation();
    final TaskManagerServices services = new TaskManagerServicesBuilder()
        .setTaskSlotTable(slotTable)
        .setTaskManagerLocation(location)
        .build();

    final TaskExecutor taskExecutor = createTaskExecutor(services);
    taskExecutor.start();

    try {
        final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway();

        // Each slot report is answered with the next future from this queue.
        final BlockingQueue<CompletableFuture<Acknowledge>> slotReportResponses = new ArrayBlockingQueue<>(2);
        rmGateway.setSendSlotReportFunction(ignoredSlotReport -> {
            try {
                return slotReportResponses.take();
            } catch (InterruptedException interrupted) {
                return FutureUtils.completedExceptionally(interrupted);
            }
        });

        final CompletableFuture<RegistrationResponse> registrationResponse = CompletableFuture.completedFuture(
            new TaskExecutorRegistrationSuccess(
                new InstanceID(),
                rmGateway.getOwnResourceId(),
                new ClusterInformation("foobar", 1234)));

        // Counts registration attempts; two are expected.
        final CountDownLatch registrationAttempts = new CountDownLatch(2);
        rmGateway.setRegisterTaskExecutorFunction(ignoredRegistration -> {
            registrationAttempts.countDown();
            return registrationResponse;
        });

        // First slot report fails, second one is acknowledged.
        slotReportResponses.offer(FutureUtils.completedExceptionally(new FlinkException("Test exception")));
        slotReportResponses.offer(CompletableFuture.completedFuture(Acknowledge.get()));

        rpc.registerGateway(rmGateway.getAddress(), rmGateway);
        resourceManagerLeaderRetriever.notifyListener(rmGateway.getAddress(), rmGateway.getFencingToken().toUUID());

        // wait for the second registration attempt
        registrationAttempts.await();
    } finally {
        RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
    }
}
/**
 * Tests that pending request is removed if task executor reports a slot with its allocation id.
 *
 * <p>The test drives two slot requests against a single registered slot. Each
 * request sent to the task executor gateway is recorded in one queue and
 * answered with a manually controlled future taken from another queue, so the
 * statement order below is significant.
 */
@Test
public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
new TestingResourceActionsBuilder().build())) {
final JobID jobID = new JobID();
final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest1);
// requestSlotQueue records every request the gateway receives;
// responseQueue supplies the future returned for each of them.
final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
try {
// Blocks until the test has queued a response future.
return responseQueue.take();
} catch (InterruptedException ignored) {
return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
}
})
.createTestingTaskExecutorGateway();
final ResourceID taskExecutorResourceId = ResourceID.generate();
final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
// The task executor reports exactly one slot (index 0).
final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
// Queue the response for the first request before registering the task manager.
final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
responseQueue.offer(firstManualSlotRequestResponse);
slotManager.registerTaskManager(taskExecutionConnection, slotReport);
final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
// Queue the response for the second request, then register that request.
final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
responseQueue.offer(secondManualSlotRequestResponse);
final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest2);
// fail first request
firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
// fail second request
// The failure names slotRequest1's allocation id as the slot's occupant.
secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
// Both requests went to the same slot, in registration order.
assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
assertThat(secondRequest.f0, equalTo(firstRequest.f0));
// NOTE(review): this future was already completed exceptionally above, so
// complete() should not change its outcome - confirm the call is intended.
secondManualSlotRequestResponse.complete(Acknowledge.get());
// The slot must end up allocated under the first request's allocation id.
final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
assertThat(slotManager.getNumberRegisteredSlots(), is(1));
}
}
/**
 * Keeps offering {@code value} to {@code queue} until the queue accepts it,
 * logging a warning after every rejected attempt.
 *
 * @param queue destination queue
 * @param value element to add
 */
private static <T> void forceAdd(BlockingQueue<T> queue, T value) {
    while (true) {
        if (queue.offer(value)) {
            return;
        }
        LOG.warning("Failed to add to queue, trying again...");
    }
}