下面列出了 io.netty.util.AttributeKey 类 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Sends a heartbeat message to every active node.
 *
 * <p>For each active channel handler context a heartbeat {@code Message} is built, serialized
 * into a buffer taken from the context's allocator, and written and flushed to the channel.
 * If serialization fails, the buffer is released before the failure propagates, because at
 * that point it has not yet been handed to {@code writeAndFlush}.
 */
public void heartBeat() {
    List<ChannelHandlerContext> channelHandlers = getConnections().activeChannelHandlers();
    channelHandlers.forEach(
            (ctx) -> {
                Message message = Message.builder(MessageType.HEARTBEAT);
                MessageSerializer serializer = new MessageSerializer();
                ByteBuf byteBuf = ctx.alloc().buffer();
                boolean serialized = false;
                try {
                    serializer.serialize(message, byteBuf);
                    serialized = true;
                } finally {
                    if (!serialized) {
                        // Still owned by this method; release it so the buffer is not leaked.
                        byteBuf.release();
                    }
                }
                ctx.writeAndFlush(byteBuf);
                Node node = (Node) ctx.channel().attr(AttributeKey.valueOf("node")).get();
                logger.trace(" send heartbeat message to {} ", node);
            });
}
/**
 * Registers an initial attribute for the newly created {@link Http2StreamChannel}. Passing a
 * {@code null} {@code value} removes whatever is currently registered under {@code key}.
 *
 * @param key the attribute key; must not be {@code null}
 * @param value the initial value, or {@code null} to remove the attribute
 * @return this bootstrap
 * @throws NullPointerException if {@code key} is {@code null}
 */
@SuppressWarnings("unchecked")
public <T> Http2StreamChannelBootstrap attr(AttributeKey<T> key, T value) {
    if (key == null) {
        throw new NullPointerException("key");
    }
    // Both the removal and the insertion happen while holding the lock on attrs.
    synchronized (attrs) {
        if (value != null) {
            attrs.put(key, value);
        } else {
            attrs.remove(key);
        }
    }
    return this;
}
/**
 * Prepares {@code channel}: appends the configured handler (if any) to its pipeline, then
 * applies the configured channel options and attributes.
 *
 * @param channel the channel to initialize
 * @throws Exception declared for callers; see the invoked helpers
 */
@SuppressWarnings("unchecked")
private void init(Channel channel) throws Exception {
    final ChannelPipeline pipeline = channel.pipeline();
    // Copy the field into a local so the null check and the add use the same reference.
    final ChannelHandler configuredHandler = this.handler;
    if (configuredHandler != null) {
        pipeline.addLast(configuredHandler);
    }

    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    synchronized (attrs) {
        for (Map.Entry<AttributeKey<?>, Object> attribute : attrs.entrySet()) {
            // The map stores keys as AttributeKey<?>; widen to Object to set the stored value.
            final AttributeKey<Object> attributeKey = (AttributeKey<Object>) attribute.getKey();
            channel.attr(attributeKey).set(attribute.getValue());
        }
    }
}
/**
 * Parses an attribute export pattern into an {@link ExportEntry}.
 *
 * <p>The pattern is split on {@code ':'} and must yield two or three components. The second
 * component is used as the {@link AttributeKey} name; a third component, when present, is
 * passed to {@code newStringifier}. When {@code exportKey} is {@code null}, the first
 * component is used as the export key.
 *
 * @param keyPattern the colon-separated pattern
 * @param exportKey the export key, or {@code null} to take it from the first component
 * @return the parsed export entry
 * @throws IllegalArgumentException if the pattern does not have two or three components
 */
