下面列出了org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.Channel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Publishes one greeting message to {@code QUEUE_NAME} through the default exchange.
 *
 * <p>The connection and channel are opened in a try-with-resources statement so that
 * both are released (channel first, then connection) even when declaring the queue or
 * publishing fails.
 *
 * @param args unused
 * @throws IOException      if the broker cannot be reached or an AMQP operation fails
 * @throws TimeoutException if connecting or closing times out
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // Build the connection factory and point it at the broker.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("seaof-153-125-234-173.jp-tokyo-10.arukascloud.io");
    factory.setPort(31084);

    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // Declare the queue; it is created if it does not exist yet.
        // Arguments: queue name; durable; exclusive; auto-delete; extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = Thread.currentThread().getName() + "Hello ";
        // Arguments: exchange; routing key; basic properties; message body.
        // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
        channel.basicPublish("", QUEUE_NAME, null,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println(Thread.currentThread().getName() + "[send]" + message);
    }
}
/**
 * Listener callback: logs the received {@code userMsg} and manually acknowledges it.
 * If logging or acknowledging throws, the failure is logged and the message is
 * negatively acknowledged with requeue enabled.
 *
 * @param userMsg the converted message payload
 * @param channel the channel the message was delivered on, used for ack/nack
 * @param message the raw message, used for its delivery tag and correlation id
 */
@RabbitListener(queues="${spring.rabbitmq.queues}", containerFactory="jadyerRabbitListenerContainerFactory")
public void receive(UserMsg userMsg, Channel channel, Message message){
    try {
        LogUtil.getLogger().info("收到消息-->[{}]", ReflectionToStringBuilder.toString(userMsg));
        // Confirm successful consumption. First argument: delivery tag. Second argument:
        // whether to ack multiple messages (false = only this one, true = everything up to this tag).
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch(Exception e){
        LogUtil.getLogger().error("消息处理异常,消息ID={}, 消息体=[{}]", message.getMessageProperties().getCorrelationId(), JSON.toJSONString(userMsg), e);
        try {
            // Reject the current message and return it to its queue
            // (third argument: requeue; true = put back on the queue, false = discard).
            // basicReject can only reject a single message, whereas basicNack can reject several.
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e1) {
            // Log the nack failure itself (e1), not the original processing exception.
            LogUtil.getLogger().error("消息basicNack时发生异常,消息ID={}", message.getMessageProperties().getCorrelationId(), e1);
        }
    }
}
/**
 * Publishes {@code message} to {@code TASK_QUEUE_NAME} through the default exchange
 * using {@code MessageProperties.PERSISTENT_TEXT_PLAIN}.
 *
 * <p>A new connection and channel are opened for the call and are closed in
 * {@code finally} blocks, so they are released even when declaring or publishing fails.
 *
 * @param message the text to send; encoded as UTF-8
 * @throws java.io.IOException if connecting, declaring, publishing or closing fails
 */
public static void sendMessage(String message)
        throws java.io.IOException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(ConfigurationLoader.getInstance().getRabbitmqNodename());
    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        try {
            // Arguments: queue name; durable; exclusive; auto-delete; extra arguments.
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        } finally {
            channel.close();
        }
    } finally {
        connection.close();
    }
}
/**
 * Declares {@code QUEUE_NAME2}, binds it to {@code EXCHANGE_NAME} with the routing
 * pattern {@code test.#}, and registers a consumer that prints each delivery.
 *
 * <p>Deliveries are acknowledged manually only when {@code autoAck} is false;
 * acknowledging a delivery that was consumed in automatic-ack mode is a protocol
 * error that makes the broker close the channel.
 *
 * @throws Exception if obtaining the connection or any channel operation fails
 */
