下面列出了 java.util.concurrent.ArrayBlockingQueue#put() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Verifies that {@code drainTo} empties a full queue and thereby lets a
 * producer waiting inside {@code put} proceed.
 *
 * <p>A helper thread tries to {@code put} one extra element ({@code SIZE + 1})
 * while the test thread drains the queue into a list. The test expects the
 * first {@code SIZE} drained elements to be {@code 0 .. SIZE-1}; this relies on
 * {@code populatedQueue(SIZE)} returning a queue holding those values in order
 * (NOTE(review): helper is defined outside this snippet; confirm). The extra
 * element may land either in the drained list or in the queue depending on
 * timing, which is why the size checks use {@code >=}.
 *
 * @throws InterruptedException if interrupted while joining the producer thread
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final ArrayBlockingQueue<Integer> q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            // Integer.valueOf replaces the deprecated Integer(int) constructor.
            q.put(Integer.valueOf(SIZE + 1));
        }});
    t.start();
    ArrayList<Integer> l = new ArrayList<Integer>();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // Expected value first, actual second, so failure messages read correctly.
        assertEquals(Integer.valueOf(i), l.get(i));
    }
    t.join();
    assertTrue(q.size() + l.size() >= SIZE);
}
/**
 * Reads tuples from {@code stream} until an EOF tuple is seen, submitting one
 * {@code StreamTask} to {@code executorService} for every non-EOF tuple.
 *
 * <p>Each non-EOF tuple is first placed on a bounded queue (capacity 10000)
 * that is shared with the submitted tasks. {@code put} blocks when the queue
 * is full, so this method waits for queue space rather than dropping tuples.
 *
 * @return the EOF tuple returned by {@code stream.read()}
 * @throws IOException if the thread is interrupted while waiting for queue
 *         space (the {@link InterruptedException} is attached as the cause and
 *         the thread's interrupt flag is restored), or if {@code stream.read()}
 *         itself throws
 */
public Tuple read() throws IOException {
    // Diamond gives a properly typed queue, so no raw-type warning suppression is needed.
    ArrayBlockingQueue<Tuple> queue = new ArrayBlockingQueue<>(10000);
    while (true) {
        Tuple tuple = stream.read();
        if (tuple.EOF) {
            return tuple;
        }
        try {
            queue.put(tuple);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can
            // still observe the interruption after it is wrapped.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        executorService.execute(new StreamTask(queue, streamFactory, streamContext));
    }
}
/**
 * Receives a response message and hands it to the queue registered for its
 * request id.
 *
 * <p>The message is split with {@code StringUtils.split(message, "|")}. The
 * first field is parsed as the integer request id, and the whole split array
 * is put on the queue stored under that id in {@code responses}. If no queue
 * is registered for the id, the response is dropped and a warning is logged.
 *
 * @param message raw response text; must split into at least two fields
 * @throws Exception if the message splits into fewer than two fields; a
 *         {@link NumberFormatException} propagates if the first field is not
 *         a valid integer
 */
public void putResponse(String message) throws Exception {
    String[] request = StringUtils.split(message, "|");
    if (request.length < 2) {
        throw new Exception("参数错误!切分的长度必须大于1" + message);
    }
    int requestId = Integer.parseInt(request[0]);
    if (!responses.containsKey(requestId)) {
        log.warn("give up the response,request id is:" + requestId + ",maybe because timeout!");
        return;
    }
    try {
        // The lookup can still yield null after the containsKey check above
        // (e.g. if the entry was removed in between), hence the null guard.
        ArrayBlockingQueue<String[]> queue = responses.get(requestId);
        if (queue != null) {
            queue.put(request);
        } else {
            log.warn("give up the response,request id is:" + requestId + ",because queue is null");
        }
    } catch (InterruptedException e) {
        log.error("put response error,request id is:" + requestId, e);
        // Restore the interrupt flag instead of silently swallowing the
        // interruption, so the calling thread can still react to it.
        Thread.currentThread().interrupt();
    }
}
/**
 * Checks that draining a full queue with {@code drainTo} releases a thread
 * that is waiting in {@code put}.
 *
 * <p>The producer thread attempts to add {@code SIZE + 1} while this thread
 * drains the queue into a list. The first {@code SIZE} drained entries are
 * expected to be {@code 0 .. SIZE-1}, which assumes {@code populatedQueue(SIZE)}
 * yields those values in order (NOTE(review): helper not visible here;
 * confirm). Whether the extra element is drained or stays in the queue is
 * timing-dependent, so the size assertions are lower bounds.
 *
 * @throws InterruptedException if interrupted while waiting for the producer
 *         thread to finish
 */
public void testDrainToWithActivePut() throws InterruptedException {
    final ArrayBlockingQueue<Integer> q = populatedQueue(SIZE);
    Thread t = new Thread(new CheckedRunnable() {
        public void realRun() throws InterruptedException {
            // Use the boxing factory; the Integer(int) constructor is deprecated.
            q.put(Integer.valueOf(SIZE + 1));
        }});
    t.start();
    ArrayList<Integer> l = new ArrayList<Integer>();
    q.drainTo(l);
    assertTrue(l.size() >= SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // assertEquals takes (expected, actual) in that order.
        assertEquals(Integer.valueOf(i), l.get(i));
    }
    t.join();
    assertTrue(q.size() + l.size() >= SIZE);
}
/**
 * Demonstrates filling an {@link ArrayBlockingQueue} of capacity 3 with
 * {@code put} and then printing the queue's contents.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final int capacity = 3;
    ArrayBlockingQueue<Integer> numbers = new ArrayBlockingQueue<>(capacity);
    try {
        for (int value = 1; value <= capacity; value++) {
            numbers.put(value);
        }
        // A further put(4) at this point would block the current (main) thread
        // indefinitely, until capacity is freed in the queue.
    } catch (InterruptedException interrupted) {
        interrupted.printStackTrace();
    }
    System.out.println("queue contains " + numbers);
}
/**
 * Verifies that every element successfully added with {@code put} is
 * contained in the queue, and that the queue has no remaining capacity after
 * {@code SIZE} elements have been put into a queue of capacity {@code SIZE}.
 *
 * @throws InterruptedException if interrupted while waiting in {@code put}
 */
public void testPut() throws InterruptedException {
    ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        Integer x = Integer.valueOf(i);
        q.put(x);
        assertTrue(q.contains(x));
    }
    assertEquals(0, q.remainingCapacity());
}
/**
 * Checks that each element added via {@code put} can afterwards be found with
 * {@code contains}, and that filling a queue of capacity {@code SIZE} with
 * {@code SIZE} elements leaves a remaining capacity of zero.
 *
 * @throws InterruptedException if interrupted while waiting in {@code put}
 */
