下面列出了java.nio.channels.NotYetConnectedException#io.netty.channel.ChannelHandlerContext 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Verifies that writing a QoS 0 publish whose expiry interval was set to 1 and then
 * left to age for 2000 ms fires a {@code PublishDroppedEvent} and leaves the publish
 * with an expiry interval of 0.
 */
@Test
public void test_message_expired_qos_0() throws Exception {
    final PUBLISH publish = TestMessageUtil.createMqtt5Publish("topic", QoS.AT_MOST_ONCE);
    publish.setMessageExpiryInterval(1);
    // Let the message age past its expiry interval before it is written.
    Thread.sleep(2000);

    // Released as soon as a PublishDroppedEvent reaches the listener below.
    final CountDownLatch dropSignal = new CountDownLatch(1);
    final ChannelInboundHandlerAdapter dropListener = new ChannelInboundHandlerAdapter() {
        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx, @NotNull final Object evt)
                throws Exception {
            if (evt instanceof PublishDroppedEvent) {
                dropSignal.countDown();
            }
        }
    };
    channel.pipeline().addLast(dropListener);

    // NOTE(review): if `channel` is an EmbeddedChannel, writeOutbound(Object...) treats the
    // context and the promise as outbound messages too - confirm this is intended.
    channel.writeOutbound(ctx, publish, channel.newPromise());

    assertTrue(dropSignal.await(5, TimeUnit.SECONDS));
    assertEquals(0, publish.getMessageExpiryInterval());
}
/**
 * Dispatches a WebSocket frame by type: close frames go through the handshaker,
 * pings are answered with a pong carrying the same content, and text/binary frames
 * are echoed back. Other frame types are ignored. Writes are not flushed here.
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing handshake: hand the (retained) close frame to the handshaker.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        // Reply with a pong that reuses the ping's payload.
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Echo data frames unchanged; retain because the frame is written back out.
        ctx.write(frame.retain());
    }
}
/**
 * Routes the request to the handler matching the {@code op} parameter and HTTP method.
 *
 * @param ctx channel context passed through to the selected handler
 * @param req incoming HTTP request; only its method is inspected here
 * @throws IllegalArgumentException if no supported (operation, method) pair matches
 */
public void handle(ChannelHandlerContext ctx, HttpRequest req)
        throws IOException, URISyntaxException {
    final String op = params.op();
    final HttpMethod method = req.getMethod();

    if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op) && method == PUT) {
        onCreate(ctx);
        return;
    }
    if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op) && method == POST) {
        onAppend(ctx);
        return;
    }
    if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op) && method == GET) {
        onOpen(ctx);
        return;
    }
    if (GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op) && method == GET) {
        onGetFileChecksum(ctx);
        return;
    }
    throw new IllegalArgumentException("Invalid operation " + op);
}
/**
 * Completes and clears any view observables that are still set, resets the view
 * parsing state, releases the response buffer if it still holds a reference, and
 * then delegates to the superclass.
 */