private ExportEntry<AttributeKey<?>> parseAttrPattern(String keyPattern, @Nullable String exportKey) {
    final String[] components = keyPattern.split(":");
    final int numComponents = components.length;
    if (numComponents != 2 && numComponents != 3) {
        // The hint about the expected syntax depends on whether an export key was supplied.
        final String expected;
        if (exportKey == null) {
            expected = " (expected: attrs.<alias>:<AttributeKey.name>[:<FQCN of Function<?, String>>])";
        } else {
            expected = " (expected: <alias>=attr:<AttributeKey.name>[:<FQCN of Function<?, String>>])";
        }
        throw new IllegalArgumentException("invalid attribute export: " + keyPattern + expected);
    }

    final String effectiveExportKey = exportKey != null ? exportKey : components[0];
    final AttributeKey<Object> attributeKey = AttributeKey.valueOf(components[1]);
    if (numComponents == 2) {
        return new ExportEntry<>(attributeKey, effectiveExportKey);
    }
    return new ExportEntry<>(attributeKey, effectiveExportKey, newStringifier(keyPattern, components[2]));
}
/**
 * Handles a newly established connection.
 *
 * <p>Resolves the {@code Node} for the context, stores it and its node ID as channel
 * attributes, registers the context with the connection set, and finally notifies the
 * callback. The callback runs on {@code threadPool} when one is configured; it runs on the
 * calling thread when there is no pool or the pool rejects the task.
 *
 * @param ctx the context of the connected channel
 * @param connectToServer forwarded unchanged to {@code addChannelHandler}
 * @throws SSLPeerUnverifiedException declared for callers; see {@code channelContext2Node}
 */
public void onConnect(ChannelHandlerContext ctx, boolean connectToServer)
        throws SSLPeerUnverifiedException {
    Node node = channelContext2Node(ctx);
    int ctxIdentity = System.identityHashCode(ctx);

    // Publish the node and its ID in the channel attribute map.
    ctx.channel().attr(AttributeKey.valueOf("node")).set(node);
    ctx.channel().attr(AttributeKey.valueOf("NodeID")).set(node.getNodeID());

    logger.info("add new connections: {}, ctx: {}", node, ctxIdentity);
    getConnections().addChannelHandler(node, ctx, connectToServer);
    logger.info(
            " node {} connect success, nodeID: {}, ctx: {}",
            node,
            node.getNodeID(),
            ctxIdentity);

    if (threadPool == null) {
        callBack.onConnect(ctx, node);
        return;
    }
    try {
        threadPool.execute(() -> callBack.onConnect(ctx, node));
    } catch (TaskRejectedException e) {
        // The pool refused the task; fall back to notifying on the current thread.
        logger.warn(" TaskRejectedException: {} ", e);
        callBack.onConnect(ctx, node);
    }
}
/**
 * Runs the superclass handling, then invokes the disconnect listener of the node stored on the
 * channel (when both the node and its listener are present) and logs the remote host and port.
 *
 * @param ctx the context of the channel that was unregistered
 * @throws Exception propagated from the superclass handling
 */
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    super.channelUnregistered(ctx);
    SocketChannel socketChannel = (SocketChannel) ctx.channel();
    // The node is looked up under a per-peer attribute name of the form "node-<nodeId>".
    String attributeName = "node-" + IpUtil.getNodeId(socketChannel.remoteAddress());
    Attribute<Node> nodeHolder = socketChannel.attr(AttributeKey.valueOf(attributeName));
    Node peer = nodeHolder.get();
    if (peer != null) {
        if (peer.getDisconnectListener() != null) {
            peer.getDisconnectListener().action();
        }
    }
    LoggerUtil.COMMON_LOG.info("Server Node is channelUnregistered:{}:{}", socketChannel.remoteAddress().getHostString(), socketChannel.remoteAddress().getPort());
}
/**
 * Creates fresh mocks before each test: a context whose {@code channel()} returns the mocked
 * channel, and a channel whose {@code attr(...)} returns a mocked {@code Attribute} for any key.
 */
@Before
public void beforeMethod() {
    ctxMock = mock(ChannelHandlerContext.class);
    channelMock = mock(Channel.class);

    // The context hands back the mocked channel.
    doReturn(channelMock).when(ctxMock).channel();

    // Every attribute lookup on the channel yields the same mocked Attribute.
    Attribute<?> attributeMock = mock(Attribute.class);
    doReturn(attributeMock).when(channelMock).attr(any(AttributeKey.class));
}
/**
 * Boots a server on {@code PORT}, prints whether the bind succeeded, and then blocks until the
 * server channel is closed. Both event loop groups are shut down afterwards.
 *
 * <p>NOTE(review): after the groups are shut down the calling thread still parks via
 * {@code LockSupport.park()} - confirm that this is intended.
 *
 * @throws InterruptedException if interrupted while waiting for the bind or the close
 */
