下面列出了io.netty.channel.socket.SocketChannel#attr ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Attaches {@code node} to {@code channel} under the attribute key
 * {@code "node-" + node.getId()} so later handler callbacks can look it up.
 *
 * <p>The key is resolved with {@link AttributeKey#valueOf(String)}, which returns the
 * already-registered key or creates it atomically. A separate
 * {@code exists()} / {@code newInstance()} sequence is a check-then-act race:
 * {@code newInstance()} throws {@link IllegalArgumentException} when another thread
 * registers the same name between the two calls.
 *
 * @param node    the node to cache; its id determines the attribute key name
 * @param channel the channel whose attribute map receives the node
 */
private void cacheNode(Node node, SocketChannel channel) {
    AttributeKey<Node> attributeKey = AttributeKey.valueOf("node-" + node.getId());
    channel.attr(attributeKey).set(node);
}
/**
 * Copies the readable bytes of an inbound {@link ByteBuf} into a {@code NulsByteBuffer}
 * and hands them, together with the node cached on the channel, to
 * {@code MessageManager}.
 *
 * <p>The node is looked up through the channel attribute named
 * {@code "node-" + IpUtil.getNodeId(remoteAddress)}. If no node is cached the channel is
 * closed. In that case, and when reading the payload fails, the message is dropped
 * instead of being forwarded with a {@code null} buffer.
 *
 * @param ctx the handler context of the channel the message arrived on
 * @param msg the inbound message; cast to {@link ByteBuf}
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
    SocketChannel channel = (SocketChannel) ctx.channel();
    ByteBuf buf = (ByteBuf) msg;
    String remoteIP = channel.remoteAddress().getHostString();
    NulsByteBuffer byteBuffer;
    Node node;
    try {
        String nodeId = IpUtil.getNodeId(channel.remoteAddress());
        Attribute<Node> nodeAttribute = channel.attr(AttributeKey.valueOf("node-" + nodeId));
        node = nodeAttribute.get();
        if (node == null) {
            LoggerUtil.COMMON_LOG.error("-----------------Server channelRead node is null -----------------" + remoteIP + ":" + channel.remoteAddress().getPort());
            ctx.channel().close();
            // Nothing was read and the channel is going away: do not dispatch a null payload.
            return;
        }
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        byteBuffer = new NulsByteBuffer(bytes);
    } catch (Exception e) {
        LoggerUtil.COMMON_LOG.error(e);
        // The payload could not be extracted; there is nothing valid to forward.
        return;
    } finally {
        // Runs on every path, including the early returns above; resets the buffer indices.
        buf.clear();
    }
    MessageManager.getInstance().receiveMessage(byteBuffer, node);
}
/**
 * Runs after the channel has been unregistered: delegates to the superclass, fires the
 * disconnect listener of the node cached on the channel (when both the node and its
 * listener are present), and logs the remote host and port.
 *
 * @param ctx the handler context of the unregistered channel
 * @throws Exception if the superclass callback fails
 */
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    SocketChannel socketChannel = (SocketChannel) ctx.channel();
    // The node is stored under "node-<id>", where the id is derived from the remote address.
    AttributeKey<Node> nodeKey = AttributeKey.valueOf("node-" + IpUtil.getNodeId(socketChannel.remoteAddress()));
    Node cachedNode = socketChannel.attr(nodeKey).get();
    if (cachedNode != null) {
        if (cachedNode.getDisconnectListener() != null) {
            cachedNode.getDisconnectListener().action();
        }
    }
    LoggerUtil.COMMON_LOG.info(
            "Server Node is channelUnregistered:{}:{}",
            socketChannel.remoteAddress().getHostString(),
            socketChannel.remoteAddress().getPort());
}
/**
 * Attaches {@code node} to {@code channel} under the attribute key
 * {@code "node-" + node.getId()} so later handler callbacks can look it up.
 *
 * <p>The key is resolved with {@link AttributeKey#valueOf(String)}, which returns the
 * already-registered key or creates it atomically. A separate
 * {@code exists()} / {@code newInstance()} sequence is a check-then-act race:
 * {@code newInstance()} throws {@link IllegalArgumentException} when another thread
 * registers the same name between the two calls.
 *
 * @param node    the node to cache; its id determines the attribute key name
 * @param channel the channel whose attribute map receives the node
 */
private void cacheNode(Node node, SocketChannel channel) {
    AttributeKey<Node> attributeKey = AttributeKey.valueOf("node-" + node.getId());
    channel.attr(attributeKey).set(node);
}
/**
 * Forwards an inbound buffer, together with the node cached on the channel, to
 * {@code messageProcessor}.
 *
 * <p>When extending SimpleChannelInboundHandler only {@code channelRead0} has to be
 * overridden; {@code msg} is released automatically.
 *
 * @param ctx the handler context of the channel the message arrived on
 * @param msg the inbound message; cast to {@link ByteBuf}
 * @throws Exception if processing the message fails
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    SocketChannel socketChannel = (SocketChannel) ctx.channel();
    // The node is stored under "node-<id>", where the id is derived from the remote address.
    AttributeKey<Node> nodeKey = AttributeKey.valueOf("node-" + IpUtil.getNodeId(socketChannel.remoteAddress()));
    Node cachedNode = socketChannel.attr(nodeKey).get();
    messageProcessor.processor((ByteBuf) msg, cachedNode);
}
/**
 * Runs after the channel has been unregistered: delegates to the superclass, then fires
 * the disconnect listener of the node cached on the channel when both the node and its
 * listener are present.
 *
 * @param ctx the handler context of the unregistered channel
 * @throws Exception if the superclass callback fails
 */
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    SocketChannel socketChannel = (SocketChannel) ctx.channel();
    // The node is stored under "node-<id>", where the id is derived from the remote address.
    AttributeKey<Node> nodeKey = AttributeKey.valueOf("node-" + IpUtil.getNodeId(socketChannel.remoteAddress()));
    Node cachedNode = socketChannel.attr(nodeKey).get();
    if (cachedNode != null) {
        if (cachedNode.getDisconnectListener() != null) {
            cachedNode.getDisconnectListener().action();
        }
    }
    // The channel is already closed before channelUnregistered is invoked (this can be
    // checked with channel.isOpen()), so no explicit channel.close() is issued here.
}