@Override
public void handlerRemoved(final ChannelHandlerContext ctx) throws Exception {
    // Row stream.
    if (viewRowObservable != null) {
        viewRowObservable.onCompleted();
        viewRowObservable = null;
    }
    // Info stream.
    if (viewInfoObservable != null) {
        viewInfoObservable.onCompleted();
        viewInfoObservable = null;
    }
    // Error stream.
    if (viewErrorObservable != null) {
        viewErrorObservable.onCompleted();
        viewErrorObservable = null;
    }

    cleanupViewStates();

    // Release only while a reference is still held, to avoid an over-release.
    if (responseContent != null) {
        if (responseContent.refCnt() > 0) {
            responseContent.release();
        }
    }

    super.handlerRemoved(ctx);
}
/**
* {@inheritDoc}
* <p>
* Stores the given context (which must not be {@code null}), triggers a writability
* re-evaluation, and then sends any queued {@link FlowControlled} objects if the
* channel is currently writable.
*
* @param ctx the channel handler context to use for subsequent writes; must be non-null
* @throws Http2Exception declared by the signature; see the called methods for the cases
*/
@Override
public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
this.ctx = checkNotNull(ctx, "ctx");
// Writing the pending bytes does not itself check for a writability change, so the
// writability notification is delivered first through this explicit call.
channelWritabilityChanged();
// No cleanup of queued frames is done here: it is expected that all streams will be
// closed and the queue cleanup will occur when the stream state transitions occur.
// If any frames have been queued up, send them now that a channel context is available.
if (isChannelWritable()) {
writePendingBytes();
}
}
/**
 * Reacts to the channel going inactive by scheduling a new connection attempt,
 * unless the promise has been cancelled. The event is not forwarded.
 */
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
    // This is the terminal channelInactive handler, so nothing is propagated further.
    if (this.promise.isCancelled()) {
        return;
    }

    if (this.promise.isInitialConnectFinished()) {
        // An established session was lost.
        LOG.debug("Reconnecting after connection to {} was dropped", this.promise.address);
        this.promise.connect();
    } else {
        // The connection dropped before the initial connect finished.
        LOG.debug("Connection to {} was dropped during negotiation, reattempting", this.promise.address);
        this.promise.reconnect();
    }
}
/**
 * Splits the inbound buffer into major frames and fires one {@code FrameMajor} per frame.
 * <p>
 * Each frame starts with two marker bytes, followed by group id, message id, device id
 * and a backup byte (one unsigned byte each) and an unsigned 16-bit payload length.
 * Parsing stops when fewer than {@code FRAME_HEAD_LENGTH} bytes remain or when the
 * marker bytes do not match.
 * <p>
 * If the payload cannot be read in full, the freshly allocated payload buffer is
 * released before the exception propagates, so it does not leak.
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
    while (true) {
        if (byteBuf.readableBytes() < FrameSetting.FRAME_HEAD_LENGTH) {
            return;
        }
        // Short-circuit: the second marker byte is only consumed when the first one matched.
        if (byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_1
                || byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_2) {
            return;
        }
        int groupId = byteBuf.readByte() & 0xFF;
        int msgId = byteBuf.readByte() & 0xFF;
        int deviceId = byteBuf.readByte() & 0xFF;
        int backupMsg = byteBuf.readByte() & 0xFF;
        int dataLength = byteBuf.readShort() & 0xFFFF;
        FrameMajorHeader headMsg = new FrameMajorHeader(msgId, groupId, deviceId, dataLength, backupMsg);

        ByteBuf subBuf = ctx.alloc().buffer(dataLength);
        try {
            byteBuf.readBytes(subBuf, dataLength);
        } catch (RuntimeException e) {
            // Truncated payload: nobody else owns subBuf yet, so release it here.
            subBuf.release();
            throw e;
        }
        // Ownership of subBuf passes to the next inbound handler.
        ctx.fireChannelRead(new FrameMajor(headMsg, subBuf));
    }
}
/**
 * Looks up the earliest message time for the topic and queue named in the request.
 *
 * @param ctx channel context (not used by this method)
 * @param request command carrying a {@code GetEarliestMsgStoretimeRequestHeader}
 * @return a SUCCESS response whose custom header carries the timestamp
 * @throws RemotingCommandException if the request header cannot be decoded
 */
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand reply =
            RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
    final GetEarliestMsgStoretimeResponseHeader replyHeader =
            (GetEarliestMsgStoretimeResponseHeader) reply.readCustomHeader();

    final GetEarliestMsgStoretimeRequestHeader query = (GetEarliestMsgStoretimeRequestHeader)
            request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);

    long earliest = this.brokerController.getMessageStore()
            .getEarliestMessageTime(query.getTopic(), query.getQueueId());
    replyHeader.setTimestamp(earliest);

    reply.setCode(ResponseCode.SUCCESS);
    reply.setRemark(null);
    return reply;
}
/**
 * Handles a message that reached this handler unexpectedly: optionally logs it at FINE
 * level, reports an INTERNAL status through {@code exceptionCaught}, and always
 * releases the message.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        if (logger.isLoggable(Level.FINE)) {
            // Byte buffers are logged as a hex dump, everything else as-is.
            final Object loggedMsg;
            if (msg instanceof ByteBuf) {
                loggedMsg = ByteBufUtil.hexDump((ByteBuf) msg);
            } else {
                loggedMsg = msg;
            }
            logger.log(
                Level.FINE,
                "Unexpected channelRead()->{0} reached end of pipeline {1}",
                new Object[] {loggedMsg, ctx.pipeline().names()});
        }
        String description = "channelRead() missed by ProtocolNegotiator handler: " + msg;
        exceptionCaught(ctx, Status.INTERNAL.withDescription(description).asRuntimeException());
    } finally {
        // The message is not forwarded, so it is released here in every case.
        ReferenceCountUtil.safeRelease(msg);
    }
}
/**
 * Decodes a length-prefixed frame. The length is encoded 7 bits per byte, most
 * significant group first, with the high bit set on every byte except the last.
 * <p>
 * A frame of type {@code 0xFF} with length 0 is the closing handshake and yields a
 * {@code CloseWebSocketFrame}; anything else yields a {@code BinaryWebSocketFrame}.
 * <p>
 * If reading the payload fails for any reason (including a non-{@code Exception}
 * throwable), the freshly allocated payload buffer is released so it does not leak.
 *
 * @throws TooLongFrameException if the length exceeds {@code maxFrameSize} or the
 *         length field is longer than 8 bytes
 */
