类com.rabbitmq.client.QueueingConsumer.Delivery源码实例Demo

下面列出了怎么用com.rabbitmq.client.QueueingConsumer.Delivery的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: bluima   文件: RabbitReader.java
/**
 * Waits for the next delivery from the queueing consumer and caches it.
 *
 * <p>On success the message body is stored in {@code nextDelivery} and the
 * envelope's delivery tag in {@code deliveryTag}. Note that every call
 * consumes one delivery from the consumer.
 *
 * @return {@code true} if a delivery was received; {@code false} if the wait
 *         timed out or the waiting thread was interrupted
 * @throws CollectionException wrapping any other failure raised while waiting
 *         for or unpacking the delivery
 */
@Override
public boolean hasNext() throws IOException, CollectionException {

    try {
        // nextDelivery takes milliseconds; `timeout` is presumably in seconds
        // (TODO confirm). The long literal prevents int overflow if `timeout`
        // is declared as an int.
        Delivery d = consumer.nextDelivery(timeout * 1000L);
        if (d == null) {
            LOG.info(" [RabbitReader] timout, exiting reader!");
            return false;
        }
        nextDelivery = d.getBody();
        deliveryTag = d.getEnvelope().getDeliveryTag();
        return true;

    } catch (InterruptedException ie) {
        // Restore the interrupt flag so callers further up the stack can
        // still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        LOG.info(" [RabbitReader] timout2, exiting reader!");
        return false;
    } catch (Exception e) {
        throw new CollectionException(e);
    }
}
 
源代码2 项目: code   文件: Consumer.java
/**
 * Demo consumer: connects to the broker configured in {@code Constant},
 * declares the queue {@code test001}, and prints every message body together
 * with its {@code my1} / {@code my2} headers. Loops forever.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, or consuming fails
 */
public static void main(String[] args) throws Exception {

    // 1. Create and configure the ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    // 2. Open a connection through the factory
    Connection connection = connectionFactory.newConnection();

    // 3. Create a channel on the connection
    Channel channel = connection.createChannel();

    // 4. Declare (create) the queue
    String queueName = "test001";
    channel.queueDeclare(queueName, true, false, false, null);

    // 5. Create the consumer
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

    // 6. Register the consumer on the channel (auto-ack enabled)
    channel.basicConsume(queueName, true, queueingConsumer);

    while (true) {
        // 7. Fetch the next message; blocks until one is available
        Delivery delivery = queueingConsumer.nextDelivery();

        // Decode with an explicit charset rather than the platform default
        // (assumes the producer publishes UTF-8 text). Fully qualified so no
        // new import is needed.
        String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.err.println("消费端: " + msg);

        // getHeaders() returns null when the publisher set no headers.
        Map<String, Object> headers = delivery.getProperties().getHeaders();
        Object my1 = (headers == null) ? null : headers.get("my1");
        Object my2 = (headers == null) ? null : headers.get("my2");
        System.err.println("headers get my1 value: " + my1 + "\tmy2 value:" + my2);
    }
}
 
源代码3 项目: code   文件: Consumer4FanoutExchange.java
/**
 * Demo consumer for a fanout exchange: declares {@code test_fanout_exchange}
 * and {@code test_fanout_queue}, binds them with an empty routing key, and
 * prints every received message body. Loops forever.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, or consuming fails
 */
public static void main(String[] args) throws Exception {

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    connectionFactory.setAutomaticRecoveryEnabled(true);
    connectionFactory.setNetworkRecoveryInterval(3000);
    Connection connection = connectionFactory.newConnection();

    Channel channel = connection.createChannel();

    // 4. Declarations
    String exchangeName = "test_fanout_exchange";
    String exchangeType = "fanout";
    String queueName = "test_fanout_queue";
    String routingKey = "";    // no routing key is set
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    // Second argument is the queue's `durable` flag (false here).
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Arguments: queue name, auto-ack flag, consumer
    channel.basicConsume(queueName, true, consumer);

    // Fetch messages in a loop
    while (true) {
        // Blocks here until a message is available
        Delivery delivery = consumer.nextDelivery();
        // Decode with an explicit charset rather than the platform default
        // (assumes the producer publishes UTF-8 text).
        String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到消息:" + msg);
    }
}
 
源代码4 项目: code   文件: Consumer4DirectExchange.java
/**
 * Demo consumer for a direct exchange: declares {@code test_direct_exchange}
 * and {@code test_direct_queue}, binds them with routing key
 * {@code test.direct}, and prints every received message body. Loops forever.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, or consuming fails
 */