public void consumer2() throws Exception {
    // 1. Obtain the connection.
    Connection connection = RabbitMqConnectionFactoy.getConnection();
    // 2. Open a channel.
    Channel channel = connection.createChannel();
    // 3. Declare the queue.
    channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
    // Bind the queue to the exchange; the pattern matches every "test.*" routing key.
    channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "test.#");
    // Optional prefetch limit: with a limit of N the server stops pushing once the
    // client holds N unacknowledged messages.
    //channel.basicQos(1);
    // 4. Define the consumer callback for the queue.
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        // Concatenating the map directly prints "null" for a message without headers
        // instead of throwing a NullPointerException inside the callback.
        System.out.println("-->消费者2号,收到消息,msg :" + message + ",header:" + delivery.getProperties().getHeaders());
        if (!autoAck) {
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(QUEUE_NAME2, autoAck, deliverCallback, consumerTag -> { });
}
/**
 * Consumes trade requests from {@code trade.request.q} forever and, after a random
 * delay, publishes a fixed reply to {@code trade.response.q}.
 *
 * <p>The delay range is held in the {@code lower}/{@code upper} fields and is chosen
 * from {@code mode} (case-insensitive): stable, better, worse or erratic. An
 * unrecognized mode leaves both fields unchanged.
 *
 * @param mode name of the latency profile
 * @throws Exception if connecting, consuming, sleeping or publishing fails
 */
public void execute(String mode) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("trade.request.q", true, consumer);

    // The mode names are mutually exclusive, so at most one branch applies.
    if (mode.equalsIgnoreCase("stable")) {
        lower = 300;
        upper = 1200;
    } else if (mode.equalsIgnoreCase("better")) {
        lower = 300;
        upper = 800;
    } else if (mode.equalsIgnoreCase("worse")) {
        lower = 800;
        upper = 1900;
    } else if (mode.equalsIgnoreCase("erratic")) {
        lower = 200;
        upper = 5000;
    }

    for (;;) {
        QueueingConsumer.Delivery request = consumer.nextDelivery();
        System.out.println("trade order received: " + new String(request.getBody()));

        // Random duration in [lower, upper).
        int duration = lower + (int) (Math.random() * (upper-lower));
        System.out.println("trade placed, duration = " + duration);

        Thread.sleep(duration);
        channel.basicPublish("", "trade.response.q", null, "response".getBytes());
    }
}
/**
 * Binds exchange {@code ex-to} to {@code ex-from}, binds the default-named queue to
 * {@code ex-to}, publishes to {@code ex-from} and expects the message to be fetchable
 * from the queue.
 */
@Test
void exchangeBind_binds_two_exchanges() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel channel = connection.createChannel()) {
        // Topology: ex-from -> ex-to -> queue
        assertThat(channel.exchangeDeclare("ex-from", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(channel.exchangeDeclare("ex-to", BuiltinExchangeType.FANOUT)).isNotNull();
        assertThat(channel.queueDeclare()).isNotNull();
        assertThat(channel.exchangeBind("ex-to", "ex-from", "test.key")).isNotNull();
        assertThat(channel.queueBind("", "ex-to", "queue.used")).isNotNull();

        channel.basicPublish("ex-from", "unused", null, "test message".getBytes());

        GetResponse fetched = channel.basicGet("", false);
        assertThat(fetched).isNotNull();
        String payload = new String(fetched.getBody());
        assertThat(payload).isEqualTo("test message");
    }
}
/**
 * Expects the {@code rabbitmq.rejected} counter to go from 0 to 1 when a fetched
 * message is rejected with {@code basicReject}.
 */
@Test
void metrics_collector_is_invoked_on_basic_reject() throws IOException, TimeoutException {
    SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
    MockConnectionFactory factory = new MockConnectionFactory();
    factory.setMetricsCollector(new MicrometerMetricsCollector(meterRegistry));

    try (MockConnection mockConnection = factory.newConnection();
         Channel channel = mockConnection.createChannel(42)) {
        String queue = channel.queueDeclare().getQueue();
        channel.basicPublish("", queue, null, "".getBytes());
        GetResponse fetched = channel.basicGet(queue, false);

        // Nothing rejected yet.
        assertThat(meterRegistry.get("rabbitmq.rejected").counter().count()).isEqualTo(0);

        channel.basicReject(fetched.getEnvelope().getDeliveryTag(), false);

        assertThat(meterRegistry.get("rabbitmq.rejected").counter().count()).isEqualTo(1);
    }
}
/**
 * Publishes a message to the {@code fruits} queue, which is declared with a 10 ms TTL
 * and {@code rejected-ex} as its dead-letter exchange, and expects a consumer on the
 * {@code rejected} queue to have received exactly one message after 100 ms.
 */
@Test
void expired_message_should_be_consumable_after_being_dead_lettered() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel channel = connection.createChannel()) {
        // Dead-letter side: fanout exchange feeding the "rejected" queue.
        channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
        channel.queueDeclare("rejected", true, false, false, null);
        channel.queueBindNoWait("rejected", "rejected-ex", "unused", null);

        // Source queue whose messages expire and are routed to the dead-letter exchange.
        queue("fruits").withMessageTtl(10L).withDeadLetterExchange("rejected-ex").declare(channel);

        List<String> deadLettered = new ArrayList<>();
        channel.basicConsume("rejected", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                deadLettered.add(new String(body));
            }
        });

        channel.basicPublish("", "fruits", null, "banana".getBytes());

        // Give the message time to expire and be delivered.
        TimeUnit.MILLISECONDS.sleep(100L);

        assertThat(deadLettered).hasSize(1);
    }
}
/**
 * Publishes a message with the given AMQP properties (see {@link BasicProperties})
 * to an AMQP exchange, with the mandatory flag set.
 *
 * @param bytes      bytes representing the message body.
 * @param properties instance of {@link BasicProperties} sent with the message.
 * @param routingKey (required) the routing key used by the AMQP-based system to route
 *                   the message to its final destination (queue).
 * @param exchange   the name of the AMQP exchange to publish to. When {@code null} or
 *                   blank, the default exchange ({@code ""}) is used.
 * @throws IllegalStateException if the channel is closed or the publish call fails.
 */