private WebSocketFrame decodeBinaryFrame(ChannelHandlerContext ctx, byte type, ByteBuf buffer) {
    long frameSize = 0;
    int lengthFieldSize = 0;
    byte b;
    do {
        b = buffer.readByte();
        frameSize <<= 7;
        frameSize |= b & 0x7f;
        if (frameSize > maxFrameSize) {
            throw new TooLongFrameException();
        }
        lengthFieldSize++;
        if (lengthFieldSize > 8) {
            // Perhaps a malicious peer?
            throw new TooLongFrameException();
        }
    } while ((b & 0x80) == 0x80);

    if (type == (byte) 0xFF && frameSize == 0) {
        receivedClosingHandshake = true;
        return new CloseWebSocketFrame();
    }

    ByteBuf payload = ctx.alloc().buffer((int) frameSize);
    boolean filled = false;
    try {
        buffer.readBytes(payload);
        filled = true;
    } finally {
        // try/finally with a flag (rather than catch) so that Errors are covered too.
        if (!filled) {
            payload.release();
        }
    }
    return new BinaryWebSocketFrame(payload);
}
/**
 * Handles a WebSocket frame.
 * <p>
 * Close frames complete the closing handshake and close the channel. Text frames are
 * either answered as a heartbeat (empty text) or passed to {@code handleMessage}.
 * Every other frame type is rejected.
 *
 * @param ctx channel context of the connection
 * @param frame the received frame
 * @throws UnsupportedOperationException for frames that are neither close nor text
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        ctx.close();
        return;
    }

    // Original author's note: WebSocketServerProtocolHandler is not used, so no
    // PingWebSocketFrame is expected here and ping handling is left disabled.

    if (frame instanceof TextWebSocketFrame) {
        final String text = ((TextWebSocketFrame) frame).text();
        logger.debug("收到客户端发送的数据:" + text);
        if (text.isEmpty()) {
            // An empty text frame is a heartbeat: answer with an empty text frame.
            ctx.writeAndFlush(new TextWebSocketFrame(""));
        } else {
            this.handleMessage(ctx.channel(), text);
        }
        return;
    }

    throw new UnsupportedOperationException(
            String.format("%s frame types not supported", frame.getClass().getName()));
}
/**
 * Starts a new WebSocket message from the given frame, notifies the listener, and
 * either finishes the body (final fragment) or remembers the message for later use.
 *
 * @param ctx channel context (not used by this method)
 * @param frame first frame of the message
 * @param type message type; {@code WebSocketMessage.TYPE_TEXT} selects a text body,
 *        anything else a binary body
 * @param request passed through to the {@code WebSocketMessage} constructor
 */
private void newWebSocketMessage(ChannelHandlerContext ctx, WebSocketFrame frame, int type, boolean request) {
    final BodyType kind;
    if (type == WebSocketMessage.TYPE_TEXT) {
        kind = BodyType.text;
    } else {
        kind = BodyType.binary;
    }

    Body body = new Body(kind, Optional.empty(), "");
    body.append(frame.content().nioBuffer());

    WebSocketMessage message = new WebSocketMessage(host, url, type, request, body);
    messageListener.onMessage(message);

    if (frame.isFinalFragment()) {
        body.finish();
        return;
    }

    // NOTE(review): the original branched on `request` but stored into responseMessage
    // in both branches. The request branch probably should use a separate field -
    // confirm against the enclosing class.
    responseMessage = message;
}
/**
 * Sends a NOOP on idle to keep the connection alive, unless a previous NOOP is still
 * unanswered or the response handler reports a pending response.
 */
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    LOG.debug("[{}] Sending NOOP to keep the connection alive", connectionId);

    if (expectingNoopResponse) {
        LOG.warn("[{}] Did not receive a response to our last NOOP, will not send another", connectionId);
        return;
    }

    Optional<String> pending = responseHandler.getPendingResponseDebugString();
    if (!pending.isPresent()) {
        LOG.debug("[{}] Sending NOOP", connectionId);
        ctx.channel().writeAndFlush(new DefaultSmtpRequest(SmtpCommand.NOOP));
        // Remember that a NOOP reply is now outstanding.
        expectingNoopResponse = true;
        return;
    }

    LOG.warn("[{}] Waiting for a response to [{}], will not send a NOOP to keep the connection alive", connectionId, pending.get());
}
/**
 * Processes a message for a connected client: releases the stored message with the
 * given id, processes it if found, and always answers with the message built by
 * {@code MessageUtil.getPubComMessage}. Channels of unknown clients are closed.
 */
