下面列出了 java.util.concurrent.ArrayBlockingQueue#offer() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Fills a {@link Vector} and an {@link ArrayBlockingQueue} with the same randomly
 * chosen sequence of elements, then asserts that both containers return the
 * elements in the same order.
 *
 * NOTE(review): {@code i} and {@code k} are not declared in this method; presumably
 * they are members of the enclosing test class - confirm.
 *
 * @throws Exception declared by the test signature
 */
public void test() throws Exception {
    final int capacity = 50000;
    final Vector<Object> expected = new Vector<Object>();
    final ArrayBlockingQueue<Object> actual = new ArrayBlockingQueue<Object>(capacity);
    final Random coin = new Random();

    int round = 0;
    while (round < capacity) {
        // One coin flip per round decides which element goes into BOTH containers.
        if (coin.nextInt(2) != 0) {
            expected.add(k);
            actual.offer(k);
        } else {
            expected.add(i);
            actual.offer(i);
        }
        round++;
    }

    // The queue is sized to hold every element, so both snapshots should match.
    final Object[] fromVector = expected.toArray();
    final Object[] fromQueue = actual.toArray();
    assertTrue(Arrays.equals(fromVector, fromQueue));
}
/**
 * Wraps the given payload in an {@link OutMessage} and tries to place it on the
 * supplied pending-send queue without blocking.
 *
 * @param source source
 * @param message data to be carried by the outgoing message
 * @param target target
 * @param flags flags
 * @param routingParameters supplies the internal/external routes and the destination id
 * @param pendingSendMessages the queue the new message is offered to
 * @return {@code true} if the queue accepted the message; {@code false} if the queue
 *         reported no remaining capacity or rejected the offer
 */
private boolean offerForSend(int source, Object message, int target, int flags,
                             RoutingParameters routingParameters,
                             ArrayBlockingQueue<OutMessage> pendingSendMessages) {
    // Bail out before building the message when the queue has no free slot.
    if (pendingSendMessages.remainingCapacity() <= 0) {
        return false;
    }

    // The destination id is only used as the path when external routes exist.
    final boolean hasExternalRoutes = routingParameters.getExternalRoutes().size() > 0;
    final int path = hasExternalRoutes ? routingParameters.getDestinationId() : DEFAULT_PATH;

    final OutMessage outgoing = new OutMessage(
        source,
        edge,
        path,
        target,
        flags,
        routingParameters.getInternalRoutes(),
        routingParameters.getExternalRoutes(),
        dataType,
        keyType,
        this,
        message);

    // The offer can still fail if the queue filled up after the capacity check.
    return pendingSendMessages.offer(outgoing);
}
/**
 * Builds an {@link OutMessage} for the given payload and offers it, non-blocking,
 * to the pending-send queue.
 *
 * @param source source
 * @param message data
 * @param target target
 * @param flags flags
 * @param routingParameters routing parameters (routes and destination id are read from it)
 * @param pendingSendMessages the message queue the result is offered to
 * @return {@code true} if the message was accepted by the queue, {@code false} when the
 *         queue had no remaining capacity or the offer itself failed
 */
private boolean offerForSend(int source, Object message, int target, int flags,
                             RoutingParameters routingParameters,
                             ArrayBlockingQueue<OutMessage> pendingSendMessages) {
    final boolean queueHasRoom = pendingSendMessages.remainingCapacity() > 0;
    if (!queueHasRoom) {
        // No free slot: do not even construct the message.
        return false;
    }

    final int path;
    if (routingParameters.getExternalRoutes().size() > 0) {
        path = routingParameters.getDestinationId();
    } else {
        path = DEFAULT_PATH;
    }

    final OutMessage pending = new OutMessage(source, edge, path, target, flags,
        routingParameters.getInternalRoutes(), routingParameters.getExternalRoutes(),
        dataType, keyType, this, message);

    // Hand the freshly built message to the pending queue; may still be rejected.
    final boolean accepted = pendingSendMessages.offer(pending);
    return accepted;
}
/**
 * Repeatedly tries to enqueue {@code data}, waiting up to one second per attempt.
 *
 * After every timed-out attempt the method calls {@code Misc.checkIfCacheClosing(null)}
 * (presumably the source of the declared {@link CacheClosedException} - confirm), and
 * gives up if an error has been recorded or none of the given threads is alive.
 *
 * @param queue the queue to add to
 * @param data the element to enqueue
 * @param err holder that is non-null once some thread has failed
 * @param threads the threads whose liveness is checked between attempts
 * @return {@code true} once the element was enqueued; {@code false} if {@code err} holds a
 *         throwable or no thread in {@code threads} is alive
 * @throws CacheClosedException declared; see note above
 * @throws InterruptedException if interrupted while waiting for queue space
 */