void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
    this.validateStringProperty("routingKey", routingKey);

    // Normalize the exchange name: null becomes the default exchange, otherwise trim.
    final String targetExchange;
    if (exchange == null) {
        targetExchange = "";
    } else {
        targetExchange = exchange.trim();
    }

    if (targetExchange.length() == 0) {
        processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
    }
    processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + targetExchange
            + "' exchange with '" + routingKey + "' as a routing key.");

    final Channel publishingChannel = getChannel();
    if (!publishingChannel.isOpen()) {
        throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
    }

    try {
        publishingChannel.basicPublish(targetExchange, routingKey, true, properties, bytes);
    } catch (Exception e) {
        throw new IllegalStateException("Failed to publish to Exchange '" + targetExchange + "' with Routing Key '" + routingKey + "'.", e);
    }
}
/**
 * Expects the {@code rabbitmq.rejected} counter to go from 0 to 1 when a fetched
 * message is negatively acknowledged with {@code basicNack}.
 */
@Test
void metrics_collector_is_invoked_on_basic_nack() throws IOException, TimeoutException {
    SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
    MockConnectionFactory factory = new MockConnectionFactory();
    factory.setMetricsCollector(new MicrometerMetricsCollector(meterRegistry));

    try (MockConnection mockConnection = factory.newConnection();
         Channel channel = mockConnection.createChannel(42)) {
        String queue = channel.queueDeclare().getQueue();
        channel.basicPublish("", queue, null, "".getBytes());
        GetResponse fetched = channel.basicGet(queue, false);

        // Nothing rejected yet.
        assertThat(meterRegistry.get("rabbitmq.rejected").counter().count()).isEqualTo(0);

        channel.basicNack(fetched.getEnvelope().getDeliveryTag(), false, false);

        assertThat(meterRegistry.get("rabbitmq.rejected").counter().count()).isEqualTo(1);
    }
}
/**
 * Checks that {@code RabbitMqUtils.closeConnection} closes both the channel and its
 * connection, and that calling it again on the already closed channel does not throw.
 * Skipped when RabbitMQ is not running.
 */