@Override
public void processRequest(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
    String clientId = NettyUtil.getClientId(ctx.channel());
    int messageId = MessageUtil.getMessageId(mqttMessage);

    if (!ConnectManager.getInstance().containClient(clientId)) {
        log.warn("[PubRelMessage] -> the client:{} disconnect to this server.",clientId);
        RemotingHelper.closeChannel(ctx.channel());
        return;
    }

    Message stored = flowMessageStore.releaseRecMsg(clientId, messageId);
    if (stored == null) {
        log.warn("[PubRelMessage] -> the message is not exist,clientId={},messageId={}.",clientId,messageId);
    } else {
        super.processMessage(stored);
    }

    // The reply is sent whether or not a stored message was found.
    MqttMessage reply = MessageUtil.getPubComMessage(messageId);
    ctx.writeAndFlush(reply);
}
/**
 * Sends a greeting containing the current time as soon as the channel becomes
 * active, and logs the text that was sent.
 */
public void channelActive(ChannelHandlerContext ctx) {
    String currTimeStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ").format(new Date());
    String greeting = String.format("Hello, current time is %s", currTimeStr);

    ctx.writeAndFlush(Unpooled.copiedBuffer(greeting, _utf8));

    String logLine = String.format("Client send data:%s", greeting);
    _logger.info(logLine);
}
/**
 * Forwards a call hang-up request to the message server cluster.
 *
 * @param header message header (not used by this method)
 * @param body expected to be an {@code IMAVCallHungUpReq}
 * @param ctx channel context used to resolve the handle id
 */
@Override
public void hungupCall(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
    final IMAVCallHungUpReq hangUp = (IMAVCallHungUpReq) body;
    final long callerId = hangUp.getFromId();
    final long calleeId = hangUp.getToId();
    // TODO: hang-up handling (a plain messageServerCluster.send(header, body) was disabled here).
    messageServerCluster.webrtcHungupReq(callerId, calleeId, super.getHandleId(ctx));
}
/**
* Records a newly active connection: increments the live-connection counter and
* publishes it to the gauge, increments the total-connections counter, attaches the
* shared counter to the channel under {@code ATTR_CURRENT_CONNS}, and then delegates
* to the superclass.
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The gauge receives the value returned by the increment.
currentConnectionsGauge.set(currentConnections.incrementAndGet());
totalConnections.increment();
// Stores the counter object itself (not a snapshot) on the channel.
ctx.channel().attr(ATTR_CURRENT_CONNS).set(currentConnections);
super.channelActive(ctx);
}
/**
 * Called when the user mapping client disconnects: closes the HTTP proxy endpoint
 * and closes the channel if it still reports itself as active. The event is not
 * forwarded to the superclass.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    log.info("user mapping client disconnect initiative");
    closeHttpProxyEndPoint(ctx);

    Channel ch = ctx.channel();
    if (ch.isActive()) {
        ch.close();
    }
}
/**
 * Returns the client ids of all consumers in the requested consumer group.
 *
 * @param ctx channel context; only used to log the remote address on failure
 * @param request command carrying a {@code GetConsumerListByGroupRequestHeader}
 * @return SUCCESS with the encoded id list as body, or SYSTEM_ERROR when the group is
 *         unknown or has no clients
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response =
            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
    final GetConsumerListByGroupRequestHeader requestHeader = (GetConsumerListByGroupRequestHeader)
            request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);

    ConsumerGroupInfo groupInfo = this.brokerController.getConsumerManager()
            .getConsumerGroupInfo(requestHeader.getConsumerGroup());

    if (groupInfo == null) {
        log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    } else {
        List<String> ids = groupInfo.getAllClientId();
        if (ids.isEmpty()) {
            log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        } else {
            GetConsumerListByGroupResponseBody payload = new GetConsumerListByGroupResponseBody();
            payload.setConsumerIdList(ids);
            response.setBody(payload.encode());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }

    // Both failure paths end up here.
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
    return response;
}
/**
 * Parses a TL1 message and completes the future registered for its ctag.
 * <p>
 * The message is split on single whitespace characters. The first word starting with
 * {@code COMPLD} or {@code DENY} marks a response; the word directly before it is the
 * ctag, and all following words (joined without separators) form the result.
 * Messages without such a word are ignored.
 * <p>
 * A status word in the very first position has no ctag in front of it; such a message
 * is logged and dropped instead of failing with an
 * {@code ArrayIndexOutOfBoundsException}.
 *
 * @throws NumberFormatException if the word before the status is not an integer
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
    log.debug("Received TL1 message {}", s);
    // Search for "COMPLD" or "DENY" to identify a TL1 response,
    // then return the remainder of the string.
    String[] words = s.split("\\s");
    for (int i = 0; i < words.length; i++) {
        String w = words[i];
        if (!w.startsWith(COMPLD) && !w.startsWith(DENY)) {
            continue;
        }
        if (i == 0) {
            // No preceding word, hence no ctag to correlate the response with.
            log.warn("Ignoring TL1 response without ctag: {}", s);
            return;
        }
        // ctag is just in front of the status word
        int ctag = Integer.parseInt(words[i - 1]);
        // We return everything that follows to the caller (this will lose line breaks and such)
        String result = Arrays.stream(words).skip(i + 1L).collect(Collectors.joining());
        // Set future when command is executed, good or bad
        Map<Integer, CompletableFuture<String>> msg = msgMap.get(ctx.channel());
        if (msg != null) {
            CompletableFuture<String> f = msg.remove(ctag);
            if (f != null) {
                f.complete(result);
            }
        }
        return;
    }
}
/**
 * Accumulates the AMQP protocol header from the incoming bytes and, once it is
 * complete, validates it, selects the authenticator and feeds the data onward.
 * <p>
 * The number of bytes copied into {@code headerBuf} is limited to the space that is
 * still free in it, so a header that arrives split across several reads is assembled
 * correctly and bytes following the header stay in {@code buffer}.
 *
 * @param ctx channel context used to send the supported header back on a mismatch
 * @param buffer incoming bytes; header bytes are consumed first, the rest is passed on
 * @throws IOException if the client announces an unsupported header
 */