@Test
public void testServer() throws InterruptedException {
    EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
    EventLoopGroup ioGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(acceptorGroup, ioGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childAttr(AttributeKey.newInstance("childAttr"), null);
        bootstrap.handler(new ServerHandler());
        bootstrap.childHandler(new ChannelInitializer<>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
            }
        });

        ChannelFuture bindFuture = bootstrap.bind(PORT).sync();
        bindFuture.addListener(result -> {
            if (!result.isSuccess()) {
                System.out.println(LocalDateTime.now() + ": 端口[" + PORT + "]绑定失败!");
            } else {
                System.out.println(LocalDateTime.now() + ": 端口[" + PORT + "]绑定成功!");
            }
        });

        // Block here until the server channel is closed.
        bindFuture.channel().closeFuture().sync();
    } finally {
        acceptorGroup.shutdownGracefully();
        ioGroup.shutdownGracefully();
    }
    LockSupport.park();
}
/**
 * Returns the original host and port of this HTTPS request exactly as the client sent them;
 * modifications made by other filters are not reflected.
 * TODO: evaluate this (unused) method and its capture mechanism in HttpsOriginalHostCaptureFilter; remove if not useful.
 *
 * @return the value stored on the channel under {@code ORIGINAL_HOST_ATTRIBUTE_NAME}
 * @throws IllegalStateException if this is not an HTTPS request
 */
private String getHttpsOriginalRequestHostAndPort() throws IllegalStateException {
    if (!isHttps()) {
        throw new IllegalStateException("Request is not HTTPS. Cannot get original host and port on non-HTTPS request using this method.");
    }

    final AttributeKey<String> originalHostKey = AttributeKey.valueOf(ORIGINAL_HOST_ATTRIBUTE_NAME);
    return ctx.channel().attr(originalHostKey).get();
}
/**
 * Creates the filter. For an HTTP CONNECT request it stores the request URI as the original
 * host and sets the "is HTTPS" flag, both as attributes on the context's channel.
 *
 * @param originalRequest the request this filter was created for
 * @param ctx the channel handler context of the connection
 */
public HttpsOriginalHostCaptureFilter(HttpRequest originalRequest, ChannelHandlerContext ctx) {
    super(originalRequest, ctx);

    if (!ProxyUtils.isCONNECT(originalRequest)) {
        return;
    }

    // Capture the host from the CONNECT request itself rather than trusting the Host header of
    // later requests, which may be absent or spoofed by malicious clients.
    ctx.channel().attr(AttributeKey.<String>valueOf(ORIGINAL_HOST_ATTRIBUTE_NAME)).set(originalRequest.uri());

    // Mark the connection as HTTPS.
    ctx.channel().attr(AttributeKey.<Boolean>valueOf(IS_HTTPS_ATTRIBUTE_NAME)).set(true);
}
/**
 * Stores {@code node} on {@code channel} under the attribute name {@code "node-" + node.getId()}.
 *
 * <p>{@code AttributeKey.valueOf(name)} returns the existing key for a name or creates it when
 * absent, so it is used on its own. A separate {@code exists()} check followed by
 * {@code newInstance()} is a check-then-act sequence: {@code newInstance()} throws
 * {@code IllegalArgumentException} if another thread registers the same name in between.
 *
 * @param node the node to cache
 * @param channel the channel that will carry the node as an attribute
 */
private void cacheNode(Node node, SocketChannel channel) {
    AttributeKey<Node> nodeKey = AttributeKey.valueOf("node-" + node.getId());
    channel.attr(nodeKey).set(node);
}
/**
 * Initializes the annotated mocks, stubs {@code channel.attr(...)} to return a mocked
 * {@code Attribute} for any key, and builds the decoder under test from the {@code connacker}
 * field and freshly created collaborators.
 */
@Before
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    when(channel.attr(any(AttributeKey.class))).thenReturn(mock(Attribute.class));

    final MqttDisconnectUtil disconnectUtil = new MqttDisconnectUtil(eventLog);
    final Mqtt3ServerDisconnector disconnector = new Mqtt3ServerDisconnector(disconnectUtil);

    decoder = new Mqtt311ConnectDecoder(
            connacker,
            disconnector,
            eventLog,
            new TestConfigurationBootstrap().getFullConfigurationService(),
            new HivemqId());
}
/**
 * Initializes the annotated mocks, stubs {@code channel.attr(...)} to return a mocked
 * {@code Attribute} for any key, and builds the decoder under test with a freshly created
 * connacker and disconnector that share the same {@code eventLog}.
 */
