下面列出了怎么用com.rabbitmq.client.QueueingConsumer.Delivery的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public boolean hasNext() throws IOException, CollectionException {
try {
Delivery d = consumer.nextDelivery(timeout * 1000);
if (d == null) {
LOG.info(" [RabbitReader] timout, exiting reader!");
return false;
}
nextDelivery = d.getBody();
deliveryTag = d.getEnvelope().getDeliveryTag();
return true;
} catch (InterruptedException ie) {
LOG.info(" [RabbitReader] timout2, exiting reader!");
return false;
} catch (Exception e) {
throw new CollectionException(e);
}
}
public static void main(String[] args) throws Exception {
//1 创建一个ConnectionFactory, 并进行配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.ip);
connectionFactory.setPort(Constant.port);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过connection创建一个Channel
Channel channel = connection.createChannel();
//4 声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 设置Channel
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
//7 获取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.err.println("headers get my1 value: " + headers.get("my1") + "\tmy1 value:" + headers.get("my2"));
//Envelope envelope = delivery.getEnvelope();
}
}
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost(Constant.ip);
connectionFactory.setPort(Constant.port);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不设置路由键
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.ip);
connectionFactory.setPort(Constant.port);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示声明了一个交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示声明了一个队列
channel.queueDeclare(queueName, false, false, false, null);
//建立一个绑定关系:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while (true) {
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost(Constant.ip);
connectionFactory.setPort(Constant.port);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
//String routingKey = "user.*";
// 1 声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 2 声明队列
channel.queueDeclare(queueName, false, false, false, null);
// 3 建立交换机和队列的绑定关系:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.ip);
connectionFactory.setPort(Constant.port);
connectionFactory.setVirtualHost("/");
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
private void processDelivery(Delivery delivery) throws Exception {
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
DataInputStream data = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(delivery.getBody())));
RenderMode mode = RenderMode.values()[data.readUnsignedByte()];
int width = data.readUnsignedShort();
int height = data.readUnsignedShort();
GameProfile profile = Profiles.readGameProfile(data);
Map<String, String[]> params = Maps.newHashMap();
int len = data.readUnsignedShort();
for (int i = 0; i < len; i++) {
String key = data.readUTF();
String[] val = new String[data.readUnsignedByte()];
for (int v = 0; v < val.length; v++) {
val[v] = data.readUTF();
}
params.put(key, val);
}
byte[] skinData = new byte[data.readInt()];
data.readFully(skinData);
BufferedImage skin = new PngImage().read(new ByteArrayInputStream(skinData), false);
Visage.log.info("Received a job to render a "+width+"x"+height+" "+mode.name().toLowerCase()+" for "+(profile == null ? "null" : profile.getName()));
RenderConfiguration conf = new RenderConfiguration(Type.fromMode(mode), Profiles.isSlim(profile), mode.isTall(), Profiles.isFlipped(profile));
glClearColor(0, 0, 0, 0);
glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT);
byte[] pngBys = draw(conf, width, height, profile, skin, params);
if (Visage.trace) Visage.log.finest("Got png bytes");
parent.channel.basicPublish("", props.getReplyTo(), replyProps, buildResponse(0, pngBys));
if (Visage.trace) Visage.log.finest("Published response");
parent.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
if (Visage.trace) Visage.log.finest("Ack'd message");
}
public void process(Delivery delivery) throws IOException {
toProcess.addLast(delivery);
}
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constant.ip);
connectionFactory.setPort(Constant.port);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
}
}
/**
* 接口方法实现消息的处理
* @param delivery
*/
void processMessage(Delivery delivery) throws Exception;