private void processIncomingHeader(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    // Bound the copy by what headerBuf can still take, not by the full header size.
    headerBuf.writeBytes(buffer, Math.min(headerBuf.writableBytes(), buffer.readableBytes()));

    if (headerBuf.writableBytes() == 0) {
        headerRead = true;
        AmqpHeader header = new AmqpHeader(headerBuf);

        if (isHeaderValid(header, authenticator != null)) {
            LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
        } else {
            LOG.warn("Connection attempt from unsupported AMQP client. {}", header);
            AmqpHeader reply = getMinimallySupportedHeader();
            // NOTE(review): this write is not flushed before the exception is thrown;
            // confirm the reply actually reaches the client before the channel closes.
            ctx.write(reply.getBuffer());
            throw new IOException("Connection from client using unsupported AMQP attempted");
        }

        switch (header.getProtocolId()) {
            case 0: // non-SASL
                authenticator = null;
                break;
            case 3: // SASL
                authenticator = new SaslAuthenticator(this);
                break;
            default:
                // Other protocol ids leave the authenticator unchanged.
        }

        pourIntoProton(headerBuf);
    }

    // Whatever is left in the incoming buffer is passed on as well.
    pourIntoProton(buffer);
}
/**
 * Responds with the current value of the {@code archaiusServer.foo} configuration
 * property, wrapped in an already-completed future.
 */
