org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.Channel源码实例Demo

下面列出了org.mockito.internal.stubbing.answers.ReturnsElementsOf#com.rabbitmq.client.Channel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: demo_springboot_rabbitmq   文件: Send.java
public static void main(String[] args) throws IOException, TimeoutException {

        //建立连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接地址
        factory.setHost("seaof-153-125-234-173.jp-tokyo-10.arukascloud.io");
        factory.setPort(31084);
        //获取连接
        Connection connection = factory.newConnection();
        //获取渠道
        Channel channel = connection.createChannel();
        //声明队列,如果不存在就新建
        //参数1队列名称;参数2是否持久化;参数3排他性队列,连接断开自动删除;参数4是否自动删除;参数5.参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送的消息
        String message = Thread.currentThread().getName() + "Hello ";

        //参数1 交换机;参数2 路由键;参数3 基础属性;参数4 消息体
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(Thread.currentThread().getName() + "[send]" + message);
        channel.close();
        connection.close();

    }
 
源代码2 项目: seed   文件: ReceiveService.java
@RabbitListener(queues="${spring.rabbitmq.queues}", containerFactory="jadyerRabbitListenerContainerFactory")
public void receive(UserMsg userMsg, Channel channel, Message message){
    try {
        LogUtil.getLogger().info("收到消息-->[{}]", ReflectionToStringBuilder.toString(userMsg));
        //确认消费成功(第一个参数:消息编号。第二个参数:是否确认多条消息,false为确认当前消息,true为确认deliveryTag编号以前的所有消息)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch(Exception e){
        LogUtil.getLogger().error("消息处理异常,消息ID={}, 消息体=[{}]", message.getMessageProperties().getCorrelationId(), JSON.toJSONString(userMsg), e);
        try {
            //拒绝当前消息,并把消息返回原队列(第三个参数:是否将消息放回队列,true表示放回队列,false表示丢弃消息)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //basicReject只能拒绝一条消息,而basicNack能够拒绝多条消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        } catch (IOException e1) {
            LogUtil.getLogger().error("消息basicNack时发生异常,消息ID={}", message.getMessageProperties().getCorrelationId(), e);
        }
    }
}
 
源代码3 项目: neo4j-mazerunner   文件: Worker.java
public static void sendMessage(String message)
        throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(ConfigurationLoader.getInstance().getRabbitmqNodename());
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
}
 
源代码4 项目: util4j   文件: ProducerExchangeConsumer_Topic.java
public void consumer2() throws Exception {
	//1、获取连接
       Connection connection =RabbitMqConnectionFactoy.getConnection();
       //2、声明通道
       Channel channel = connection.createChannel();
       //3、声明队列
       channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
       //绑定队列到交换机
       channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME,"test.#");//基数偶数都接收
       //同一时刻服务器只会发送一条消息给消费者(如果设置为N,则当客户端堆积N条消息后服务端不会推送给客户端了)
       //channel.basicQos(1);//每次只从服务器取1个处理
       //4、定义队列的消费者
       DeliverCallback deliverCallback = (consumerTag, delivery) -> {
           String message = new String(delivery.getBody(), "UTF-8");
           System.out.println("-->消费者2号,收到消息,msg :"+message+",header:"+delivery.getProperties().getHeaders().toString());
           channel.basicAck( delivery.getEnvelope().getDeliveryTag(), false);
       };
       channel.basicConsume(QUEUE_NAME2, autoAck, deliverCallback, consumerTag -> { });
}
 
源代码5 项目: reactive   文件: AMQPConsumer.java
public void execute(String mode) throws Exception {
	
	Channel channel = AMQPCommon.connect();
	QueueingConsumer consumer = new QueueingConsumer(channel);
	channel.basicConsume("trade.request.q", true, consumer);
	
	if (mode.equalsIgnoreCase("stable")) {lower = 300; upper = 1200;}
	if (mode.equalsIgnoreCase("better")) {lower = 300; upper = 800;}
	if (mode.equalsIgnoreCase("worse")) {lower = 800; upper = 1900;}
	if (mode.equalsIgnoreCase("erratic")) {lower = 200; upper = 5000;}

	while (true) {
		QueueingConsumer.Delivery message = consumer.nextDelivery();
		String msg = new String(message.getBody());
		System.out.println("trade order received: " + msg);
		int response = lower + (int) (Math.random() * (upper-lower));
		System.out.println("trade placed, duration = " + response);
		String newMsg = "response";
		byte[] bmsg = newMsg.getBytes();
		Thread.sleep(response);
		channel.basicPublish("", "trade.response.q", null, bmsg);
	}
}
 
