下面列出了 com.google.common.collect.Queues#newLinkedBlockingQueue() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates an event reporter configured from the supplied builder.
 *
 * <p>Sets up the closer, a single-threaded executor wrapped by
 * {@code MoreExecutors.getExitingExecutorService} with a 5 minute timeout, the bounded reporting
 * queue, and finally registers a notification callback on the builder's context.
 *
 * @param builder supplies the metric context, reporter name, filter and rate/duration units
 */
public EventReporter(Builder builder) {
    super(builder.context, builder.name, builder.filter, builder.rateUnit, builder.durationUnit);
    this.closer = Closer.create();
    this.immediateReportExecutor = MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(1,
            ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("EventReporter-" + builder.name + "-%d"))),
        5, TimeUnit.MINUTES);
    this.metricContext = builder.context;
    // The queue must exist BEFORE the callback below is published to the context: once
    // addNotificationTarget returns (or even while it runs) another thread may deliver a
    // notification, and notificationCallback presumably enqueues into reportingQueue
    // (NOTE(review): confirm against notificationCallback). Assigning the queue last left a
    // window in which the callback could observe a null queue.
    this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
    this.notificationTargetKey = builder.context.addNotificationTarget(new Function<Notification, Void>() {
        @Nullable
        @Override
        public Void apply(Notification notification) {
            notificationCallback(notification);
            return null;
        }
    });
}
/**
 * Injection constructor: wires the proxy to its executor and state machine and creates an empty
 * operation queue and a "not running" flag.
 *
 * @param executor     executor to store; must not be null
 * @param stateMachine state machine to store; must not be null
 * @throws NullPointerException if either argument is null
 */
@Inject
StateMachineProxy(@Nonnull @StateExecutor Executor executor, @Nonnull StateMachine stateMachine) {
    // Validate both collaborators up front (same order as the assignments below).
    checkNotNull(executor);
    checkNotNull(stateMachine);

    this.running = new AtomicBoolean(false);
    this.operations = Queues.newLinkedBlockingQueue();
    this.executor = executor;
    this.stateMachine = stateMachine;
}
/**
 * Builds a queue-backed stream whose queue holds a single RDD of {@code rowsPerBatch}
 * consecutive values taken from (and advancing) {@code counter}.
 *
 * @return the stream created by {@code queueStream} over that one-element queue
 * @throws Exception declared by the overridden method
 */
@Override
public JavaDStream<Long> getDStream() throws Exception {
    List<Long> batch = Lists.newArrayList();
    for (int row = 0; row < rowsPerBatch; ++row) {
        batch.add(counter++);
    }

    Queue<JavaRDD<Long>> rddQueue = Queues.newLinkedBlockingQueue();
    rddQueue.add(Contexts.getJavaStreamingContext().sparkContext().parallelize(batch));
    LOG.info("Created stream queue with {} rows", batch.size());

    return Contexts.getJavaStreamingContext().queueStream(rddQueue, true);
}
/**
 * Wraps {@code delegate} in an asynchronous appender.
 *
 * <p>Allocates the event queue and a batch buffer sized to {@code BATCH_SIZE}, creates (but does
 * not start) the dispatcher thread for this appender, and adopts the delegate's context.
 *
 * @param delegate appender that this instance wraps
 */
private AsyncAppender(Appender<ILoggingEvent> delegate) {
    this.batch = Lists.newArrayListWithCapacity(BATCH_SIZE);
    this.queue = Queues.newLinkedBlockingQueue();
    this.delegate = delegate;
    this.dispatcher = THREAD_FACTORY.newThread(this);
    setContext(delegate.getContext());
}
/**
 * Case 3: both whole-cache flushes and single-entry flushes are queued twice each. The result of
 * {@code invokeExhaustQueue} is expected to contain every distinct whole-cache target and every
 * distinct cache/key target exactly once.
 */