@Test
public void testCloseConnection() throws Exception {
    Assume.assumeTrue(rabbitMqIsRunning);

    final Channel testChannel = RabbitMqTestUtils.createTestChannel();
    Assert.assertTrue(testChannel.isOpen());
    Assert.assertTrue(testChannel.getConnection().isOpen());

    // First pass closes the channel and connection; the second pass makes sure that
    // closing an already closed channel does not throw an exception.
    for (int pass = 0; pass < 2; pass++) {
        RabbitMqUtils.closeConnection(testChannel);
        Assert.assertFalse(testChannel.isOpen());
        Assert.assertFalse(testChannel.getConnection().isOpen());
    }
}
/**
 * Consumes {@code trade.eq.q} forever with a prefetch of 1: prints each message,
 * waits two seconds, then acknowledges it manually.
 *
 * @param args unused
 * @throws Exception if connecting, consuming, sleeping or acknowledging fails
 */
public static void main(String[] args) throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicQos(1);
    // autoAck is false: each delivery is acknowledged explicitly below.
    channel.basicConsume("trade.eq.q", false, consumer);

    for (;;) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String text = new String(delivery.getBody());
        System.out.println("received: " + text);
        Thread.sleep(2000);
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
/**
 * Builds a mocked {@link ConnectionFactory} whose {@code newConnection()} returns a
 * mocked connection, which in turn returns a mocked channel from {@code createChannel()}.
 *
 * @return the mocked connection factory
 */
@Override
protected ConnectionFactory setupConnectionFactory() {
    final ConnectionFactory factoryMock = Mockito.mock(ConnectionFactory.class);
    final Connection connectionMock = Mockito.mock(Connection.class);
    final Channel channelMock = Mockito.mock(Channel.class);
    try {
        // factory -> connection -> channel
        Mockito.when(factoryMock.newConnection()).thenReturn(connectionMock);
        Mockito.when(connectionMock.createChannel()).thenReturn(channelMock);
    } catch (IOException | TimeoutException e) {
        fail("Test environment couldn't be created.");
    }
    return factoryMock;
}
/**
 * Creates the test fixtures: a spied serialization schema plus mocks wired so that
 * config -> connection factory -> connection -> channel.
 */
@Before
public void before() throws Exception {
    serializationSchema = spy(new DummySerializationSchema());

    // Create the mocks, innermost collaborator first.
    channel = mock(Channel.class);
    connection = mock(Connection.class);
    connectionFactory = mock(ConnectionFactory.class);
    rmqConnectionConfig = mock(RMQConnectionConfig.class);

    // Wire them together.
    when(connection.createChannel()).thenReturn(channel);
    when(connectionFactory.newConnection()).thenReturn(connection);
    when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
}
/**
 * Opens a connection and channel, passively declares {@code exchange} and sends
 * {@code content} over the channel. Both resources are closed afterwards.
 *
 * @param content stream of message bodies handed to {@code sendContentOnChannel}
 * @throws IOException      if a connection or channel operation fails
 * @throws TimeoutException if connecting or closing times out
 */
private void trySendContent(Stream<byte[]> content) throws IOException, TimeoutException {
    try (Connection brokerConnection = connectionFactory.newConnection()) {
        try (Channel publishChannel = brokerConnection.createChannel()) {
            publishChannel.exchangeDeclarePassive(exchange);
            sendContentOnChannel(publishChannel, content);
        }
    }
}
/**
 * Borrows two channels from a pool obtained via {@code getChannelPool(2)} and expects
 * them to have different channel numbers.
 */
@Test
default void channelPoolShouldCreateDifferentChannels() {
    ChannelPool pool = getChannelPool(2);

    Channel first = borrowChannel(pool);
    Channel second = borrowChannel(pool);

    int firstNumber = first.getChannelNumber();
    int secondNumber = second.getChannelNumber();
    assertThat(firstNumber).isNotEqualTo(secondNumber);
}
/**
 * Expects {@code Channel.getConnection()} to return the connection that created the channel.
 */