源代码6 项目: rabbitmq-mock   文件: ChannelTest.java
@Test
void exchangeBind_binds_two_exchanges() throws IOException, TimeoutException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            assertThat(channel.exchangeDeclare("ex-from", BuiltinExchangeType.FANOUT)).isNotNull();
            assertThat(channel.exchangeDeclare("ex-to", BuiltinExchangeType.FANOUT)).isNotNull();
            assertThat(channel.queueDeclare()).isNotNull();

            assertThat(channel.exchangeBind("ex-to", "ex-from", "test.key")).isNotNull();
            assertThat(channel.queueBind("", "ex-to", "queue.used")).isNotNull();

            channel.basicPublish("ex-from", "unused", null, "test message".getBytes());
            GetResponse response = channel.basicGet("", false);
            assertThat(response).isNotNull();
            assertThat(new String(response.getBody())).isEqualTo("test message");
        }
    }
}
 
源代码7 项目: rabbitmq-mock   文件: MetricsCollectorTest.java
@Test
void metrics_collector_is_invoked_on_basic_reject() throws IOException, TimeoutException {
    MockConnectionFactory mockConnectionFactory = new MockConnectionFactory();
    SimpleMeterRegistry registry = new SimpleMeterRegistry();
    mockConnectionFactory.setMetricsCollector(new MicrometerMetricsCollector(registry));

    try (MockConnection connection = mockConnectionFactory.newConnection();
         Channel channel = connection.createChannel(42)) {
        String queueName = channel.queueDeclare().getQueue();
        channel.basicPublish("", queueName, null, "".getBytes());
        GetResponse getResponse = channel.basicGet(queueName, false);

        assertThat(registry.get("rabbitmq.rejected").counter().count()).isEqualTo(0);
        channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), false);
        assertThat(registry.get("rabbitmq.rejected").counter().count()).isEqualTo(1);
    }
}
 
源代码8 项目: rabbitmq-mock   文件: ComplexUseCasesTests.java
@Test
void expired_message_should_be_consumable_after_being_dead_lettered() throws IOException, TimeoutException, InterruptedException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            channel.exchangeDeclare("rejected-ex", BuiltinExchangeType.FANOUT);
            channel.queueDeclare("rejected", true, false, false, null);
            channel.queueBindNoWait("rejected", "rejected-ex", "unused", null);
            queue("fruits").withMessageTtl(10L).withDeadLetterExchange("rejected-ex").declare(channel);

            List<String> messages = new ArrayList<>();
            channel.basicConsume("rejected", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) {
                    messages.add(new String(body));
                }
            });

            channel.basicPublish("", "fruits", null, "banana".getBytes());
            TimeUnit.MILLISECONDS.sleep(100L);
            assertThat(messages).hasSize(1);
        }
    }
}
 
源代码9 项目: nifi   文件: AMQPPublisher.java
/**
 * Publishes message with provided AMQP properties (see
 * {@link BasicProperties}) to a pre-defined AMQP Exchange.
 *
 * @param bytes bytes representing a message.
 * @param properties instance of {@link BasicProperties}
 * @param exchange the name of AMQP exchange to which messages will be published.
 *            If not provided 'default' exchange will be used.
 * @param routingKey (required) the name of the routingKey to be used by AMQP-based
 *            system to route messages to its final destination (queue).
 */
void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
    this.validateStringProperty("routingKey", routingKey);
    exchange = exchange == null ? "" : exchange.trim();
    if (exchange.length() == 0) {
        processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
    }
    processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
            + "' exchange with '" + routingKey + "' as a routing key.");

    final Channel channel = getChannel();
    if (channel.isOpen()) {
        try {
            channel.basicPublish(exchange, routingKey, true, properties, bytes);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
        }
    } else {
        throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
    }
}
 