@Before
public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    when(channel.attr(any(AttributeKey.class))).thenReturn(mock(Attribute.class));

    final MqttConnackSendUtil connackSendUtil = new MqttConnackSendUtil(eventLog);
    final MqttConnacker mqttConnacker = new MqttConnacker(connackSendUtil);
    final MqttDisconnectUtil disconnectUtil = new MqttDisconnectUtil(eventLog);
    final Mqtt3ServerDisconnector disconnector = new Mqtt3ServerDisconnector(disconnectUtil);

    decoder = new Mqtt311ConnectDecoder(
            mqttConnacker,
            disconnector,
            eventLog,
            new TestConfigurationBootstrap().getFullConfigurationService(),
            new HivemqId());
}
/**
 * Reads an inbound buffer for the node stored on this channel and hands the payload to
 * {@code MessageManager}.
 *
 * <p>The node is looked up under the attribute name {@code "node-" + nodeId}. When it is
 * present, the readable bytes are copied into a {@code NulsByteBuffer}; when it is absent, an
 * error is logged and the channel is closed. The buffer's indices are cleared in every case.
 * The message is dispatched only if a payload was actually produced, so an unknown node or a
 * failure while reading never results in a call with a {@code null} buffer.
 *
 * @param ctx the handler context; its channel is cast to {@code SocketChannel}
 * @param msg the inbound message; cast to {@code ByteBuf}
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
    SocketChannel channel = (SocketChannel) ctx.channel();
    ByteBuf buf = (ByteBuf) msg;
    String remoteIP = channel.remoteAddress().getHostString();
    NulsByteBuffer byteBuffer = null;
    Node node = null;
    try {
        String nodeId = IpUtil.getNodeId(channel.remoteAddress());
        Attribute<Node> nodeAttribute = channel.attr(AttributeKey.valueOf("node-" + nodeId));
        node = nodeAttribute.get();
        if (node != null) {
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            byteBuffer = new NulsByteBuffer(bytes);
        } else {
            LoggerUtil.COMMON_LOG.error("-----------------Server channelRead node is null -----------------" + remoteIP + ":" + channel.remoteAddress().getPort());
            ctx.channel().close();
        }
    } catch (Exception e) {
        // Best effort: log and fall through; nothing is dispatched because byteBuffer stays null.
        LoggerUtil.COMMON_LOG.error(e);
    } finally {
        buf.clear();
    }
    if (byteBuffer == null) {
        // No payload was read (unknown node or an error above), so there is nothing to deliver.
        return;
    }
    MessageManager.getInstance().receiveMessage(byteBuffer, node);
}
/**
 * Creates the filter. For an HTTP CONNECT request it stores the request URI as the original
 * host and sets the "is HTTPS" flag, both as attributes on the channel handler context.
 *
 * @param originalRequest the request this filter was created for
 * @param ctx the channel handler context of the connection
 */
public HttpsOriginalHostCaptureFilter(HttpRequest originalRequest, ChannelHandlerContext ctx) {
    super(originalRequest, ctx);

    if (ProxyUtils.isCONNECT(originalRequest)) {
        // Take the host from the CONNECT request itself instead of relying on the Host header
        // of subsequent requests, which may be absent or spoofed by malicious clients.
        final AttributeKey<String> originalHostKey =
                AttributeKey.valueOf(HttpsAwareFiltersAdapter.ORIGINAL_HOST_ATTRIBUTE_NAME);
        ctx.attr(originalHostKey).set(originalRequest.getUri());

        // Flag the connection as HTTPS.
        final AttributeKey<Boolean> isHttpsKey =
                AttributeKey.valueOf(HttpsAwareFiltersAdapter.IS_HTTPS_ATTRIBUTE_NAME);
        ctx.attr(isHttpsKey).set(true);
    }
}
/**
 * Verifies that an attribute reads back the value that was set, and reads back {@code null}
 * after it has been set to {@code null}.
 */
@Test
void testGetSetWithNull() {
    final AttributeKey<Integer> intKey = AttributeKey.valueOf("key");
    final DefaultAttributeMap attributes = new DefaultAttributeMap(null);

    attributes.setAttr(intKey, 1);
    assertThat(attributes.attr(intKey)).isEqualTo(1);

    attributes.setAttr(intKey, null);
    assertThat(attributes.attr(intKey)).isNull();
}
/**
 * Verifies that a {@code RequestContextExportingAppender} accepts new exports before it is
 * started and rejects them with {@code IllegalStateException} afterwards.
 */