@Test
public void testDuplicateCacheRemovalCase3() {
    Queue<CacheTarget> pendingFlushes = Queues.newLinkedBlockingQueue();

    // whole-cache flushes, each queued twice -> should collapse to one per cache
    pendingFlushes.add(CacheTarget.entireCache(ROLE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(DELEGATE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(DELEGATE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(PERMISSION_TYPE));
    pendingFlushes.add(CacheTarget.entireCache(PERMISSION_TYPE));

    // single-entry flushes, each queued twice -> should collapse to one per cache/key pair
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key1"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key1"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key2"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key2"));

    Collection<CacheTarget> actual = new ArrayList<CacheTarget>(invokeExhaustQueue(pendingFlushes));

    // distinct caches followed by distinct cache entries
    ArrayList<CacheTarget> expected = Lists.newArrayList(
        CacheTarget.entireCache(ROLE_TYPE_CACHE),
        CacheTarget.entireCache(DELEGATE_TYPE_CACHE),
        CacheTarget.entireCache(PERMISSION_TYPE),
        CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key1"),
        CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key2"));

    boolean noDifference = CollectionUtils.diff(expected, actual).isEmpty();
    assertTrue(noDifference);
}
/**
 * Initializes this service from {@code conf}: resolves the Tajo configuration, RPC parameters
 * and service tracker, creates the heartbeat request queue, registers this object for
 * {@code NodeStatusEvent.EventType} events, reads the idle heartbeat interval, and creates the
 * (not yet started) updater thread named "NodeStatusUpdater" before delegating to the superclass.
 *
 * @param conf configuration passed to {@code TUtil.checkTypeAndGet} with {@code TajoConf.class}
 * @throws Exception propagated from any initialization step or from the superclass
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
    TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
    this.systemConf = tajoConf;
    this.rpcParams = RpcParameterFactory.get(tajoConf);
    this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue();
    this.serviceTracker = ServiceTrackerFactory.get(tajoConf);

    this.workerContext.getNodeResourceManager().getDispatcher().register(NodeStatusEvent.EventType.class, this);
    this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL);

    StatusUpdaterThread updater = new StatusUpdaterThread();
    updater.setName("NodeStatusUpdater");
    this.updaterThread = updater;

    super.serviceInit(conf);
}
/**
 * Case 2: whole-cache flushes are queued twice each, while the single-entry flushes are all
 * distinct and target caches that are not flushed in their entirety. The result of
 * {@code invokeExhaustQueue} is expected to contain each distinct whole-cache target once and
 * every single-entry target unchanged.
 */
@Test
public void testDuplicateCacheRemovalCase2() {
    Queue<CacheTarget> pendingFlushes = Queues.newLinkedBlockingQueue();

    // whole-cache flushes, each queued twice -> should collapse to one per cache
    pendingFlushes.add(CacheTarget.entireCache(ROLE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(DELEGATE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(DELEGATE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(PERMISSION_TYPE));
    pendingFlushes.add(CacheTarget.entireCache(PERMISSION_TYPE));

    // single-entry flushes: nothing to filter, because their caches are not flushed as a whole
    // and every cache + key combination appears only once
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key1"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key2"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key3"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key4"));

    Collection<CacheTarget> actual = new ArrayList<CacheTarget>(invokeExhaustQueue(pendingFlushes));

    // distinct caches followed by all four cache entries
    ArrayList<CacheTarget> expected = Lists.newArrayList(
        CacheTarget.entireCache(ROLE_TYPE_CACHE),
        CacheTarget.entireCache(DELEGATE_TYPE_CACHE),
        CacheTarget.entireCache(PERMISSION_TYPE),
        CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key1"),
        CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key2"),
        CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key3"),
        CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key4"));

    boolean noDifference = CollectionUtils.diff(expected, actual).isEmpty();
    assertTrue(noDifference);
}
/**
 * Creates a one-thread pool backed by a linked blocking queue, starts its core thread eagerly,
 * publishes it in {@code executor}, and then runs {@code transactionMarker()}.
 *
 * @throws Exception propagated from {@code transactionMarker()}
 */
@Override
public void executeApp() throws Exception {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 60, MILLISECONDS, Queues.newLinkedBlockingQueue());
    // The worker thread has to exist before any task is submitted; otherwise the lambda would be
    // captured as part of the initial thread run and lambda execution capture would not really
    // be tested.
    pool.prestartAllCoreThreads();
    executor = pool;
    transactionMarker();
}
/**
 * Prepares the single-threaded executor used by this app and then invokes
 * {@code transactionMarker()}.
 *
 * @throws Exception propagated from {@code transactionMarker()}
 */
@Override
public void executeApp() throws Exception {
    ThreadPoolExecutor singleWorkerPool =
        new ThreadPoolExecutor(1, 1, 60, MILLISECONDS, Queues.newLinkedBlockingQueue());
    // Pre-create the core thread: without it, lambda execution would be captured by the initial
    // thread run and the test would not really cover lambda execution capture.
    singleWorkerPool.prestartAllCoreThreads();
    executor = singleWorkerPool;
    transactionMarker();
}
/**
 * Case 1: every cache is flushed in its entirety (each flush queued twice) and, in addition,
 * individual entries of some of those caches are flushed. The result of
 * {@code invokeExhaustQueue} is expected to contain only the distinct whole-cache targets; the
 * single-entry targets are expected to be dropped because their caches are flushed as a whole.
 */
@Test
public void testDuplicateCacheRemovalCase1() {
    Queue<CacheTarget> pendingFlushes = Queues.newLinkedBlockingQueue();

    // whole-cache flushes, each queued twice -> should collapse to one per cache
    pendingFlushes.add(CacheTarget.entireCache(ROLE_RESPONSIBILITY_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_RESPONSIBILITY_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(DELEGATE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(DELEGATE_TYPE_CACHE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_MEMBER_TYPE));
    pendingFlushes.add(CacheTarget.entireCache(ROLE_MEMBER_TYPE));
    pendingFlushes.add(CacheTarget.entireCache(PERMISSION_TYPE));
    pendingFlushes.add(CacheTarget.entireCache(PERMISSION_TYPE));

    // keyed entries of caches that are already flushed as a whole above -> all should be dropped
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key1"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_MEMBER_TYPE, "key2"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key3"));
    pendingFlushes.add(CacheTarget.singleEntry(ROLE_RESPONSIBILITY_CACHE, "key4"));

    Collection<CacheTarget> actual = new ArrayList<CacheTarget>(invokeExhaustQueue(pendingFlushes));

    // only the distinct whole-cache targets remain
    ArrayList<CacheTarget> expected = Lists.newArrayList(
        CacheTarget.entireCache(ROLE_RESPONSIBILITY_CACHE),
        CacheTarget.entireCache(ROLE_MEMBER_TYPE),
        CacheTarget.entireCache(ROLE_TYPE_CACHE),
        CacheTarget.entireCache(DELEGATE_TYPE_CACHE),
        CacheTarget.entireCache(PERMISSION_TYPE));

    boolean noDifference = CollectionUtils.diff(expected, actual).isEmpty();
    assertTrue(noDifference);
}
/**
 * Runs THREAD_QTY concurrent tasks that each perform ITERATIONS random create / setData / delete
 * operations on children of "/root", then drains the events the cache under test pushed into
 * the queue and checks that the net count they imply (+1 for each ADDED event, -1 for every
 * other event) equals the quantity reported by getActualQty(cache).
 */
@Test
public void testEventOrdering() throws Exception
{
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_QTY);
// queue handed to newCache(...); presumably the cache's listener appends one Event per change -- confirm in newCache
BlockingQueue<Event> events = Queues.newLinkedBlockingQueue();
final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
T cache = null;
try
{
client.start();
client.create().forPath("/root");
cache = newCache(client, "/root", events);
// a single Random instance is shared by all worker threads through the task below
final Random random = new Random();
final Callable<Void> task = new Callable<Void>()
{
@Override
public Void call() throws Exception
{
for ( int i = 0; i < ITERATIONS; ++i )
{
// pick one of NODE_QTY possible child paths
String node = "/root/" + random.nextInt(NODE_QTY);
try
{
// nextInt(3) yields 0..2; the default label simply shares the create branch with case 0
switch ( random.nextInt(3) )
{
default:
case 0:
client.create().forPath(node);
break;
case 1:
client.setData().forPath(node, "new".getBytes());
break;
case 2:
client.delete().forPath(node);
break;
}
}
catch ( KeeperException ignore )
{
// ignore -- presumably failures such as creating an existing node or touching a missing one are expected with random operations
}
}
return null;
}
};
final CountDownLatch latch = new CountDownLatch(THREAD_QTY);
for ( int i = 0; i < THREAD_QTY; ++i )
{
// wrapper guarantees the latch is counted down even when the task throws
Callable<Void> wrapped = new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try
{
return task.call();
}
finally
{
latch.countDown();
}
}
};
executorService.submit(wrapped);
}
// wait for every worker to finish
Assert.assertTrue(timing.awaitLatch(latch));
// presumably gives the cache time to deliver events that are still in flight -- confirm
timing.sleepABit();
List<Event> localEvents = Lists.newArrayList();
int eventSuggestedQty = 0;
// drain the queue; take() is only called while size() > 0 and this is the only consumer visible here
while ( events.size() > 0 )
{
Event event = events.take();
localEvents.add(event);
// ADDED counts as +1, any other event type counts as -1
eventSuggestedQty += (event.eventType == EventType.ADDED) ? 1 : -1;
}
int actualQty = getActualQty(cache);
Assert.assertEquals(actualQty, eventSuggestedQty, String.format("actual %s expected %s:\n %s", actualQty, eventSuggestedQty, asString(localEvents)));
}
finally
{
// stop the workers first, then close the cache and the client
executorService.shutdownNow();
//noinspection ThrowFromFinallyBlock
executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS);
CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(client);
}
}
/**
 * Verifies that a publisher and a consumer obtained before a Kafka broker restart keep working
 * after the broker is stopped and a new broker is started against the same ZooKeeper namespace
 * and the same log directory.
 *
 * <p>The Kafka client, the ZooKeeper client and the broker are stopped in a {@code finally}
 * block so that a failed assertion or publish does not leave them running.
 *
 * @throws Exception if any service fails to start/stop or publishing fails
 */