源代码10 项目: rabbitmq-mock   文件: MetricsCollectorTest.java
@Test
void metrics_collector_is_invoked_on_basic_nack() throws IOException, TimeoutException {
    MockConnectionFactory mockConnectionFactory = new MockConnectionFactory();
    SimpleMeterRegistry registry = new SimpleMeterRegistry();
    mockConnectionFactory.setMetricsCollector(new MicrometerMetricsCollector(registry));

    try (MockConnection connection = mockConnectionFactory.newConnection();
         Channel channel = connection.createChannel(42)) {
        String queueName = channel.queueDeclare().getQueue();
        channel.basicPublish("", queueName, null, "".getBytes());
        GetResponse getResponse = channel.basicGet(queueName, false);

        assertThat(registry.get("rabbitmq.rejected").counter().count()).isEqualTo(0);
        channel.basicNack(getResponse.getEnvelope().getDeliveryTag(), false, false);
        assertThat(registry.get("rabbitmq.rejected").counter().count()).isEqualTo(1);
    }
}
 
源代码11 项目: roboconf-platform   文件: RabbitMqUtilsTest.java
@Test
public void testCloseConnection() throws Exception {
	Assume.assumeTrue( rabbitMqIsRunning );

	Channel channel = RabbitMqTestUtils.createTestChannel();
	Assert.assertTrue( channel.isOpen());
	Assert.assertTrue( channel.getConnection().isOpen());

	// Close it
	RabbitMqUtils.closeConnection( channel );
	Assert.assertFalse( channel.isOpen());
	Assert.assertFalse( channel.getConnection().isOpen());

	// Make sure closing an already closed channel does not throw an exception
	RabbitMqUtils.closeConnection( channel );
	Assert.assertFalse( channel.isOpen());
	Assert.assertFalse( channel.getConnection().isOpen());
}
 
源代码12 项目: reactive   文件: AMQPContConsumer.java
public static void main(String[] args) throws Exception {
	Channel channel = AMQPCommon.connect();
	QueueingConsumer consumer = new QueueingConsumer(channel);
	channel.basicQos(1);
	channel.basicConsume("trade.eq.q", false, consumer);

	while (true) {
		QueueingConsumer.Delivery msg = consumer.nextDelivery();
		System.out.println("received: " + new String(msg.getBody()));
		Thread.sleep(2000);
		channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
	}			
}
 
源代码13 项目: flink   文件: RMQSourceTest.java
@Override
protected ConnectionFactory setupConnectionFactory() {
	ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
	Connection connection = Mockito.mock(Connection.class);
	try {
		Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
		Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class));
	} catch (IOException | TimeoutException e) {
		fail("Test environment couldn't be created.");
	}
	return connectionFactory;
}
 
源代码14 项目: Flink-CEPplus   文件: RMQSinkTest.java
@Before
public void before() throws Exception {
	serializationSchema = spy(new DummySerializationSchema());
	rmqConnectionConfig = mock(RMQConnectionConfig.class);
	connectionFactory = mock(ConnectionFactory.class);
	connection = mock(Connection.class);
	channel = mock(Channel.class);

	when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
	when(connectionFactory.newConnection()).thenReturn(connection);
	when(connection.createChannel()).thenReturn(channel);
}
 
源代码15 项目: james-project   文件: AmqpForwardAttribute.java
private void trySendContent(Stream<byte[]> content) throws IOException, TimeoutException {
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclarePassive(exchange);
        sendContentOnChannel(channel, content);
    }
}
 
源代码16 项目: james-project   文件: ChannelPoolContract.java
@Test
default void channelPoolShouldCreateDifferentChannels() {
    ChannelPool channelPool = getChannelPool(2);
    Channel channel1 = borrowChannel(channelPool);
    Channel channel2 = borrowChannel(channelPool);

    assertThat(channel1.getChannelNumber())
        .isNotEqualTo(channel2.getChannelNumber());
}
 
源代码17 项目: rabbitmq-mock   文件: ChannelTest.java
@Test
void getConnection_returns_the_actual_connection_which_created_the_channel() throws IOException, TimeoutException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            assertThat(channel.getConnection()).isEqualTo(conn);
        }
    }
}
 