@Test
void getConnection_returns_the_actual_connection_which_created_the_channel() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel channel = connection.createChannel()) {
        assertThat(channel.getConnection()).isEqualTo(connection);
    }
}
/**
 * Consumer demo with flow limiting: declares a durable topic exchange and queue,
 * binds them with {@code qos.#}, sets a prefetch count of 1 and starts a
 * manual-ack consumer.
 *
 * @param args unused
 * @throws Exception if connecting or any channel operation fails
 */
public static void main(String[] args) throws Exception {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(Constant.ip);
    factory.setPort(Constant.port);
    factory.setVirtualHost("/");

    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    final String exchange = "test_qos_exchange";
    final String queue = "test_qos_queue";
    final String bindingKey = "qos.#";

    channel.exchangeDeclare(exchange, "topic", true, false, null);
    channel.queueDeclare(queue, true, false, false, null);
    channel.queueBind(queue, exchange, bindingKey);

    /*
     * prefetchSize: message size limit; usually 0, meaning no limit.
     * prefetchCount: number of messages handled at a time; usually 1.
     * global: usually false. true limits at channel level, false at consumer level.
     */
    channel.basicQos(0, 1, false);

    // For flow limiting the first requirement is autoAck = false.
    channel.basicConsume(queue, false, new MyConsumer(channel));
}
/**
 * Consumes trades from {@code trade.eq.q} forever and dispatches each one to a
 * {@code TradeProcessor}, keeping trades with the same context on the same processor.
 *
 * <p>Asks on standard input whether to display the allocation map, starts five
 * processors (ids 1 to 5), then loops: acknowledge the delivery, look up or allocate
 * the processor for the trade's context, bump that processor's count and hand the
 * trade over.
 *
 * @throws Exception if connecting, consuming or acknowledging fails
 */
public void dispatchMessages() throws Exception {
    Channel channel = AMQPCommon.connect();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicQos(1);
    channel.basicConsume("trade.eq.q", false, consumer);

    java.util.Scanner console = new java.util.Scanner(System.in);
    System.out.print("Display Allocation Map? (y/n): ");
    display = console.next().equalsIgnoreCase("y");
    console.close();

    // Start with 5 threads...
    for (long id = 1; id <= 5; id++) {
        TradeProcessor processor = new TradeProcessor(this, id);
        threadpool.put(id, processor);
        processingCountMap.put(id, 0L);
        new Thread(() -> processor.start()).start();
    }

    displayAllocationMap();

    for (;;) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // The delivery is acknowledged before it is dispatched.
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        String trade = new String(delivery.getBody());
        String context = getContext(trade);

        // Reuse the processor already assigned to this context, or allocate one.
        final Long threadId;
        if (allocationMap.containsKey(context)) {
            threadId = allocationMap.get(context);
        } else {
            threadId = getNextAvailableThread();
            allocationMap.put(context, threadId);
        }

        processingCountMap.put(threadId, processingCountMap.get(threadId)+1);
        if (display) {
            System.out.println("Dispatcher: Received " + trade);
        }
        displayAllocationMap();
        threadpool.get(threadId).addMessage(trade);
    }
}
/**
 * Sets the metrics collector on the factory after the connection and channel already
 * exist and expects a subsequent publish to be counted in {@code rabbitmq.published}.
 */
@Test
void metrics_collector_reference_the_last_set_in_connection_factory() throws IOException, TimeoutException {
    SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
    MockConnectionFactory factory = new MockConnectionFactory();

    try (MockConnection mockConnection = factory.newConnection();
         Channel channel = mockConnection.createChannel(42)) {
        // Deliberately installed only after the connection and channel were created.
        factory.setMetricsCollector(new MicrometerMetricsCollector(meterRegistry));

        String queue = channel.queueDeclare().getQueue();
        channel.basicPublish("", queue, null, "".getBytes());

        assertThat(meterRegistry.get("rabbitmq.published").counter().count()).isEqualTo(1);
    }
}
/**
 * Dead-letter demo consumer: declares a durable topic exchange and a queue whose
 * {@code x-dead-letter-exchange} argument is {@code dlx.exchange}, declares the
 * dead-letter exchange and queue, and starts an auto-ack consumer on the main queue.
 *
 * @param args unused
 * @throws Exception if connecting or any channel operation fails
 */
