下面列出了 java.util.concurrent.TransferQueue 的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that one delivery passed to a {@code StreamSetsMessageConsumer} can be taken from the
 * queue the consumer was constructed with, and that the body survives unchanged.
 */
@Test
public void testConsumerSingleMessage() throws Exception {
    TransferQueue<RabbitMessage> messages = new LinkedTransferQueue<>();
    Channel channel = mock(Channel.class);
    final Consumer consumer = new StreamSetsMessageConsumer(channel, messages);
    final Envelope envelope = new Envelope(1L, false, EXCHANGE_NAME, QUEUE_NAME);
    // Encode with the same charset the assertion decodes with; the no-arg getBytes() uses the
    // platform default charset, which is not guaranteed to be UTF-8.
    final byte[] body = TEST_MESSAGE_1.getBytes(StandardCharsets.UTF_8);
    // The delivery is submitted to the executor so it runs apart from the take() call below.
    executor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                consumer.handleDelivery("consumerTag", envelope, null, body);
            } catch (IOException ignored) {
                // Deliberately ignored.
                // NOTE(review): if delivery fails before anything is enqueued, take() below
                // blocks indefinitely instead of failing the test -- consider a timed poll.
            }
        }
    });
    RabbitMessage message = messages.take();
    assertEquals(TEST_MESSAGE_1, new String(message.getBody(), StandardCharsets.UTF_8));
}
/**
 * Creates a task that loops forever over a private {@link LinkedTransferQueue}: on each
 * iteration it randomly either offers {@code ELEMENT} or polls, then increments {@code calls}.
 *
 * @return a never-terminating runnable bound to its own queue instance
 */
Runnable newLinkedTransferQueueRunner() {
    final TransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();
    return new Runnable() {
        @Override
        public void run() {
            // Obtained inside run() so the generator belongs to the executing thread.
            final ThreadLocalRandom rng = ThreadLocalRandom.current();
            while (true) {
                final boolean shouldPoll = !rng.nextBoolean();
                if (shouldPoll) {
                    transferQueue.poll();
                } else {
                    transferQueue.offer(ELEMENT);
                }
                calls.increment();
            }
        }
    };
}
/**
 * Demonstrates interrupting a thread that is blocked in a timed {@code poll} on an empty
 * {@link LinkedTransferQueue}: a worker polls in a loop, and the main thread interrupts it
 * after 1.5 seconds.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while sleeping
 */