源代码18 项目: code   文件: Consumer.java
public static void main(String[] args) throws Exception {
	
	
	ConnectionFactory connectionFactory = new ConnectionFactory();
	connectionFactory.setHost(Constant.ip);
	connectionFactory.setPort(Constant.port);
	connectionFactory.setVirtualHost("/");
	
	Connection connection = connectionFactory.newConnection();
	Channel channel = connection.createChannel();
	
	
	String exchangeName = "test_qos_exchange";
	String queueName = "test_qos_queue";
	String routingKey = "qos.#";
	
	channel.exchangeDeclare(exchangeName, "topic", true, false, null);
	channel.queueDeclare(queueName, true, false, false, null);
	channel.queueBind(queueName, exchangeName, routingKey);
	
	/*
	 * prefetchSize:消息限制大小,一般为0,不做限制。
	 * prefetchCount:一次处理消息的个数,一般设置为1
	 * global:一般为false。true,在channel级别做限制;false,在consumer级别做限制
	 */
	channel.basicQos(0, 1, false);

	// 限流方式  第一件事就是 autoAck设置为 false
	channel.basicConsume(queueName, false, new MyConsumer(channel));
	
	
}
 
源代码19 项目: reactive   文件: Dispatcher.java
public void dispatchMessages() throws Exception {
	Channel channel = AMQPCommon.connect();
	QueueingConsumer consumer = new QueueingConsumer(channel);
	channel.basicQos(1);
	channel.basicConsume("trade.eq.q", false, consumer);

	java.util.Scanner input = new java.util.Scanner(System.in);
    System.out.print("Display Allocation Map? (y/n): ");
    display = input.next().equalsIgnoreCase("y");
	input.close();
	
	//start with 5 threads...
	for (long i=1;i<6;i++) {
		TradeProcessor processor = new TradeProcessor(this, i);
		threadpool.put(i, processor);
		processingCountMap.put(i, 0L);
		new Thread(()->processor.start()).start();
	}

	displayAllocationMap();
	while (true) {
		QueueingConsumer.Delivery msg = consumer.nextDelivery();
		channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
		String trade = new String(msg.getBody());
		String context = getContext(trade);
		Long threadId = 0L;
					
		if (allocationMap.containsKey(context)) {
			threadId = allocationMap.get(context);
		} else {
			threadId = getNextAvailableThread();
			allocationMap.put(context, threadId);
		}
		processingCountMap.put(threadId, processingCountMap.get(threadId)+1);
		if (display) System.out.println("Dispatcher: Received " + trade);
		displayAllocationMap();
		threadpool.get(threadId).addMessage(new String(msg.getBody()));				
	}			
}
 
源代码20 项目: rabbitmq-mock   文件: MetricsCollectorTest.java
@Test
void metrics_collector_reference_the_last_set_in_connection_factory() throws IOException, TimeoutException {
    MockConnectionFactory mockConnectionFactory = new MockConnectionFactory();
    SimpleMeterRegistry registry = new SimpleMeterRegistry();

    try (MockConnection connection = mockConnectionFactory.newConnection();
         Channel channel = connection.createChannel(42)) {

        mockConnectionFactory.setMetricsCollector(new MicrometerMetricsCollector(registry));

        String queueName = channel.queueDeclare().getQueue();
        channel.basicPublish("", queueName, null, "".getBytes());
        assertThat(registry.get("rabbitmq.published").counter().count()).isEqualTo(1);
    }
}
 
源代码21 项目: code   文件: Consumer.java
public static void main(String[] args) throws Exception {

		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(Constant.ip);
		connectionFactory.setPort(Constant.port);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		// 这就是一个普通的交换机 和 队列 以及路由
		String exchangeName = "test_dlx_exchange";
		String queueName = "test_dlx_queue";
		String routingKey = "dlx.#";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);

		Map<String, Object> arguments = new HashMap<>();
		arguments.put("x-dead-letter-exchange", "dlx.exchange");
		//这个agruments属性,要设置到声明队列上
		channel.queueDeclare(queueName, true, false, false, arguments);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//要进行死信队列的声明:
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare("dlx.queue", true, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");
		
		channel.basicConsume(queueName, true, new MyConsumer(channel));
		
		
	}
 
源代码22 项目: rabbitmq-mock   文件: ChannelTest.java
@Test
void rollback_without_select_throws() throws IOException, TimeoutException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            assertThatExceptionOfType(IllegalStateException.class)
                .isThrownBy(() -> channel.txRollback())
                .withMessage("No started transaction (make sure you called txSelect before txRollback");
        }
    }
}
 