@Test
public void testBrokerChange() throws Exception {
    // Create a new namespace in ZK for Kafka server for this test case
    String connectionStr = zkServer.getConnectionStr() + "/broker_change";
    ZKClientService zkClient = ZKClientService.Builder.of(connectionStr).build();
    zkClient.startAndWait();
    zkClient.create("/", null, CreateMode.PERSISTENT).get();

    // Start a new kafka server
    File logDir = TMP_FOLDER.newFolder();
    EmbeddedKafkaServer server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir));
    server.startAndWait();

    // Start a Kafka client
    KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
    kafkaClient.startAndWait();

    try {
        // Attach a consumer
        final BlockingQueue<String> consumedMessages = Queues.newLinkedBlockingQueue();
        kafkaClient.getConsumer()
            .prepare().addFromBeginning("test", 0).consume(new KafkaConsumer.MessageCallback() {
                @Override
                public long onReceived(Iterator<FetchedMessage> messages) {
                    long nextOffset = -1L;
                    while (messages.hasNext()) {
                        FetchedMessage message = messages.next();
                        nextOffset = message.getNextOffset();
                        consumedMessages.add(Charsets.UTF_8.decode(message.getPayload()).toString());
                    }
                    return nextOffset;
                }

                @Override
                public void finished() {
                    // No-op
                }
            });

        // Get a publisher and publish a message
        KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.FIRE_AND_FORGET, Compression.NONE);
        publisher.prepare("test").add(Charsets.UTF_8.encode("Message 0"), 0).send().get();

        // Should receive one message
        Assert.assertEquals("Message 0", consumedMessages.poll(5, TimeUnit.SECONDS));

        // Now shutdown and restart the server on different port
        server.stopAndWait();
        server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir));
        server.startAndWait();

        // Wait a little while to make sure changes is reflected in broker service
        TimeUnit.SECONDS.sleep(3);

        // Now publish again with the same publisher. It should succeed and the consumer should receive the message.
        publisher.prepare("test").add(Charsets.UTF_8.encode("Message 1"), 0).send().get();
        Assert.assertEquals("Message 1", consumedMessages.poll(5, TimeUnit.SECONDS));
    } finally {
        // Always release the services, in the same order the success path used before
        // (client, ZK client, broker). Nested finally blocks make sure one failing stop does
        // not prevent the remaining services from being stopped.
        try {
            kafkaClient.stopAndWait();
        } finally {
            try {
                zkClient.stopAndWait();
            } finally {
                // "server" refers to whichever broker instance was assigned last
                server.stopAndWait();
            }
        }
    }
}
/**
 * Creates a dispatcher with an empty chunk queue that keeps a reference to the given stream.
 *
 * @param responseStream observer stored in {@code this.responseStream}
 */
ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
    this.responseStream = responseStream;
    this.chunks = Queues.newLinkedBlockingQueue();
}
/**
 * Creates a mock requester; stores the three arguments and starts with an empty
 * request-and-callback queue.
 *
 * @param limiterServer         server resource stored in {@code this.limiterServer}
 * @param latencyMillis         value stored in {@code this.latencyMillis}
 * @param requestHandlerThreads value stored in {@code this.requestHandlerThreads}
 */