private static boolean addQueueData(ArrayBlockingQueue<QueueData> queue,
    QueueData data, AtomicReference<Throwable> err, Thread[] threads)
    throws CacheClosedException, InterruptedException {
    for (;;) {
        if (queue.offer(data, 1, TimeUnit.SECONDS)) {
            return true;
        }
        // check cancellation
        Misc.checkIfCacheClosing(null);
        // a recorded failure means there is no point in waiting any longer
        if (err.get() != null) {
            return false;
        }
        // stop scanning at the first live thread
        boolean anyAlive = false;
        for (int idx = 0; idx < threads.length && !anyAlive; idx++) {
            anyAlive = threads[idx].isAlive();
        }
        if (!anyAlive) {
            return false;
        }
    }
}
/**
 * Enqueues {@code data}, retrying with a one-second timeout per attempt until it is
 * accepted or a stop condition is met.
 *
 * Between attempts the method calls {@code Misc.checkIfCacheClosing(null)} (presumably
 * what raises the declared {@link CacheClosedException} - confirm), then stops if
 * {@code err} holds a throwable or if no thread in {@code threads} is still alive.
 *
 * @param queue destination queue
 * @param data element to add
 * @param err reference that becomes non-null when a thread has failed
 * @param threads threads checked for liveness after each failed attempt
 * @return {@code true} if the element was added, {@code false} if the method gave up
 * @throws CacheClosedException declared; see note above
 * @throws InterruptedException if interrupted while waiting on the queue
 */
private static boolean addQueueData(ArrayBlockingQueue<QueueData> queue,
    QueueData data, AtomicReference<Throwable> err, Thread[] threads)
    throws CacheClosedException, InterruptedException {
    boolean enqueued = queue.offer(data, 1, TimeUnit.SECONDS);
    while (!enqueued) {
        // check cancellation
        Misc.checkIfCacheClosing(null);

        // check if a thread failed
        final boolean failureRecorded = err.get() != null;
        if (failureRecorded) {
            return false;
        }

        // count nothing, just look for the first thread that is still running
        int pos = 0;
        while (pos < threads.length && !threads[pos].isAlive()) {
            pos++;
        }
        if (pos == threads.length) {
            // every thread has finished
            return false;
        }

        enqueued = queue.offer(data, 1, TimeUnit.SECONDS);
    }
    return true;
}
/**
 * Adds a non-empty set to the queue, retrying in one-second waits until it is accepted.
 *
 * Empty sets are not enqueued. There is no upper bound on the number of retries; the
 * only way out of a permanently full queue is an interrupt.
 *
 * @param ids the queue that receives the set
 * @param set the set to enqueue
 * @return the size of {@code set} (0 if nothing was enqueued because the set was empty)
 * @throws InterruptedException if interrupted while waiting for queue space
 */
protected int addSet(ArrayBlockingQueue<Set<String>> ids, Set<String> set) throws InterruptedException {
    final int count = set.size();
    if (count != 0) {
        boolean accepted = ids.offer(set, 1, TimeUnit.SECONDS);
        LOG.debug("id grabber: " + accepted + " " + ids.size());
        // keep retrying until the queue takes the set
        while (!accepted) {
            accepted = ids.offer(set, 1, TimeUnit.SECONDS);
            LOG.debug("waiting to add");
        }
    }
    // an empty set falls straight through to here without touching the queue
    return count;
}
/**
 * Demonstrates {@code offer} and {@code poll} on a small {@link ArrayBlockingQueue}:
 * inserts 1, 2 and 3, then polls four times, printing the queue before each poll
 * and once more at the end. The fourth poll hits an empty queue and prints
 * {@code null}.
 *
 * @param args unused
 * @throws InterruptedException declared by the signature; nothing here blocks
 */
public static void main(String[] args) throws InterruptedException {
    final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);
    for (int value = 1; value <= 3; value++) {
        queue.offer(value);
    }

    // One more poll than there are elements, to show poll() on an empty queue.
    final int pollCount = 4;
    for (int attempt = 0; attempt < pollCount; attempt++) {
        System.out.println("Queue Contains" + queue);
        System.out.println("Removing From head: " + queue.poll());
    }
    System.out.println("Queue Contains" + queue);
}
/**
 * Wraps a received channel message in an already-serialized {@link OutMessage} and
 * offers it to the pending-send queue of this worker's main task.
 *
 * The routing parameters are taken from {@code routingParametersCache} when an entry for
 * the source exists, otherwise obtained from {@code sendRoutingParameters}.
 *
 * @param currentMessage the received channel message to pass on
 * @return {@code true} if the pending-send queue accepted the new message
 */