public void testPut() throws InterruptedException {
    ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(SIZE);
    for (int i = 0; i < SIZE; ++i) {
        // Use the boxing factory; the Integer(int) constructor is deprecated.
        Integer x = Integer.valueOf(i);
        q.put(x);
        assertTrue(q.contains(x));
    }
    assertEquals(0, q.remainingCapacity());
}
/**
 * Checks that a second thread can borrow a worker while a first thread is
 * still waiting for its own worker to become available.
 *
 * <p>The test feeds two futures through a capacity-1 queue: first one that is
 * never completed, then an already-completed one. It asserts that the second
 * borrowing thread finishes and ends up with exactly the worker wrapped by the
 * completed future. The statement order matters because the second
 * {@code put} blocks until the first element has been taken.
 *
 * <p>NOTE(review): how {@code createPool} wires the queue into the pool and
 * what its first argument (2) means are not visible here; confirm against
 * that helper.
 */
@Test(timeout = WAIT_FOR_TEST_THREADS_TIMEOUT)
public void canStartupMultipleWorkersInParallel() throws InterruptedException, IOException {
// Capacity 1: a second put blocks until the first queued element has been taken.
ArrayBlockingQueue<Future<WorkerProcess>> workers = new ArrayBlockingQueue<>(1);
WorkerProcessPool pool = createPool(2, workers);
// thread 1, attempting to borrow a worker
testThreads.startThread(borrowWorkerProcessWithoutReturning(pool, concurrentSet()));
// transfer an incomplete future to thread 1. Thread 1 is blocked on {@link Future#get}.
workers.put(new CompletableFuture<>());
// transfer a completed future to thread 2
// .put will block until thread 1 takes the never-completed future added first
FakeWorkerProcess createdWorker = new FakeWorkerProcess(ImmutableMap.of());
Future<WorkerProcess> worker = CompletableFuture.completedFuture(createdWorker);
workers.put(worker);
// thread 2, attempting to borrow a worker
Set<WorkerProcess> createdWorkers = concurrentSet();
Thread secondThread =
testThreads.startThread(borrowWorkerProcessWithoutReturning(pool, createdWorkers));
// wait until thread 2 ends
secondThread.join();
// here, the second thread has finished running, and has thus added the worker it borrowed to
// `createdWorkers`.
assertThat(createdWorkers, equalTo(ImmutableSet.of(createdWorker)));
}