public MockRequester(LimiterServerResource limiterServer, long latencyMillis, int requestHandlerThreads) {
    this.requestAndCallbackQueue = Queues.newLinkedBlockingQueue();
    this.requestHandlerThreads = requestHandlerThreads;
    this.latencyMillis = latencyMillis;
    this.limiterServer = limiterServer;
}
/**
 * Verifies that, for two datasets, {@code jobContext.commit()} runs the dataset commits one at a
 * time: a second commit callable only shows up after the first one has been unblocked, and the
 * job ends in the COMMITTED state once both have been unblocked.
 *
 * <p>The executor that runs {@code commit()} is shut down in a {@code finally} block so that the
 * worker thread is released (and interrupted if still blocked) even when an assertion fails.
 *
 * @throws Exception if polling, waiting on the future, or the commit itself fails
 */
@Test
public void testNonParallelCommit()
    throws Exception {
    Properties jobProps = new Properties();
    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test");
    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_id_12345");
    jobProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");

    Map<String, JobState.DatasetState> datasetStateMap = Maps.newHashMap();
    for (int i = 0; i < 2; i++) {
        datasetStateMap.put(Integer.toString(i), new JobState.DatasetState());
    }

    final BlockingQueue<ControllableCallable<Void>> callables = Queues.newLinkedBlockingQueue();
    final JobContext jobContext =
        new ControllableCommitJobContext(jobProps, log, datasetStateMap, new Predicate<String>() {
            @Override
            public boolean apply(@Nullable String input) {
                return true;
            }
        }, callables);

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    try {
        Future<?> future = executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    jobContext.commit();
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
        });

        // Not parallelized, so only one commit should be running
        ControllableCallable<Void> callable = callables.poll(1, TimeUnit.SECONDS);
        Assert.assertNotNull(callable);
        Assert.assertNull(callables.poll(200, TimeUnit.MILLISECONDS));

        // unblock first commit, should see a second commit
        callable.unblock();
        callable = callables.poll(1, TimeUnit.SECONDS);
        Assert.assertNotNull(callable);
        Assert.assertNull(callables.poll(200, TimeUnit.MILLISECONDS));
        Assert.assertFalse(future.isDone());

        // unblock second commit, commit should complete
        callable.unblock();
        future.get(1, TimeUnit.SECONDS);
        Assert.assertEquals(jobContext.getJobState().getState(), JobState.RunningState.COMMITTED);
    } finally {
        // Previously the executor was never shut down, leaking its worker thread (and leaving
        // the commit blocked forever when an assertion above failed).
        executorService.shutdownNow();
    }
}
/**
 * Verifies that, with three datasets of which the one named "1" is rejected by the predicate,
 * all three commit callables are still produced, the future wrapping
 * {@code jobContext.commit()} fails with an {@link ExecutionException}, and the job ends in the
 * FAILED state.
 *
 * <p>The executor that runs {@code commit()} is shut down in a {@code finally} block so that its
 * worker thread is not leaked.
 *
 * @throws Exception if polling or waiting on the future fails unexpectedly
 */