public static void main(String[] args) throws Exception {

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    connectionFactory.setAutomaticRecoveryEnabled(true);
    connectionFactory.setNetworkRecoveryInterval(3000);
    Connection connection = connectionFactory.newConnection();

    Channel channel = connection.createChannel();

    // 4. Declarations
    String exchangeName = "test_direct_exchange";
    String exchangeType = "direct";
    String queueName = "test_direct_queue";
    String routingKey = "test.direct";

    // Declare the exchange
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    // Declare the queue; the second argument is its `durable` flag (false here)
    channel.queueDeclare(queueName, false, false, false, null);
    // Bind the queue to the exchange
    channel.queueBind(queueName, exchangeName, routingKey);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Arguments: queue name, auto-ack flag, consumer
    channel.basicConsume(queueName, true, consumer);

    // Fetch messages in a loop
    while (true) {
        // Blocks here until a message is available
        Delivery delivery = consumer.nextDelivery();
        // Decode with an explicit charset rather than the platform default
        // (assumes the producer publishes UTF-8 text).
        String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到消息:" + msg);
    }
}
 
源代码5 项目: code   文件: Consumer4TopicExchange.java
/**
 * Demo consumer for a topic exchange: declares {@code test_topic_exchange}
 * and {@code test_topic_queue}, binds them with the pattern {@code user.#},
 * and prints every received message body. Loops forever.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, or consuming fails
 */
public static void main(String[] args) throws Exception {

    ConnectionFactory connectionFactory = new ConnectionFactory();

    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    connectionFactory.setAutomaticRecoveryEnabled(true);
    connectionFactory.setNetworkRecoveryInterval(3000);
    Connection connection = connectionFactory.newConnection();

    Channel channel = connection.createChannel();

    // 4. Declarations
    String exchangeName = "test_topic_exchange";
    String exchangeType = "topic";
    String queueName = "test_topic_queue";
    String routingKey = "user.#";
    //String routingKey = "user.*";

    // 1. Declare the exchange
    channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    // 2. Declare the queue; the second argument is its `durable` flag (false here)
    channel.queueDeclare(queueName, false, false, false, null);
    // 3. Bind the queue to the exchange
    channel.queueBind(queueName, exchangeName, routingKey);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // Arguments: queue name, auto-ack flag, consumer
    channel.basicConsume(queueName, true, consumer);

    // Fetch messages in a loop
    while (true) {
        // Blocks here until a message is available
        Delivery delivery = consumer.nextDelivery();
        // Decode with an explicit charset rather than the platform default
        // (assumes the producer publishes UTF-8 text).
        String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("收到消息:" + msg);
    }
}
 
源代码6 项目: code   文件: Consumer.java
/**
 * Demo consumer for the confirm example: declares the topic exchange
 * {@code test_confirm_exchange} and the queue {@code test_confirm_queue},
 * binds them with the pattern {@code confirm.#}, and prints every received
 * message body. Loops forever.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, or consuming fails
 */
public static void main(String[] args) throws Exception {

    // 1. Create the ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.ip);
    connectionFactory.setPort(Constant.port);
    connectionFactory.setVirtualHost("/");

    // 2. Obtain a Connection
    Connection connection = connectionFactory.newConnection();

    // 3. Create a new Channel on the Connection
    Channel channel = connection.createChannel();

    String exchangeName = "test_confirm_exchange";
    String routingKey = "confirm.#";
    String queueName = "test_confirm_queue";

    // 4. Declare the exchange and the queue, then bind them with the routing key
    channel.exchangeDeclare(exchangeName, "topic", true);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);

    // 5. Create the consumer and register it (auto-ack enabled)
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, queueingConsumer);

    while (true) {
        // Blocks here until a message is available
        Delivery delivery = queueingConsumer.nextDelivery();
        // Decode with an explicit charset rather than the platform default
        // (assumes the producer publishes UTF-8 text).
        String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);

        System.err.println("消费端: " + msg);
    }
}
 
源代码7 项目: Visage   文件: RenderContext.java
/**
 * Decodes one render job from a delivery, renders it, publishes the PNG
 * result to the delivery's reply-to queue, and acknowledges the delivery.
 *
 * <p>The body is a deflate-compressed stream read in this order: render mode
 * (unsigned byte, index into {@code RenderMode.values()}), width and height
 * (unsigned shorts), a game profile, a count-prefixed map of string keys to
 * string arrays, and a length-prefixed block of skin PNG bytes.
 *
 * @param delivery the job message to process
 * @throws Exception if decoding, rendering, publishing, or acking fails
 */