@Override
public boolean handleReceivedChannelMessage(ChannelMessage currentMessage) {
    final int src = router.mainTaskOfExecutor(instancePlan.getThisWorker(),
        CommunicationContext.DEFAULT_DESTINATION);

    final RoutingParameters routingParameters = routingParametersCache.containsKey(src)
        ? routingParametersCache.get(src)
        : sendRoutingParameters(src, CommunicationContext.DEFAULT_DESTINATION);

    final ArrayBlockingQueue<OutMessage> pendingSendMessages =
        pendingSendMessagesPerSource.get(src);

    // -1 stands in for the destination id when there are no external routes
    final int di = routingParameters.getExternalRoutes().size() > 0
        ? routingParameters.getDestinationId()
        : -1;

    // create a send message to keep track of the serialization at the initial stage
    final OutMessage outgoing = new OutMessage(
        src,
        currentMessage.getHeader().getEdge(),
        di,
        CommunicationContext.DEFAULT_DESTINATION,
        currentMessage.getHeader().getFlags(),
        routingParameters.getInternalRoutes(),
        routingParameters.getExternalRoutes(),
        dataType,
        this.keyType,
        delegate,
        CommunicationContext.EMPTY_OBJECT);
    outgoing.getChannelMessages().offer(currentMessage);

    // bump the reference count once per message, by the number of external routes
    final boolean outCountPending = !currentMessage.isOutCountUpdated();
    if (outCountPending) {
        currentMessage.incrementRefCount(routingParameters.getExternalRoutes().size());
        currentMessage.setOutCountUpdated(true);
    }

    // the wrapped message is complete, so mark it as serialized right away
    outgoing.setSendState(OutMessage.SendState.SERIALIZED);

    // now try to put this into pending
    return pendingSendMessages.offer(outgoing);
}
/**
 * Hands a non-empty batch to the result queue, waiting for space if the queue is full.
 *
 * The batch is registered as open on the context first. A non-blocking offer is tried;
 * if it fails, the method retries with five-second timeouts, calling
 * {@code context.propagateExceptionIfPresent()} after each failed attempt, while
 * {@code _blockedRangeScans} is incremented for the duration of the wait. If an
 * {@link IOException} or {@link InterruptedException} escapes, the batch is closed on
 * the context with that exception before it is rethrown.
 *
 * @param context the batch context that tracks open batches
 * @param queue the result queue the batch is submitted to
 * @param batch the batch to submit; ignored when empty
 * @param continuedInNextBatch value stored on the batch before submission
 * @throws IOException rethrown after the batch has been closed on the context
 * @throws InterruptedException if interrupted while waiting for queue space
 */
private void submitResultBatch(BatchContext context, ArrayBlockingQueue<Batch> queue, Batch batch, boolean continuedInNextBatch)
        throws IOException, InterruptedException {
    if (batch.isEmpty()) {
        // Empty batches are neither opened nor counted.
        return;
    }

    // Mark this batch as open
    context.openBatch(batch);
    try {
        batch.setContinuedInNextBatch(continuedInNextBatch);

        // Fast path: try to submit without blocking
        final boolean acceptedImmediately = queue.offer(batch);
        if (!acceptedImmediately) {
            // Queue was full: track the blocked state for as long as we wait
            _blockedRangeScans.inc();
            try {
                boolean accepted = queue.offer(batch, 5, TimeUnit.SECONDS);
                while (!accepted) {
                    context.propagateExceptionIfPresent();
                    accepted = queue.offer(batch, 5, TimeUnit.SECONDS);
                }
            } finally {
                _blockedRangeScans.dec();
            }
        }

        _batchesSubmitted.inc();
        _batchRowsSubmitted.inc(batch.getResults().size());
    } catch (IOException | InterruptedException failure) {
        // Batch was never submitted so un-mark that it is open
        context.closeBatch(batch, failure);
        throw failure;
    }
}
/**
 * Tests that a failing {@link JobManagerRunner} will be properly cleaned up.
 *
 * The runner factory blocks on a queue: an {@code Optional} holding an exception makes
 * it throw that exception, an empty {@code Optional} lets it finish normally. The first
 * submission is made to fail that way; a second submission of the same job graph is
 * then expected to complete.
 */