public static void main(String[] args) throws InterruptedException {
    System.setProperty("java.util.logging.SimpleFormatter.format",
            "[%1$tT] [%4$-7s] %5$s %n");

    final Runnable pollUntilInterrupted = () -> {
        final TransferQueue<String> emptyQueue = new LinkedTransferQueue<>();
        for (;;) {
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            try {
                logger.info(() -> "For 3 seconds the thread "
                        + Thread.currentThread().getName()
                        + " will try to poll an element from queue ...");
                emptyQueue.poll(3000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                logger.severe(() -> "InterruptedException! The thread "
                        + Thread.currentThread().getName() + " was intrrupted!");
                // Catching InterruptedException clears the interrupt flag; restore it so the
                // loop condition observes the interruption (comment this line to see the effect).
                Thread.currentThread().interrupt();
            }
        }
        logger.info(() -> "The execution was stopped!");
    };

    final Thread worker = new Thread(pollUntilInterrupted);
    worker.start();
    // Let the worker block inside poll() before interrupting it.
    Thread.sleep(1500);
    worker.interrupt();
}
/**
* Creates an instance that keeps the given transfer queue in its {@code delegate} field.
* No null check or copy is performed; the reference is stored as passed.
*
* @param delegate the transfer queue to store as this instance's delegate
*/
public DynamicBlockingQueue(final TransferQueue<E> delegate) {
this.delegate = delegate;
}
/**
* Returns the {@code messages} queue held by this object. The reference itself is returned,
* not a copy, so callers can add to or remove from the same queue.
*
* @return the {@code messages} transfer queue
*/
TransferQueue<RabbitMessage> getMessageQueue() {
return messages;
}
/**
 * Places one message with envelope data, a content type, a cluster id and a custom header on
 * the source's queue, runs a single produce cycle, and checks that the resulting record's
 * header attributes carry the expected values.
 */
@Test
public void testHeaderProcessing() throws Exception {
    ((RabbitSourceConfigBean)conf).basicConfig.maxWaitTime = 1000; // Set this low so that we don't slow down the test.
    stage = PowerMockito.spy(newStage());

    // setup some fake data and force it onto the source's queue
    RabbitSource source = (RabbitSource)stage;
    TransferQueue<RabbitMessage> messages = source.getMessageQueue();

    Envelope envelope = new Envelope(DELIVERY_TAG, REDELIVERED, EXCHANGE_NAME, QUEUE_NAME);
    AMQP.BasicProperties.Builder propertiesBuilder = new AMQP.BasicProperties.Builder();
    propertiesBuilder.contentType(CONTENT_TYPE);
    Map<String, Object> customHeaders = new HashMap<>();
    customHeaders.put(CUSTOM_HEADER_KEY, CUSTOM_HEADER_VAL);
    propertiesBuilder.headers(customHeaders);
    propertiesBuilder.clusterId(CLUSTER_ID);
    AMQP.BasicProperties properties = propertiesBuilder.build();

    // Encode explicitly as UTF-8 to match the charset the parser factory below is configured
    // with; the no-arg getBytes() would use the platform default charset.
    RabbitMessage msg = new RabbitMessage(
        CONSUMER_TAG, envelope, properties, TEST_MESSAGE_1.getBytes(StandardCharsets.UTF_8));
    // Reuse the queue reference fetched above rather than asking the source for it again.
    messages.put(msg);

    // Stub out init() and the connection check on the spied stage.
    doReturn(new ArrayList<Stage.ConfigIssue>()).when((RabbitSource)stage).init();
    PowerMockito.doReturn(false).when(stage, "isConnected");

    this.runner = newStageRunner("output");

    // setup items which are not correctly configured in init
    Channel channel = mock(Channel.class);
    StreamSetsMessageConsumer consumer = new StreamSetsMessageConsumer(channel, messages);
    source.setStreamSetsMessageConsumer(consumer);

    DataParserFactory parserFactory = new DataParserFactoryBuilder(runner.getContext(), DataParserFormat.JSON)
        .setCharset(StandardCharsets.UTF_8)
        .setMode(JsonMode.MULTIPLE_OBJECTS)
        .setMaxDataLen(-1)
        .build();
    source.setDataParserFactory(parserFactory);

    runner.runInit();

    StageRunner.Output output = ((SourceRunner)runner).runProduce(null, 1000);
    List<Record> records = output.getRecords().get("output");
    assertEquals(1, records.size());

    Record record = records.get(0);
    assertEquals(String.valueOf(DELIVERY_TAG), record.getHeader().getAttribute("deliveryTag"));
    assertEquals(String.valueOf(REDELIVERED), record.getHeader().getAttribute("redelivered"));
    assertEquals(EXCHANGE_NAME, record.getHeader().getAttribute("exchange"));
    assertEquals(CONTENT_TYPE, record.getHeader().getAttribute("contentType"));
    // appId was never set on the properties builder, so no attribute is expected for it.
    assertNull(record.getHeader().getAttribute("appId"));
    assertEquals(CUSTOM_HEADER_VAL, record.getHeader().getAttribute(CUSTOM_HEADER_KEY));

    runner.runDestroy();
}
/**
* Creates a consumer by storing the given arguments in the fields of the same names.
*
* @param transferQueue the queue assigned to this consumer
* @param name the name assigned to this consumer
* @param numberOfMessagesToConsume the message count assigned to this consumer
*/
Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
/**
* Creates a producer by storing the given arguments in the fields of the same names.
*
* @param transferQueue the queue assigned to this producer
* @param name the name assigned to this producer
* @param numberOfMessagesToProduce the message count assigned to this producer
*/
Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
}
/**
 * Hands the event to {@code queue}. When the queue is a {@link TransferQueue} the event is
 * passed with {@code tryTransfer}; otherwise it is added with {@code offer}.
 *
 * @param memento the event to hand over
 * @return the result of {@code tryTransfer} or {@code offer}, whichever was used
 */
private boolean transfer(final LogEvent memento) {
    if (queue instanceof TransferQueue) {
        final TransferQueue<LogEvent> handoff = (TransferQueue<LogEvent>) queue;
        return handoff.tryTransfer(memento);
    }
    return queue.offer(memento);
}
/**
* Constructs a new instance and records its association to the passed-in channel.
* The channel is handed to the superclass constructor; the queue is stored in the
* {@code records} field.
*
* @param channel the channel to which this consumer is attached
* @param records the transfer queue kept by this consumer in its {@code records} field
*/
public StreamSetsMessageConsumer(Channel channel, TransferQueue<RabbitMessage> records) {
super(channel);
this.records = records;
}