@Test
public void testSingleExceptionSemantics()
    throws Exception {
    Properties jobProps = new Properties();
    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test");
    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_id_12345");
    jobProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");

    Map<String, JobState.DatasetState> datasetStateMap = Maps.newHashMap();
    for (int i = 0; i < 3; i++) {
        datasetStateMap.put(Integer.toString(i), new JobState.DatasetState());
    }

    final BlockingQueue<ControllableCallable<Void>> callables = Queues.newLinkedBlockingQueue();

    // There are three datasets, "0", "1", and "2", middle one will fail
    final JobContext jobContext =
        new ControllableCommitJobContext(jobProps, log, datasetStateMap, new Predicate<String>() {
            @Override
            public boolean apply(@Nullable String input) {
                // constant-first equals: the parameter is declared @Nullable
                return !"1".equals(input);
            }
        }, callables);

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    try {
        Future<?> future = executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    jobContext.commit();
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
        });

        // All three commits should be run (even though second one fails)
        for (int i = 0; i < 3; i++) {
            ControllableCallable<Void> callable = callables.poll(1, TimeUnit.SECONDS);
            // fail with an assertion rather than a NullPointerException when a commit never shows up
            Assert.assertNotNull(callable);
            callable.unblock();
        }

        try {
            // check future is done
            future.get(1, TimeUnit.SECONDS);
            Assert.fail();
        } catch (ExecutionException ee) {
            // future should fail
        }

        // job failed
        Assert.assertEquals(jobContext.getJobState().getState(), JobState.RunningState.FAILED);
    } finally {
        // Previously the executor was never shut down, leaking its worker thread.
        executorService.shutdownNow();
    }
}
/**
 * Submits a single task and checks the events delivered to a workflow listener: RUN_STARTED and
 * TASK_STARTED (in either order), then TASK_COMPLETED, then RUN_UPDATED.
 */