源代码23 项目: conductor   文件: AMQPObservableQueueTest.java
@Test
public void testGetMessagesFromExistingQueueAndDefaultConfiguration()
        throws IOException, TimeoutException {
    // Mock channel and connection
    Channel channel = mockBaseChannel();
    Connection connection = mockGoodConnection(channel);
    testGetMessagesFromQueueAndDefaultConfiguration(channel, connection,true, true);
}
 
RabbitMQMessageQueue(Channel channel, String name, Class<T> type, MetricRegistry metrics) {
    this.channel = channel;
    this.name = name;
    this.type = type;
    this.metrics = metrics;
    this.publish = metrics.meter(MetricRegistry.name("queue", type.getSimpleName(), name, "publish"));
    try {
        channel.queueDeclare(name, true, false, false, null);
    } catch (IOException e) {
        throw new MessageQueueException("Unable to declare queue.", e);
    }
}
 
源代码25 项目: elasticactors   文件: RabbitMQMessagingService.java
@Override
public void run() {
    try {
        Channel producerChannel = producerChannels.get(getBucket(this.queueName));
        ensureQueueExists(producerChannel,queueName);
        this.messageQueue =  new RemoteMessageQueue(RabbitMQMessagingService.this, queueExecutor, producerChannel, exchangeName, queueName);
        messageQueue.initialize();
    } catch(Exception e) {
        this.exception = e;
    } finally {
        waitLatch.countDown();
    }
}
 
源代码26 项目: uavstack   文件: RabbitmqIT.java
@Override
public void catchInvokeException(Channel t, Object proxy, Method method, Object[] args, Throwable e) {

    if (needDoCap(method, args)) {
        doCap(-1, t, method, e);
    }

}
 
源代码27 项目: code   文件: Consumer4DirectExchange.java
public static void main(String[] args) throws Exception {


        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.ip);
        connectionFactory.setPort(Constant.port);
        connectionFactory.setVirtualHost("/");

        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";

        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息  
        while (true) {
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
 
源代码28 项目: rabbitmq-mock   文件: ChannelTest.java
@Test
void commit_without_select_throws() throws IOException, TimeoutException {
    try (Connection conn = new MockConnectionFactory().newConnection()) {
        try (Channel channel = conn.createChannel()) {
            assertThatExceptionOfType(IllegalStateException.class)
                .isThrownBy(() -> channel.txCommit())
                .withMessage("No started transaction (make sure you called txSelect before txCommit");
        }
    }
}
 
源代码29 项目: xian   文件: RabbitMqClientDemo.java
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(XianConfig.get("rabbitmqUserName"));
    factory.setPassword(XianConfig.get("rabbitmqPwd"));
    factory.setVirtualHost("/");
    factory.setHost("production-internet-mq.apaycloud.com");
    factory.setPort(5672);
    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();


    String exchangeName = "yy-exchange";
    String routingKey = "yy-routingKey";
    String queueName = "yy-queueName";

    channel.exchangeDeclare(exchangeName, "direct", true);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);

    byte[] messageBodyBytes = "Hello, world2!".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);


    Thread.sleep(1000 * 60);

    channel.close();
    conn.close();
}
 
源代码30 项目: Insights   文件: MessagePublisherFactory.java
public static void publish(String routingKey, Object data) throws Exception{
	ConnectionFactory factory = new ConnectionFactory();
       MessageQueueDataModel messageQueueConfig = ApplicationConfigProvider.getInstance().getMessageQueue();
	factory.setHost(messageQueueConfig.getHost());
       factory.setUsername(messageQueueConfig.getUser());
	factory.setPassword(messageQueueConfig.getPassword());
       Connection connection = factory.newConnection();
       Channel channel = connection.createChannel();
       channel.exchangeDeclare(MessageConstants.EXCHANGE_NAME, MessageConstants.EXCHANGE_TYPE);
       
       String message = new GsonBuilder().disableHtmlEscaping().create().toJson(data);
       channel.basicPublish(MessageConstants.EXCHANGE_NAME, routingKey, null, message.getBytes());
       connection.close();
}