@Test
void testMutabilityAndImmutability() {
    final AttributeKey<Object> someAttr =
            AttributeKey.valueOf(RequestContextExportingAppenderTest.class, "SOME_ATTR");
    final RequestContextExportingAppender appender = new RequestContextExportingAppender();

    // Before start(): every kind of export can still be added.
    appender.addBuiltIn(BuiltInProperty.ELAPSED_NANOS);
    appender.addAttribute("some-attr", someAttr);
    appender.addRequestHeader(HttpHeaderNames.USER_AGENT);
    appender.addResponseHeader(HttpHeaderNames.SET_COOKIE);

    final ListAppender<ILoggingEvent> delegate = new ListAppender<>();
    appender.addAppender(delegate);
    appender.start();
    delegate.start();

    // After start(): each of the same operations must be rejected.
    assertThatThrownBy(() -> appender.addBuiltIn(BuiltInProperty.REQ_PATH))
            .isExactlyInstanceOf(IllegalStateException.class);
    assertThatThrownBy(() -> appender.addAttribute("my-attr", MY_ATTR))
            .isExactlyInstanceOf(IllegalStateException.class);
    assertThatThrownBy(() -> appender.addRequestHeader(HttpHeaderNames.ACCEPT))
            .isExactlyInstanceOf(IllegalStateException.class);
    assertThatThrownBy(() -> appender.addResponseHeader(HttpHeaderNames.DATE))
            .isExactlyInstanceOf(IllegalStateException.class);
}
/**
 * Verifies what a derived client request context shares with, copies from, and keeps separate
 * from the context it was derived from.
 */
@Test
void deriveContext() {
    final DefaultClientRequestContext parent = newContext();
    mutateAdditionalHeaders(parent);

    final AttributeKey<String> fooKey = AttributeKey.valueOf(DefaultClientRequestContextTest.class, "foo");
    parent.setAttr(fooKey, "foo");

    final RequestId derivedId = RequestId.random();
    final RequestHeaders derivedHeaders = RequestHeaders.of(
            HttpMethod.POST, "/foo",
            HttpHeaderNames.SCHEME, "http",
            HttpHeaderNames.AUTHORITY, "example.com:8080",
            "foo", "bar");
    final HttpRequest derivedRequest = HttpRequest.of(derivedHeaders);
    final ClientRequestContext derived = parent.newDerivedContext(derivedId, derivedRequest, null);

    // Properties expected to be the very same instances as on the parent.
    assertThat(derived.endpoint()).isSameAs(parent.endpoint());
    assertThat(derived.sessionProtocol()).isSameAs(parent.sessionProtocol());
    assertThat(derived.method()).isSameAs(parent.method());
    assertThat(derived.options()).isSameAs(parent.options());

    // The ID and request are the ones passed to newDerivedContext().
    assertThat(derived.id()).isSameAs(derivedId);
    assertThat(derived.request()).isSameAs(derivedRequest);

    assertThat(derived.path()).isEqualTo(parent.path());
    assertThat(derived.maxResponseLength()).isEqualTo(parent.maxResponseLength());
    assertThat(derived.responseTimeoutMillis()).isEqualTo(parent.responseTimeoutMillis());
    assertThat(derived.writeTimeoutMillis()).isEqualTo(parent.writeTimeoutMillis());
    assertThat(derived.additionalRequestHeaders()).isSameAs(parent.additionalRequestHeaders());

    // An attribute set before derivation is visible on the derived context.
    assertThat(derived.attr(fooKey)).isEqualTo("foo");

    // The log is not shared.
    assertThat(derived.log()).isNotSameAs(parent.log());

    // An attribute added to the parent after derivation is not propagated.
    final AttributeKey<String> barKey = AttributeKey.valueOf(DefaultClientRequestContextTest.class, "bar");
    parent.setAttr(barKey, "bar");
    assertThat(derived.attr(barKey)).isNull();
}
/**
 * Verifies that config options and an attribute set on an outbound stream channel can be read
 * back with the values that were set.
 */