@Test
public void testFailingJobManagerRunnerCleanup() throws Exception {
    final FlinkException expectedFailure = new FlinkException("Test exception.");
    final ArrayBlockingQueue<Optional<Exception>> runnerSignals = new ArrayBlockingQueue<>(2);

    dispatcher = createAndStartDispatcher(
        heartbeatServices,
        haServices,
        new BlockingJobManagerRunnerFactory(() -> {
            // Block until the test decides whether this runner fails or succeeds.
            final Optional<Exception> signal = runnerSignals.take();
            if (signal.isPresent()) {
                throw signal.get();
            }
        }));

    final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

    CompletableFuture<Acknowledge> submission = gateway.submitJob(jobGraph, TIMEOUT);

    // The factory is still waiting on the queue, so the submission cannot be done yet.
    assertThat(submission.isDone(), is(false));

    runnerSignals.offer(Optional.of(expectedFailure));

    try {
        submission.get();
        fail("Should fail because we could not instantiate the JobManagerRunner.");
    } catch (Exception e) {
        final boolean carriesExpectedFailure =
            ExceptionUtils.findThrowable(e, cause -> cause.equals(expectedFailure)).isPresent();
        assertThat(carriesExpectedFailure, is(true));
    }

    // Submitting the same job again must work once the runner is allowed to succeed.
    submission = gateway.submitJob(jobGraph, TIMEOUT);
    runnerSignals.offer(Optional.empty());
    submission.get();
}
/**
 * Tests that a failing {@link JobManagerRunner} will be properly cleaned up.
 *
 * The runner factory blocks on a queue: an {@code Optional} holding an exception makes
 * it throw that exception, an empty {@code Optional} lets it finish normally. After
 * {@code isLeader} has been invoked on the leader election service, the first submission
 * is made to fail; a second submission of the same job graph is then expected to complete.
 */
@Test
public void testFailingJobManagerRunnerCleanup() throws Exception {
    final FlinkException expectedFailure = new FlinkException("Test exception.");
    final ArrayBlockingQueue<Optional<Exception>> runnerSignals = new ArrayBlockingQueue<>(2);

    dispatcher = createAndStartDispatcher(
        heartbeatServices,
        haServices,
        new BlockingJobManagerRunnerFactory(() -> {
            // Block until the test decides whether this runner fails or succeeds.
            final Optional<Exception> signal = runnerSignals.take();
            if (signal.isPresent()) {
                throw signal.get();
            }
        }));

    // Wait for the isLeader call to complete before talking to the dispatcher.
    dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();

    final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

    CompletableFuture<Acknowledge> submission = gateway.submitJob(jobGraph, TIMEOUT);

    // The factory is still waiting on the queue, so the submission cannot be done yet.
    assertThat(submission.isDone(), is(false));

    runnerSignals.offer(Optional.of(expectedFailure));

    try {
        submission.get();
        fail("Should fail because we could not instantiate the JobManagerRunner.");
    } catch (Exception e) {
        final boolean carriesExpectedFailure =
            ExceptionUtils.findThrowable(e, cause -> cause.equals(expectedFailure)).isPresent();
        assertThat(carriesExpectedFailure, is(true));
    }

    // Submitting the same job again must work once the runner is allowed to succeed.
    submission = gateway.submitJob(jobGraph, TIMEOUT);
    runnerSignals.offer(Optional.empty());
    submission.get();
}
/**
 * Tests that a failing {@link JobManagerRunner} will be properly cleaned up.
 *
 * A queue of {@code Optional<Exception>} steers the blocking runner factory: a present
 * value is thrown from the factory, an empty value lets it return. The test first feeds
 * the factory an exception and checks that the submission fails with it, then resubmits
 * the same job graph and lets the factory succeed.
 */
@Test
public void testFailingJobManagerRunnerCleanup() throws Exception {
    final FlinkException injectedFailure = new FlinkException("Test exception.");
    final ArrayBlockingQueue<Optional<Exception>> outcomes = new ArrayBlockingQueue<>(2);

    dispatcher = createAndStartDispatcher(
        heartbeatServices,
        haServices,
        new BlockingJobManagerRunnerFactory(() -> {
            // Wait for the test to publish the outcome of this factory call.
            final Optional<Exception> outcome = outcomes.take();
            if (outcome.isPresent()) {
                throw outcome.get();
            }
        }));

    // Wait for the isLeader call to complete before using the gateway.
    dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();

    final DispatcherGateway selfGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

    CompletableFuture<Acknowledge> pendingSubmission = selfGateway.submitJob(jobGraph, TIMEOUT);

    // Nothing has been put on the queue yet, so the factory is still blocked.
    assertThat(pendingSubmission.isDone(), is(false));

    outcomes.offer(Optional.of(injectedFailure));

    try {
        pendingSubmission.get();
        fail("Should fail because we could not instantiate the JobManagerRunner.");
    } catch (Exception e) {
        final boolean failureFound =
            ExceptionUtils.findThrowable(e, candidate -> candidate.equals(injectedFailure)).isPresent();
        assertThat(failureFound, is(true));
    }

    // Second attempt with the same job graph: this time the factory is allowed to return.
    pendingSubmission = selfGateway.submitJob(jobGraph, TIMEOUT);
    outcomes.offer(Optional.empty());
    pendingSubmission.get();
}