public static void main(String[] args) throws Exception {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(Constant.ip);
    factory.setPort(Constant.port);
    factory.setVirtualHost("/");

    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    // An ordinary exchange, queue and routing key.
    final String exchange = "test_dlx_exchange";
    final String queue = "test_dlx_queue";
    final String bindingKey = "dlx.#";

    channel.exchangeDeclare(exchange, "topic", true, false, null);

    // These arguments must be supplied on the queue declaration.
    final Map<String, Object> queueArguments = new HashMap<>();
    queueArguments.put("x-dead-letter-exchange", "dlx.exchange");
    channel.queueDeclare(queue, true, false, false, queueArguments);
    channel.queueBind(queue, exchange, bindingKey);

    // Declare the dead-letter exchange and queue.
    channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
    channel.queueDeclare("dlx.queue", true, false, false, null);
    channel.queueBind("dlx.queue", "dlx.exchange", "#");

    channel.basicConsume(queue, true, new MyConsumer(channel));
}
/**
 * Expects {@code txRollback} without a prior {@code txSelect} to throw
 * {@link IllegalStateException} with the expected message.
 */
@Test
void rollback_without_select_throws() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel channel = connection.createChannel()) {
        assertThatExceptionOfType(IllegalStateException.class)
            .isThrownBy(channel::txRollback)
            .withMessage("No started transaction (make sure you called txSelect before txRollback");
    }
}
/**
 * Runs the shared default-configuration scenario against a mocked base channel and a
 * mocked good connection, passing {@code true} for both flags.
 */
@Test
public void testGetMessagesFromExistingQueueAndDefaultConfiguration()
        throws IOException, TimeoutException {
    // Mocked channel and the connection that hands it out.
    final Channel mockedChannel = mockBaseChannel();
    final Connection mockedConnection = mockGoodConnection(mockedChannel);

    testGetMessagesFromQueueAndDefaultConfiguration(mockedChannel, mockedConnection, true, true);
}
RabbitMQMessageQueue(Channel channel, String name, Class<T> type, MetricRegistry metrics) {
this.channel = channel;
this.name = name;
this.type = type;
this.metrics = metrics;
this.publish = metrics.meter(MetricRegistry.name("queue", type.getSimpleName(), name, "publish"));
try {
channel.queueDeclare(name, true, false, false, null);
} catch (IOException e) {
throw new MessageQueueException("Unable to declare queue.", e);
}
}
/**
 * Looks up the producer channel for this queue's bucket, ensures the queue exists,
 * then creates and initializes the {@code RemoteMessageQueue}. Any failure is stored
 * in {@code exception}; the latch is counted down in every case.
 */
@Override
public void run() {
    try {
        final Channel bucketChannel = producerChannels.get(getBucket(queueName));
        ensureQueueExists(bucketChannel, queueName);
        this.messageQueue = new RemoteMessageQueue(
                RabbitMQMessagingService.this, queueExecutor, bucketChannel, exchangeName, queueName);
        this.messageQueue.initialize();
    } catch (Exception failure) {
        // Recorded rather than rethrown.
        this.exception = failure;
    } finally {
        waitLatch.countDown();
    }
}
/**
 * Invoked when a proxied {@code Channel} method throws: if {@code needDoCap} accepts
 * the method and arguments, forwards the failure to {@code doCap} with {@code -1} as
 * its first argument.
 *
 * @param t      the channel the call was made on
 * @param proxy  the proxy instance (unused here)
 * @param method the invoked method
 * @param args   the invocation arguments
 * @param e      the exception thrown by the invocation
 */
@Override
public void catchInvokeException(Channel t, Object proxy, Method method, Object[] args, Throwable e) {
    if (!needDoCap(method, args)) {
        return;
    }
    doCap(-1, t, method, e);
}
/**
 * Direct-exchange consumer demo: connects with automatic recovery enabled, declares
 * the exchange and queue, binds them with {@code test.direct} and prints every
 * received message in an endless loop.
 *
 * @param args unused
 * @throws Exception if connecting, declaring or consuming fails
 */