@Test
public void testBasic() throws Exception
{
    WorkflowListenerManager listenerManager = null;
    TestTaskExecutor taskExecutor = new TestTaskExecutor(6);
    TaskType taskType = new TaskType("test", "1", true);
    WorkflowManager workflowManager = WorkflowManagerBuilder.builder()
        .addingTaskExecutor(taskExecutor, 10, taskType)
        .withCurator(curator, "test", "1")
        .build();
    try
    {
        Task task = new Task(new TaskId(), taskType);

        // every event received by the listener is appended to this queue
        BlockingQueue<WorkflowEvent> received = Queues.newLinkedBlockingQueue();
        listenerManager = workflowManager.newWorkflowListenerManager();
        listenerManager.getListenable().addListener(received::add);

        workflowManager.start();
        listenerManager.start();

        RunId runId = workflowManager.submitTask(task);

        timing.sleepABit();

        WorkflowEvent runStarted = new WorkflowEvent(WorkflowEvent.EventType.RUN_STARTED, runId);
        WorkflowEvent taskStarted = new WorkflowEvent(WorkflowEvent.EventType.TASK_STARTED, runId, task.getTaskId());

        WorkflowEvent first = received.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
        WorkflowEvent second = received.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);

        // due to timing, task start might come first
        boolean runThenTask = first.equals(runStarted) && second.equals(taskStarted);
        boolean taskThenRun = second.equals(runStarted) && first.equals(taskStarted);
        Assert.assertTrue(runThenTask || taskThenRun);

        WorkflowEvent third = received.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertEquals(third, new WorkflowEvent(WorkflowEvent.EventType.TASK_COMPLETED, runId, task.getTaskId()));

        WorkflowEvent fourth = received.poll(timing.milliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertEquals(fourth, new WorkflowEvent(WorkflowEvent.EventType.RUN_UPDATED, runId));
    }
    finally
    {
        CloseableUtils.closeQuietly(listenerManager);
        CloseableUtils.closeQuietly(workflowManager);
    }
}
/**
 * Disabled test (enabled = false). Scenario: client1 is connected to a single instance of a
 * 3-instance cluster and backs a PathChildrenCache on "/test"; client2 is connected using the
 * cluster's connect string. While client1's instance is killed, client2 deletes and re-creates
 * "/test/node" with different data. After that instance is restarted, the cache is expected to
 * report CONNECTION_RECONNECTED followed by CHILD_UPDATED.
 */