@Test
public void settingChannelOptsAndAttrs() {
    AttributeKey<String> fooKey = AttributeKey.newInstance("foo");
    Channel stream = newOutboundStream();

    // Apply two config options and one attribute.
    stream.config().setAutoRead(false);
    stream.config().setWriteSpinCount(1000);
    stream.attr(fooKey).set("bar");

    assertFalse(stream.config().isAutoRead());
    assertEquals(1000, stream.config().getWriteSpinCount());
    assertEquals("bar", stream.attr(fooKey).get());
}
/**
 * Validates both arguments and delegates to {@code attrs.computeAttrIfAbsent}.
 *
 * @param key the attribute key; must not be {@code null}
 * @param mappingFunction the function handed to the delegate; must not be {@code null}
 * @return whatever the delegate returns, possibly {@code null}
 * @throws NullPointerException if {@code key} or {@code mappingFunction} is {@code null}
 */
@Nullable
@Override
public <V> V computeAttrIfAbsent(AttributeKey<V> key,
                                 Function<? super AttributeKey<V>, ? extends V> mappingFunction) {
    // Arguments are evaluated left to right, so key is checked before mappingFunction.
    return attrs.computeAttrIfAbsent(requireNonNull(key, "key"),
                                     requireNonNull(mappingFunction, "mappingFunction"));
}
/**
 * Looks up the node stored on the channel for the remote peer and forwards the inbound buffer,
 * together with that node, to the message processor.
 *
 * <p>Original author's note: when extending {@code SimpleChannelInboundHandler}, only
 * {@code channelRead0} needs to be overridden and {@code msg} is released automatically.
 *
 * @param ctx the handler context; its channel is cast to {@code SocketChannel}
 * @param msg the inbound message; cast to {@code ByteBuf}
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    SocketChannel socketChannel = (SocketChannel) ctx.channel();
    // The node is looked up under a per-peer attribute name of the form "node-<nodeId>".
    String attributeName = "node-" + IpUtil.getNodeId(socketChannel.remoteAddress());
    Node peer = socketChannel.attr(AttributeKey.<Node>valueOf(attributeName)).get();
    messageProcessor.processor((ByteBuf) msg, peer);
}
/**
 * Extends {@code super.toString()} with the child settings of this bootstrap.
 *
 * <p>The last character of the parent's text is removed, the child group, child options, child
 * attributes and child handler are appended when present (each followed by {@code ", "}), and
 * the text is closed with {@code ')'} again. A separator is inserted before the child settings
 * only when the parent's text does not end with {@code '('} after trimming, so an empty parent
 * section yields {@code "Name(childGroup: ...)"} rather than {@code "Name(, childGroup: ...)"}.
 *
 * @return the textual description of this bootstrap
 */
@Override
public String toString() {
    StringBuilder buf = new StringBuilder(super.toString());
    // Drop the last character of the parent's text so more entries can be appended.
    buf.setLength(buf.length() - 1);
    // Separate from the parent's entries only if there are any; this also keeps the
    // "nothing was listed" check at the end meaningful.
    if (buf.length() == 0 || buf.charAt(buf.length() - 1) != '(') {
        buf.append(", ");
    }

    EventLoopGroup childGroup = childGroup();
    if (childGroup != null) {
        buf.append("childGroup: ").append(StringUtil.simpleClassName(childGroup)).append(", ");
    }

    Map<ChannelOption<?>, Object> childOptions = childOptions();
    if (!childOptions.isEmpty()) {
        buf.append("childOptions: ").append(childOptions).append(", ");
    }

    Map<AttributeKey<?>, Object> childAttrs = childAttrs();
    if (!childAttrs.isEmpty()) {
        buf.append("childAttrs: ").append(childAttrs).append(", ");
    }

    ChannelHandler childHandler = childHandler();
    if (childHandler != null) {
        buf.append("childHandler: ").append(childHandler).append(", ");
    }

    if (buf.charAt(buf.length() - 1) == '(') {
        // Nothing was listed at all: just close the parenthesis.
        buf.append(')');
    } else {
        // Replace the trailing ", " with the closing parenthesis.
        buf.setCharAt(buf.length() - 2, ')');
        buf.setLength(buf.length() - 1);
    }
    return buf.toString();
}
/**
 * Returns the default channel attributes as a read-only map.
 *
 * @return an unmodifiable view of the configured attributes, or an empty map when there are none
 */
public final Map<AttributeKey<?>, ?> attributes() {
    if (!attrs.isEmpty()) {
        return Collections.unmodifiableMap(attrs);
    }
    return Collections.emptyMap();
}
/**
 * Sets the value of this entry.
 *
 * <p>When a child entry already exists, its value is replaced and its previous value is
 * returned. Otherwise the result of {@code setAttr(rootAttr.getKey(), value, false)} becomes
 * the child entry and the root entry's value is returned.
 *
 * @param value the new value
 * @return the child entry's previous value, or the root entry's value if there was no child entry
 */