public static void main(String[] args) throws Exception {
    final ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(Constant.ip);
    factory.setPort(Constant.port);
    factory.setVirtualHost("/");
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(3000);

    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    // Names used for the declarations below.
    final String exchange = "test_direct_exchange";
    final String type = "direct";
    final String queue = "test_direct_queue";
    final String bindingKey = "test.direct";

    // Declare the exchange.
    channel.exchangeDeclare(exchange, type, true, false, false, null);
    // Declare the queue.
    channel.queueDeclare(queue, false, false, false, null);
    // Bind the queue to the exchange.
    channel.queueBind(queue, exchange, bindingKey);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Arguments: queue name, automatic ack, consumer.
    channel.basicConsume(queue, true, consumer);

    // Fetch messages in a loop; nextDelivery() blocks while no message is available.
    for (;;) {
        Delivery delivery = consumer.nextDelivery();
        System.out.println("收到消息:" + new String(delivery.getBody()));
    }
}
/**
 * Expects {@code txCommit} without a prior {@code txSelect} to throw
 * {@link IllegalStateException} with the expected message.
 */
@Test
void commit_without_select_throws() throws IOException, TimeoutException {
    try (Connection connection = new MockConnectionFactory().newConnection();
         Channel channel = connection.createChannel()) {
        assertThatExceptionOfType(IllegalStateException.class)
            .isThrownBy(channel::txCommit)
            .withMessage("No started transaction (make sure you called txSelect before txCommit");
    }
}
/**
 * Declares a durable direct exchange and queue, binds them, publishes one message,
 * then keeps the connection open for one minute before closing it.
 *
 * <p>The connection and channel are opened in a try-with-resources statement so that
 * both are released (channel first, then connection) even when a declaration, the
 * publish or the sleep throws.
 *
 * @param args unused
 * @throws IOException          if the broker cannot be reached or an AMQP operation fails
 * @throws TimeoutException     if connecting or closing times out
 * @throws InterruptedException if the one-minute wait is interrupted
 */
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(XianConfig.get("rabbitmqUserName"));
    factory.setPassword(XianConfig.get("rabbitmqPwd"));
    factory.setVirtualHost("/");
    factory.setHost("production-internet-mq.apaycloud.com");
    factory.setPort(5672);

    String exchangeName = "yy-exchange";
    String routingKey = "yy-routingKey";
    String queueName = "yy-queueName";

    try (Connection conn = factory.newConnection();
         Channel channel = conn.createChannel()) {
        channel.exchangeDeclare(exchangeName, "direct", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // Explicit charset so the payload does not depend on the platform default.
        byte[] messageBodyBytes = "Hello, world2!".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        Thread.sleep(1000 * 60);
    }
}
/**
 * Serializes {@code data} to JSON (HTML escaping disabled) and publishes it to
 * {@code MessageConstants.EXCHANGE_NAME} with the given routing key.
 *
 * <p>A new connection is opened for the call and closed in a {@code finally} block,
 * so it is released even when declaring the exchange, serializing or publishing fails.
 *
 * @param routingKey routing key for the published message
 * @param data       object to serialize as the JSON message body (encoded as UTF-8)
 * @throws Exception if connecting, declaring, publishing or closing fails
 */
public static void publish(String routingKey, Object data) throws Exception{
    ConnectionFactory factory = new ConnectionFactory();
    MessageQueueDataModel messageQueueConfig = ApplicationConfigProvider.getInstance().getMessageQueue();
    factory.setHost(messageQueueConfig.getHost());
    factory.setUsername(messageQueueConfig.getUser());
    factory.setPassword(messageQueueConfig.getPassword());

    Connection connection = factory.newConnection();
    try {
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(MessageConstants.EXCHANGE_NAME, MessageConstants.EXCHANGE_TYPE);
        String message = new GsonBuilder().disableHtmlEscaping().create().toJson(data);
        // JSON is encoded as UTF-8 regardless of the platform default charset.
        channel.basicPublish(MessageConstants.EXCHANGE_NAME, routingKey, null,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    } finally {
        connection.close();
    }
}