@Test(enabled = false) // this test is very flakey - it needs to be re-written at some point
public void testMissedDelete() throws Exception
{
Timing timing = new Timing();
PathChildrenCache cache = null;
CuratorFramework client1 = null;
CuratorFramework client2 = null;
TestingCluster cluster = createAndStartCluster(3);
try
{
// client 1 only connects to 1 server
InstanceSpec client1Instance = cluster.getInstances().iterator().next();
client1 = CuratorFrameworkFactory.newClient(client1Instance.getConnectString(), 1000, 1000, new RetryOneTime(1));
cache = new PathChildrenCache(client1, "/test", true);
// the listener records only the type of each cache event, in arrival order
final BlockingQueue<PathChildrenCacheEvent.Type> events = Queues.newLinkedBlockingQueue();
PathChildrenCacheListener listener = new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
events.add(event.getType());
}
};
cache.getListenable().addListener(listener);
// client 2 uses the connect string of the whole cluster
client2 = CuratorFrameworkFactory.newClient(cluster.getConnectString(), 1000, 1000, new RetryOneTime(1));
client1.start();
client2.start();
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// each poll waits up to timing.milliseconds() for the next event type
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.INITIALIZED);
// create the node through client2; the cache on client1 should report it as added
client2.create().creatingParentsIfNeeded().forPath("/test/node", "first".getBytes());
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
// kill the only server client1 is connected to
cluster.killServer(client1Instance);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_LOST);
// while client1's server is down, replace the node (delete + create with new data) through client2
client2.delete().forPath("/test/node");
client2.create().forPath("/test/node", "second".getBytes());
cluster.restartServer(client1Instance);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED);
Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); // "/test/node" is different - should register as updated
}
finally
{
// note: both clients are closed before the cache that was built on client1
CloseableUtils.closeQuietly(client1);
CloseableUtils.closeQuietly(client2);
CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(cluster);
}
}
/**
 * Creates a dispatcher that sends its queue of response chunks through the given
 * {@link StreamObserver}.
 *
 * <p>Calls to a {@link StreamObserver} must be synchronized across threads, so once
 * {@code responseStream} has been handed to this {@link ResponseDispatcher} the caller should
 * not invoke it directly any more.
 *
 * @param responseStream observer the queued response chunks are sent to
 */
public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
    this.responseStream = responseStream;
    this.chunks = Queues.newLinkedBlockingQueue();
}
/**
 * Builds a dispatcher around {@code responseStream}, starting with an empty chunk queue.
 *
 * <p>The {@link StreamObserver} is used to send the queued response chunks. Because calls to a
 * {@link StreamObserver} must be synchronized across threads, nothing else should call
 * {@code responseStream} directly after it has been passed to this {@link ResponseDispatcher}.
 *
 * @param responseStream observer that receives the queued response chunks
 */
public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
    this.responseStream = responseStream;
    this.chunks = Queues.newLinkedBlockingQueue();
}