private void processDelivery(Delivery delivery) throws Exception {
	BasicProperties props = delivery.getProperties();
	// The reply carries the request's correlation id.
	BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();

	RenderMode mode;
	int width;
	int height;
	GameProfile profile;
	Map<String, String[]> params = Maps.newHashMap();
	byte[] skinData;

	DataInputStream data = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(delivery.getBody())));
	try {
		mode = RenderMode.values()[data.readUnsignedByte()];
		width = data.readUnsignedShort();
		height = data.readUnsignedShort();
		profile = Profiles.readGameProfile(data);
		int len = data.readUnsignedShort();
		for (int i = 0; i < len; i++) {
			String key = data.readUTF();
			String[] val = new String[data.readUnsignedByte()];
			for (int v = 0; v < val.length; v++) {
				val[v] = data.readUTF();
			}
			params.put(key, val);
		}
		skinData = new byte[data.readInt()];
		data.readFully(skinData);
	} finally {
		// Closing the InflaterInputStream ends its internal Inflater, which
		// otherwise holds native memory until garbage collection.
		data.close();
	}

	BufferedImage skin = new PngImage().read(new ByteArrayInputStream(skinData), false);
	Visage.log.info("Received a job to render a "+width+"x"+height+" "+mode.name().toLowerCase()+" for "+(profile == null ? "null" : profile.getName()));

	RenderConfiguration conf = new RenderConfiguration(Type.fromMode(mode), Profiles.isSlim(profile), mode.isTall(), Profiles.isFlipped(profile));

	// Clear to transparent black before drawing.
	glClearColor(0, 0, 0, 0);
	glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT);
	byte[] pngBys = draw(conf, width, height, profile, skin, params);
	if (Visage.trace) Visage.log.finest("Got png bytes");
	parent.channel.basicPublish("", props.getReplyTo(), replyProps, buildResponse(0, pngBys));
	if (Visage.trace) Visage.log.finest("Published response");
	// Ack only after the response has been published.
	parent.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
	if (Visage.trace) Visage.log.finest("Ack'd message");
}
 
源代码8 项目: Visage   文件: RenderContext.java
/**
 * Queues a delivery for later processing by appending it to the end of
 * {@code toProcess}.
 *
 * @param delivery the delivery to enqueue
 * @throws IOException declared by the signature; nothing in this body is
 *         visibly able to throw it (NOTE(review): confirm against the type
 *         of {@code toProcess})
 */
public void process(Delivery delivery) throws IOException {
	toProcess.addLast(delivery);
}
 
源代码9 项目: code   文件: Consumer.java
/**
 * Demo consumer for the return example: declares the topic exchange
 * {@code test_return_exchange} and the queue {@code test_return_queue},
 * binds them with the pattern {@code return.#}, and prints every received
 * message body. Loops forever.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, or consuming fails
 */
public static void main(String[] args) throws Exception {

	ConnectionFactory connectionFactory = new ConnectionFactory();
	connectionFactory.setHost(Constant.ip);
	connectionFactory.setPort(Constant.port);
	connectionFactory.setVirtualHost("/");

	Connection connection = connectionFactory.newConnection();
	Channel channel = connection.createChannel();

	String exchangeName = "test_return_exchange";
	String routingKey = "return.#";
	String queueName = "test_return_queue";

	channel.exchangeDeclare(exchangeName, "topic", true, false, null);
	channel.queueDeclare(queueName, true, false, false, null);
	channel.queueBind(queueName, exchangeName, routingKey);

	QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

	// Register the consumer with auto-ack enabled.
	channel.basicConsume(queueName, true, queueingConsumer);

	while (true) {
		// Blocks here until a message is available
		Delivery delivery = queueingConsumer.nextDelivery();
		// Decode with an explicit charset rather than the platform default
		// (assumes the producer publishes UTF-8 text).
		String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
		System.err.println("消费者: " + msg);
	}
}
 
源代码10 项目: AsuraFramework   文件: IRabbitMqMessageLisenter.java
/**
 * Interface method that implementations use to handle a message.
 *
 * @param delivery the delivery to process
 * @throws Exception if the implementation fails while handling the message
 */
void processMessage(Delivery delivery) throws Exception;