@Override
public @NotNull CompletableFuture<ResponseInfo<String>> execute(
    @NotNull RequestInfo<Void> request,
    @NotNull Executor longRunningTaskExecutor,
    @NotNull ChannelHandlerContext ctx
) {
    // Read on every call, so the response reflects the configuration at request time.
    String fooValue = ConfigurationManager.getConfigInstance().getString("archaiusServer.foo");
    ResponseInfo<String> response = ResponseInfo.newBuilder(fooValue).build();
    return CompletableFuture.completedFuture(response);
}
/**
 * Returns the client ids of all consumers in the requested consumer group.
 *
 * @param ctx channel context; only used to log the remote address on failure
 * @param request command carrying a {@code GetConsumerListByGroupRequestHeader}
 * @return SUCCESS with the encoded id list as body, or SYSTEM_ERROR when the group is
 *         unknown or has no clients
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response =
            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
    final GetConsumerListByGroupRequestHeader requestHeader = (GetConsumerListByGroupRequestHeader)
            request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);

    ConsumerGroupInfo groupInfo = this.brokerController.getConsumerManager()
            .getConsumerGroupInfo(requestHeader.getConsumerGroup());

    if (groupInfo == null) {
        log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    } else {
        List<String> ids = groupInfo.getAllClientId();
        if (ids.isEmpty()) {
            log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        } else {
            GetConsumerListByGroupResponseBody payload = new GetConsumerListByGroupResponseBody();
            payload.setConsumerIdList(ids);
            response.setBody(payload.encode());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    }

    // Both failure paths end up here.
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
    return response;
}
/**
 * Appends the per-stream handlers to the pipeline, in order: the HTTP/2 stream frame
 * codec, an HTTP object aggregator limited to the configured maximum content length,
 * the network request handler and the send-to-HTTP/2 adaptor.
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    ctx.pipeline()
        .addLast(http2StreamFrameToHttpObjectCodec)
        .addLast(new HttpObjectAggregator(http2ClientConfig.http2MaxContentLength))
        .addLast(ambryNetworkRequestHandler)
        .addLast(ambrySendToHttp2Adaptor);
}
/**
 * Called when reading has completed: writes a fixed reply to the channel and flushes it.
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // Original author's note: data being sent would normally be encoded first.
    // Here the text is simply copied into a buffer as UTF-8.
    String reply = "Hello, D瓜哥~, pong -> O(∩_∩)O哈哈~";
    ctx.writeAndFlush(Unpooled.copiedBuffer(reply, UTF_8));
}
/**
 * Handles a transaction-state check: decodes the message from the request body,
 * finds the producer for the message's producer group and asks it to check the
 * transaction state. Each failure step is only logged.
 *
 * @param ctx channel context; used to resolve the remote address
 * @param request command carrying a {@code CheckTransactionStateRequestHeader} and the
 *        encoded message as body
 * @return always {@code null}
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader)
            request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);

    if (messageExt == null) {
        log.warn("checkTransactionState, decode message failed");
        return null;
    }

    final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    if (group == null) {
        log.warn("checkTransactionState, pick producer group failed");
        return null;
    }

    MQProducerInner producer = this.mqClientFactory.selectProducer(group);
    if (producer == null) {
        log.debug("checkTransactionState, pick producer by group[{}] failed", group);
        return null;
    }

    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    producer.checkTransactionState(addr, messageExt, requestHeader);
    return null;
}
/**
 * Sends the response and attaches the listener matching the connection state:
 * {@code CLOSE} once the last request has been handled, {@code CLOSE_ON_FAILURE}
 * otherwise. Flushes immediately unless a read is in progress.
 */
private void respond(ChannelHandlerContext ctx, ServiceRequestContext reqCtx,
                     ResponseHeadersBuilder resHeaders, @Nullable HttpData resContent,
                     @Nullable Throwable cause) {
    if (handledLastRequest) {
        // Last request on this connection: close once the response has been written.
        respond(reqCtx, false, resHeaders, resContent, cause).addListener(CLOSE);
    } else {
        // More requests may follow: close only if the write fails.
        respond(reqCtx, true, resHeaders, resContent, cause).addListener(CLOSE_ON_FAILURE);
    }

    if (isReading) {
        return;
    }
    ctx.flush();
}
/**
 * Starts sending once the ZMTP handshake has succeeded. All other user events are
 * ignored and not forwarded.
 */
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt)
        throws Exception {
    if (!(evt instanceof ZMTPHandshakeSuccess)) {
        return;
    }
    send(ctx);
}
/**
 * Records the length of the upcoming frame.
 *
 * @param length frame length; must not exceed {@code Integer.MAX_VALUE}
 * @throws IllegalArgumentException if {@code length} is larger than {@code Integer.MAX_VALUE}
 */
@Override
public void header(final ChannelHandlerContext ctx, final long length, final boolean more,
                   final List<Object> out) {
    if (length <= Integer.MAX_VALUE) {
        frameLength = (int) length;
        return;
    }
    throw new IllegalArgumentException("length");
}
/**
 * Remembers the context, prepares a zero-filled direct buffer of
 * {@code DiscardClient.SIZE} bytes as the message content, and starts generating traffic.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    this.ctx = ctx;

    // Build the message once; it is zero-filled up to the configured size.
    final int size = DiscardClient.SIZE;
    content = ctx.alloc().directBuffer(size).writeZero(size);

    // Kick off the initial writes.
    generateTraffic();
}