下面列出了 java.util.concurrent.BlockingQueue#put() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Enqueues the next data-sync handler on {@code messageQ}, blocking while the queue is full.
 *
 * <p>When {@code syncChannel} is non-null, the queued handler sends {@code mappingsToSend} to the
 * passive through the channel and returns {@code true}. When {@code syncChannel} is
 * {@code null}, a handler that only returns {@code false} is queued (the "we are done" marker).
 *
 * @param messageQ queue the handler is put on
 * @param syncChannel channel used by the queued handler, or {@code null} to queue the end marker
 * @param mappingsToSend payload wrapped in an {@code EhcacheDataSyncMessage}
 * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
 *     while waiting for queue space; the interrupt status is restored before throwing
 */
private void putMessage(BlockingQueue<DataSyncMessageHandler> messageQ,
                        PassiveSynchronizationChannel<EhcacheEntityMessage> syncChannel,
                        Map<Long, Chain> mappingsToSend) {
  try {
    if (syncChannel != null) {
      final EhcacheDataSyncMessage msg = new EhcacheDataSyncMessage(mappingsToSend);
      messageQ.put(() -> {
        syncChannel.synchronizeToPassive(msg);
        return true;
      });
    } else {
      // we are done
      messageQ.put(() -> false);
    }
  } catch (InterruptedException e) {
    // Restore the flag so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}
/**
 * Wraps an incoming row in a {@code HeapItem} and puts it on the queue registered for the
 * connection that produced it.
 *
 * @param row row data handed to the {@code HeapItem}
 * @param rowPacket row packet handed to the {@code HeapItem}
 * @param isLeft not read by this method
 * @param conn originating connection; cast to {@code MySQLConnection}
 * @return {@code true} when the row is dropped without queuing (handler terminated, rows not
 *     needed, or no queue registered for the connection); {@code false} after an enqueue was
 *     attempted, including when that attempt was interrupted
 */
@Override
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
  if (terminate.get() || noNeedRows) {
    return true;
  }
  MySQLConnection mySQLConn = (MySQLConnection) conn;
  BlockingQueue<HeapItem> queue = queues.get(mySQLConn);
  if (queue == null) {
    return true;
  }
  HeapItem item = new HeapItem(row, rowPacket, mySQLConn);
  try {
    queue.put(item);
  } catch (InterruptedException e) {
    // The return value stays the same as before, but the interrupt status is no longer lost.
    Thread.currentThread().interrupt();
  }
  return false;
}
@Test
public void shouldReadFromQueueForeverAndPushToSink() throws InterruptedException {
  // Arrange: two batches are waiting in the queue before the worker starts.
  final Records secondBatch = mock(Records.class);
  final BlockingQueue<Records> pending = new LinkedBlockingQueue<>();
  final BqQueueWorker worker =
      new BqQueueWorker("bq-worker", successfulSink, queueConfig, committer, pending, workerState);
  when(committer.acknowledge(any())).thenReturn(true);
  pending.put(messages);
  pending.put(secondBatch);

  // Act: run the worker until it has drained the queue, then ask it to stop.
  final Thread runner = new Thread(worker);
  runner.start();
  await().atMost(10, TimeUnit.SECONDS).until(() -> pending.isEmpty());
  workerState.closeWorker();
  runner.join();

  // Assert: both batches reached the sink.
  verify(successfulSink).push(messages);
  verify(successfulSink).push(secondBatch);
}
/**
 * Builds a {@code YoutubeProvider} for tests: the YouTube client is a mock and the data
 * collector just puts a single {@code StreamsDatum(null)} on the supplied queue.
 */
private YoutubeProvider buildProvider(YoutubeConfiguration config) {
  return new YoutubeProvider(config) {

    @Override
    protected YouTube createYouTubeClient() throws IOException {
      return mock(YouTube.class);
    }

    @Override
    protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
      // The parameter is effectively final, so the lambda can capture it directly.
      return () -> {
        final StreamsDatum emptyDatum = new StreamsDatum(null);
        try {
          queue.put(emptyDatum);
        } catch (InterruptedException ie) {
          fail("Test was interrupted");
        }
      };
    }
  };
}
/**
* Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)} uninterruptibly.
*
* @throws ClassCastException if the class of the specified element prevents it from being added
* to the given queue
* @throws IllegalArgumentException if some property of the specified element prevents it from
* being added to the given queue
*/
@GwtIncompatible // concurrency
public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
  // Remembers whether any attempt was interrupted so the flag can be re-asserted on exit.
  boolean sawInterrupt = false;
  try {
    for (;;) {
      try {
        queue.put(element);
        break;
      } catch (InterruptedException ignored) {
        // Keep retrying; the interruption is re-signalled in the finally block.
        sawInterrupt = true;
      }
    }
  } finally {
    if (sawInterrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
* Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)} uninterruptibly.
*
* @throws ClassCastException if the class of the specified element prevents it from being added
* to the given queue
* @throws IllegalArgumentException if some property of the specified element prevents it from
* being added to the given queue
*/
@GwtIncompatible // concurrency
public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
  // Remembers whether any attempt was interrupted so the flag can be re-asserted on exit.
  boolean sawInterrupt = false;
  try {
    for (;;) {
      try {
        queue.put(element);
        break;
      } catch (InterruptedException ignored) {
        // Keep retrying; the interruption is re-signalled in the finally block.
        sawInterrupt = true;
      }
    }
  } finally {
    if (sawInterrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Walks this node's children and puts every data entry found beneath it on {@code consumer}.
 * Children flagged as nodes are loaded from {@code stack} and traversed recursively; any other
 * child is loaded as data and published under its key with the last character removed.
 *
 * @throws IOException declared for the {@code stack} load calls
 * @throws InterruptedException if interrupted while waiting for space on {@code consumer}
 */
public void getAll(Lobstack stack, BlockingQueue<Map.Entry<String, ByteBuffer> > consumer)
  throws IOException, InterruptedException
{
  for (String childKey : children.keySet())
  {
    NodeEntry child = children.get(childKey);
    if (!child.node)
    {
      // Data entry: the stored key has one extra trailing character (presumably a type
      // marker -- confirm against the writer) that is dropped before publishing.
      String dataKey = childKey.substring(0, childKey.length() - 1);
      ByteBuffer payload = stack.loadAtLocation(child.location);
      consumer.put(new SimpleEntry<String,ByteBuffer>(dataKey, payload));
      continue;
    }
    // Interior node: descend into it.
    stack.loadNodeAt(child.location).getAll(stack, consumer);
  }
}
/**
* Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)} uninterruptibly.
*
* @throws ClassCastException if the class of the specified element prevents it from being added
* to the given queue
* @throws IllegalArgumentException if some property of the specified element prevents it from
* being added to the given queue
*/
@GwtIncompatible // concurrency
public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
  // Remembers whether any attempt was interrupted so the flag can be re-asserted on exit.
  boolean sawInterrupt = false;
  try {
    for (;;) {
      try {
        queue.put(element);
        break;
      } catch (InterruptedException ignored) {
        // Keep retrying; the interruption is re-signalled in the finally block.
        sawInterrupt = true;
      }
    }
  } finally {
    if (sawInterrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * A thread blocked in a timed {@code poll} must be released by a later {@code put}.
 */
@Test(timeOut = 10000)
public void pollTimeout2() throws Exception {
  final BlockingQueue<Integer> queue = new GrowableArrayBlockingQueue<>();
  final CountDownLatch pollReturned = new CountDownLatch(1);

  final Thread poller = new Thread(() -> {
    try {
      queue.poll(1, TimeUnit.HOURS);
      pollReturned.countDown();
    } catch (Exception e) {
      e.printStackTrace();
    }
  });
  poller.start();

  // Give the background thread time to block inside poll before publishing.
  Thread.sleep(100);
  queue.put(1);

  pollReturned.await();
}
/**
 * Put a new message onto the queue. This method is blocking if the queue buffer is full.
 *
 * <p>Messages are kept in one queue per source virtual spout; the queue is created on first use.
 *
 * @param message - Message to be added to the queue.
 * @throws InterruptedException - thrown if a thread is interrupted while blocked adding to the queue.
 */
@Override
public void put(final Message message) throws InterruptedException {
  // Grab the source virtual spoutId
  final VirtualSpoutIdentifier virtualSpoutId = message.getMessageId().getSrcVirtualSpoutId();

  // Look up the buffer for that spout.
  BlockingQueue<Message> virtualSpoutQueue = messageBuffer.get(virtualSpoutId);

  // If our queue doesn't exist, register a new one.
  if (virtualSpoutQueue == null) {
    final BlockingQueue<Message> freshQueue = createNewEmptyQueue();
    // Use putIfAbsent's return value instead of a second get(): if another thread removes the
    // key in between, a re-read would return null and the put below would throw an NPE.
    final BlockingQueue<Message> existingQueue = messageBuffer.putIfAbsent(virtualSpoutId, freshQueue);
    virtualSpoutQueue = (existingQueue != null) ? existingQueue : freshQueue;
  }

  // Put it.
  virtualSpoutQueue.put(message);
}
@Test
public void shouldReadFromQueueAndPushToSink() throws InterruptedException {
  // Arrange: a single batch is queued before the worker starts.
  final BlockingQueue<Records> pending = new LinkedBlockingQueue<>();
  final BqQueueWorker worker =
      new BqQueueWorker("bq-worker", successfulSink, queueConfig, committer, pending, workerState);
  pending.put(messages);

  // Act: run the worker on its own thread, then close it and wait for the thread to end.
  final Thread runner = new Thread(worker);
  runner.start();
  WorkerUtil.closeWorker(worker, workerState, 100);
  runner.join();

  // Assert: the batch was pushed to the sink.
  verify(successfulSink).push(messages);
}
/**
 * Creates a pool that queues work through the supplied queue.
 *
 * <p>Unless {@code q} is a {@code SynchronousQueue}, it is kept as the buffer queue and a
 * thread obtained from {@code tf} is started that moves every task from {@code q} into the
 * executor's own queue until it is interrupted.
 **/
public PooledExecutorWithDMStats(BlockingQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout) {
  this(initQ(q), maxPoolSize, stats, tf, msTimeout, initREH(q));
  if (q instanceof SynchronousQueue) {
    // No buffer queue and no transfer thread are set up for a SynchronousQueue.
    return;
  }
  this.bufferQueue = q;
  final BlockingQueue<Runnable> source = q;
  final BlockingQueue<Runnable> target = getQueue();
  final Runnable transferLoop = new Runnable() {
    @Override
    public void run() {
      try {
        while (true) {
          SystemFailure.checkFailure();
          target.put(source.take());
        }
      } catch (InterruptedException ie) {
        // This thread is being shut down: keep the interrupt status and fall out of run().
        Thread.currentThread().interrupt();
      }
    }
  };
  this.bufferConsumer = tf.newThread(transferLoop);
  this.bufferConsumer.start();
}
/**
 * Drains everything currently available in {@code feedReplies} into {@code operations},
 * decrementing the outstanding-operation counter once per transferred reply.
 *
 * @throws InterruptedException if interrupted while waiting for space in {@code operations}
 */
private void transferPreviousRepliesToResponse(BlockingQueue<OperationStatus> operations) throws InterruptedException {
  for (OperationStatus reply = feedReplies.poll(); reply != null; reply = feedReplies.poll()) {
    outstandingOperations.decrementAndGet();
    operations.put(reply);
  }
}
/**
 * Puts {@code SHUTDOWN_COMMAND} on every queue in {@code queues}.
 *
 * <p>An interruption while putting on one queue is logged and the remaining queues are still
 * attempted. The interrupt status is restored once the loop has finished, so the caller can
 * observe it without it cutting the remaining puts short.
 */
private void sendShutdownToAllQueues() {
  boolean interrupted = false;
  try {
    for (BlockingQueue<FetchedDataChunk> queue : queues) {
      try {
        queue.put(SHUTDOWN_COMMAND);
      } catch (InterruptedException e) {
        // Do not re-interrupt here: that would make every following put fail immediately.
        interrupted = true;
        logger.warn(e.getMessage(), e);
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Small demo of {@link PriorityBlockingQueue}: inserts three integers, then prints the size,
 * the element returned by {@code take()} and the remaining queue contents.
 */
public static void main(String[] args) throws Exception {
  BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
  // 1. PriorityBlockingQueue's put(Object) does not block.
  // 2. PriorityBlockingQueue's offer(Object) is not limited by a capacity.
  // 3. PriorityBlockingQueue orders inserted elements, by default using the elements'
  //    Comparable implementation, or an explicitly supplied Comparator.
  for (int value : new int[] {9, 1, 8}) {
    queue.put(value);
  }

  int size = queue.size();
  System.out.println("queue.size() = " + size);

  Integer head = queue.take();
  System.out.println("queue.take() = " + head);

  System.out.println("queue = " + queue);
}
/**
 * Puts {@code envelope} on the queue registered for its system stream partition, retrying
 * until the put succeeds even if the thread is interrupted along the way.
 *
 * <p>If any attempt was interrupted, the interrupt status is restored after the envelope has
 * been queued so the caller can still see the interruption.
 */
private void enqueueUninterruptibly(IncomingMessageEnvelope envelope) {
  final BlockingQueue<IncomingMessageEnvelope> queue =
      queues.get(envelope.getSystemStreamPartition());
  boolean interrupted = false;
  try {
    while (true) {
      try {
        queue.put(envelope);
        return;
      } catch (InterruptedException e) {
        // Some events require that we post an envelope to the queue even if the interrupt
        // flag was set (i.e. during a call to stop) to ensure that the consumer properly
        // shuts down. Consequently, if we receive an interrupt here we retry the put
        // operation and only re-assert the interrupt once the envelope is queued.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Puts {@code items} on {@code queue}, blocking while the queue is full.
 *
 * <p>If the thread is interrupted, the put is abandoned (the element is not added) and the
 * interrupt status is restored so the caller can react to it.
 */
private static <T> void put(BlockingQueue<T> queue, T items) {
  try {
    queue.put(items);
  } catch (InterruptedException e) {
    // Give up on the put but keep the interruption visible to the caller.
    Thread.currentThread().interrupt();
  }
}
/**
 * Puts {@code input} on {@code queue}, blocking while the queue is full, then delegates to
 * {@code checkQueue(queue, batchSize)}.
 *
 * @param input stream to queue; {@code null} is ignored
 * @param queue destination queue
 * @param batchSize passed through to {@code checkQueue}
 * @return {@code false} when {@code input} is {@code null}; otherwise the result of
 *     {@code checkQueue(queue, batchSize)}
 * @throws IllegalStateException if the thread is interrupted while waiting for queue space;
 *     the interrupt status is restored and the original exception is attached as the cause
 */
boolean queueInput(InputStream input, BlockingQueue<InputStream> queue, int batchSize) {
  if (input == null) {
    return false;
  }
  try {
    queue.put(input);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // Same message as before, now with the cause preserved for diagnostics.
    throw new IllegalStateException("InputStream was not added to the queue." + e.getMessage(), e);
  }
  return checkQueue(queue, batchSize);
}
/**
 * Waits until the DeploymentConfig reports that it has been scaled, or until the configured
 * scale timeout elapses.
 *
 * <p>A single-threaded scheduler polls {@code get()} every {@code POLL_INTERVAL_MS}
 * milliseconds. The poller signals completion by putting a value on a one-slot queue:
 * {@code true} once the observed generation has caught up and the status replica count equals
 * the spec replica count (or the resource is gone and {@code count == 0}), or an
 * {@link IllegalStateException} when the resource is gone while a non-zero count was requested.
 * The calling thread waits on that queue through {@code Utils.waitUntilReady}; how that helper
 * treats a queued exception is defined there. The poller is cancelled and the scheduler shut
 * down in all cases.
 *
 * @param count the replica count being waited for; used for the "resource gone" check and logging
 */
private void waitUntilDeploymentConfigIsScaled(final int count) {
  final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
  final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);

  final String name = checkName(getItem());
  final String namespace = checkNamespace(getItem());

  final Runnable deploymentPoller = () -> {
    try {
      DeploymentConfig deploymentConfig = get();
      //If the rs is gone, we shouldn't wait.
      if (deploymentConfig == null) {
        if (count == 0) {
          queue.put(true);
        } else {
          // Report the actual namespace here (the name was previously printed twice).
          queue.put(new IllegalStateException("Can't wait for DeploymentConfig: " + name + " in namespace: " + namespace + " to scale. Resource is no longer available."));
        }
        return;
      }

      // Remember the last reported replica count for the log lines emitted by the waiter.
      replicasRef.set(deploymentConfig.getStatus().getReplicas());
      int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0;
      if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) {
        queue.put(true);
      } else {
        LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} so waiting...",
          deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace);
      }
    } catch (Throwable t) {
      // Log and let the next scheduled run retry; an uncaught exception would stop the schedule.
      LOG.error("Error while waiting for Deployment to be scaled.", t);
    }
  };

  ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  ScheduledFuture<?> poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
  try {
    if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
      LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.",
        replicasRef.get(), count, name, namespace);
    } else {
      LOG.error("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {} after waiting for {} seconds so giving up",
        replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
    }
  } finally {
    poller.cancel(true);
    executor.shutdown();
  }
}
/**
 * Chooses the rejection handler for the executor.
 *
 * <p>Without {@code forFnExec}: a {@code CallerRunsPolicy} when {@code q} is a
 * {@code SynchronousQueue} or {@code SynchronousQueueNoSpin}, otherwise a {@code BufferHandler}.
 *
 * <p>With {@code forFnExec}: a handler that throws {@link RejectedExecutionException} when the
 * executor is shut down; runs the task on a new thread from the executor's thread factory when
 * the rejection happens on the executor's {@code bufferConsumer} thread; and otherwise puts
 * the task on {@code q}, blocking while it is full.
 */
private static RejectedExecutionHandler initREH(
    final BlockingQueue<Runnable> q, boolean forFnExec)
{
  if (!forFnExec) {
    if (q instanceof SynchronousQueue || q instanceof SynchronousQueueNoSpin) {
      return new CallerRunsPolicy();
    }
    return new BufferHandler();
  }

  return new RejectedExecutionHandler() {
    public void rejectedExecution(final Runnable r,
                                  ThreadPoolExecutor executor)
    {
      if (executor.isShutdown()) {
        throw new RejectedExecutionException(
            LocalizedStrings.PooledExecutorWithDMStats_EXECUTOR_HAS_BEEN_SHUTDOWN
                .toLocalizedString());
      }

      final Thread consumerThread = ((FunctionExecutionPooledExecutor)executor).bufferConsumer;
      if (Thread.currentThread() == consumerThread) {
        // Rejected on the buffer-consumer thread itself: run the task on a fresh thread
        // instead of blocking this one on q.
        final Runnable delegate = new Runnable() {
          public void run()
          {
            r.run();
          }
        };
        executor.getThreadFactory().newThread(delegate).start();
        return;
      }

      try {
        q.put(r);
      }
      catch (InterruptedException e) {
        // this thread is being shutdown so just restore the flag and return
        Thread.currentThread().interrupt();
      }
    }
  };
}