@Override
public T setValue(T value) {
    final Entry<AttributeKey<T>, T> current = this.childAttr;
    if (current != null) {
        final T previous = current.getValue();
        current.setValue(value);
        return previous;
    }

    // No child entry yet: create it via setAttr and report the root's value.
    this.childAttr = setAttr(rootAttr.getKey(), value, false);
    return rootAttr.getValue();
}
/**
 * Returns the original host and port of this HTTPS request exactly as the client sent them;
 * modifications made by other filters are not reflected.
 * TODO: evaluate this (unused) method and its capture mechanism in HttpsOriginalHostCaptureFilter; remove if not useful.
 *
 * @return the value stored on the context under {@code ORIGINAL_HOST_ATTRIBUTE_NAME}
 * @throws IllegalStateException if this is not an HTTPS request
 */
private String getHttpsOriginalRequestHostAndPort() throws IllegalStateException {
    if (!isHttps()) {
        throw new IllegalStateException("Request is not HTTPS. Cannot get original host and port on non-HTTPS request using this method.");
    }

    final AttributeKey<String> originalHostKey = AttributeKey.valueOf(ORIGINAL_HOST_ATTRIBUTE_NAME);
    return ctx.attr(originalHostKey).get();
}
/**
 * Returns the host and port of this HTTPS request, including any modifications made by other
 * filters.
 *
 * @return the value stored on the context under {@code HOST_ATTRIBUTE_NAME}
 * @throws IllegalStateException if this is not an HTTPS request
 */
private String getHttpsRequestHostAndPort() throws IllegalStateException {
    if (!isHttps()) {
        throw new IllegalStateException("Request is not HTTPS. Cannot get host and port on non-HTTPS request using this method.");
    }

    final AttributeKey<String> hostKey = AttributeKey.valueOf(HOST_ATTRIBUTE_NAME);
    return ctx.attr(hostKey).get();
}
/**
 * Returns the host and port the client originally requested over HTTPS. Changes applied by
 * other filters do not show up in the result.
 * TODO: evaluate this (unused) method and its capture mechanism in HttpsOriginalHostCaptureFilter; remove if not useful.
 *
 * @return the value stored on the context under {@code ORIGINAL_HOST_ATTRIBUTE_NAME}
 * @throws IllegalStateException if this is not an HTTPS request
 */
private String getHttpsOriginalRequestHostAndPort() throws IllegalStateException {
    if (isHttps()) {
        return ctx.attr(AttributeKey.<String>valueOf(ORIGINAL_HOST_ATTRIBUTE_NAME)).get();
    }
    throw new IllegalStateException("Request is not HTTPS. Cannot get original host and port on non-HTTPS request using this method.");
}
/**
 * Returns the value stored on the channel under the attribute named {@code key}.
 *
 * @param key the attribute name; may be {@code null}
 * @return the stored value, or {@code null} when {@code key} is {@code null} or nothing is stored
 */
@Override
public <T> T getAttribute(final String key) {
    if (key != null) {
        return channel.attr(AttributeKey.<T>valueOf(key)).get();
    }
    return null;
}
/**
 * Returns the host and port of this HTTPS request as they stand after any modifications made
 * by other filters.
 *
 * @return the value stored on the context under {@code HOST_ATTRIBUTE_NAME}
 * @throws IllegalStateException if this is not an HTTPS request
 */
private String getHttpsRequestHostAndPort() throws IllegalStateException {
    if (isHttps()) {
        return ctx.attr(AttributeKey.<String>valueOf(HOST_ATTRIBUTE_NAME)).get();
    }
    throw new IllegalStateException("Request is not HTTPS. Cannot get host and port on non-HTTPS request using this method.");
}
/**
 * Clears the channel attribute named {@code key} by setting it to {@code null}.
 *
 * @param key the attribute name; may be {@code null}
 * @return the value held before it was cleared, or {@code null} when {@code key} is {@code null}
 */
@Override
public Object removeAttribute(final String key) {
    if (key != null) {
        // getAndSet(null) clears the value and hands back the old one in a single call.
        return channel.attr(AttributeKey.valueOf(key)).getAndSet(null);
    }
    return null;
}