io.netty.channel.ChannelHandlerContext#writeAndFlush() 源码实例 Demo

下面列出了 io.netty.channel.ChannelHandlerContext#writeAndFlush() 的实例代码,你可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: learning-code   文件: TimeServerHandler.java
/**
 * Sends a single 4-byte timestamp message as soon as the channel becomes active,
 * then closes the channel once that write has completed.
 *
 * @param ctx context of the channel that has just become active
 */
@Override
public void channelActive(final ChannelHandlerContext ctx) {
    // The message is one 32-bit integer, so a 4-byte buffer is requested.
    final ByteBuf payload = ctx.alloc().buffer(4);
    final long shiftedSeconds = System.currentTimeMillis() / 1000L + 123456789L;
    payload.writeInt((int) shiftedSeconds);

    // writeAndFlush hands back a future instead of a finished result, so the
    // close is registered as a listener on that future rather than being
    // called directly, which could shut the connection before the message
    // has been sent.
    ctx.writeAndFlush(payload).addListener(ChannelFutureListener.CLOSE);
}
 
/**
 * Handles WebSocket ping/pong control frames.
 *
 * <p>A ping is answered with a pong carrying the same payload, a pong is
 * recorded on the channel's {@code PingPong} session (when one exists), and
 * any other frame type is rejected.
 *
 * @param ctx   context used to write the pong reply
 * @param frame the inbound frame to handle
 * @throws UnsupportedOperationException if the frame is neither a ping nor a pong
 */
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
   if (frame instanceof PingWebSocketFrame) {
      if (logger.isTraceEnabled()) {
         logger.trace("Ping with payload [{}]", ByteBufUtil.hexDump(frame.content()));
      }

      // The pong shares the ping's content buffer; retain() adds a reference
      // for the outbound frame (presumably the caller releases the inbound
      // frame afterwards - confirm against the calling handler).
      ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
      return;
   }

   if (frame instanceof PongWebSocketFrame) {
      PingPong session = PingPong.get(ctx.channel());
      if (session != null) {
         session.recordPong();
      }
      return;
   }

   String frameType = frame.getClass().getName();
   throw new UnsupportedOperationException(String.format("%s frame types not supported", frameType));
}
 
/**
 * Verifies that outbound integers arrive in sequence on the expected thread.
 *
 * <p>The message is read as a {@code ByteBuf} of 4-byte integers. Each integer
 * is compared with the running counter {@code outCnt} and, unless this handler
 * is the first in the pipeline, written further along individually. An empty
 * buffer is then written and flushed with the caller's promise.
 *
 * <p>The incoming buffer is released in a {@code finally} block so that a
 * failed assertion or a failing write does not leak it.
 *
 * @param ctx     context of this handler
 * @param msg     the outbound message; must be a {@code ByteBuf}
 * @param promise promise attached to the final flush
 * @throws Exception if an assertion fails or a downstream write throws
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    Assert.assertSame(t, Thread.currentThread());

    // Don't let the write request go to the server-side channel - just swallow.
    boolean swallow = this == ctx.pipeline().first();

    ByteBuf m = (ByteBuf) msg;
    try {
        int count = m.readableBytes() / 4;
        for (int j = 0; j < count; j ++) {
            int actual = m.readInt();
            int expected = outCnt ++;
            Assert.assertEquals(expected, actual);
            if (!swallow) {
                ctx.write(actual);
            }
        }
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
    } finally {
        // Release on every path, including assertion failures.
        m.release();
    }
}
 
/**
 * Verifies that outbound integers arrive in sequence on the expected thread.
 *
 * <p>The message is read as a {@code ByteBuf} of 4-byte integers. Each integer
 * is compared with the running counter {@code outCnt} and, unless this handler
 * is the first in the pipeline, written further along individually. An empty
 * buffer is then written and flushed with the caller's promise.
 *
 * <p>The incoming buffer is released in a {@code finally} block so that a
 * failed assertion or a failing write does not leak it.
 *
 * @param ctx     context of this handler
 * @param msg     the outbound message; must be a {@code ByteBuf}
 * @param promise promise attached to the final flush
 * @throws Exception if an assertion fails or a downstream write throws
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    Assert.assertSame(t, Thread.currentThread());

    // Don't let the write request go to the server-side channel - just swallow.
    boolean swallow = this == ctx.pipeline().first();

    ByteBuf m = (ByteBuf) msg;
    try {
        int count = m.readableBytes() / 4;
        for (int j = 0; j < count; j ++) {
            int actual = m.readInt();
            int expected = outCnt ++;
            Assert.assertEquals(expected, actual);
            if (!swallow) {
                ctx.write(actual);
            }
        }
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
    } finally {
        // Release on every path, including assertion failures.
        m.release();
    }
}
 
源代码5 项目: blynk-server   文件: MobileLoginHandler.java
/**
 * Parses a login message and dispatches it to the matching login routine.
 *
 * <p>The body is split on {@code BODY_SEPARATOR_STRING}. Fewer than two parts
 * is answered with an illegal-command response. Parts 2 and 3, when present,
 * form the client {@code Version}. A fifth part names the application:
 * the Facebook app name selects {@code facebookLogin}, any other name is
 * passed to {@code blynkLogin}; with no fifth part the default Blynk app
 * name is used.
 *
 * @param ctx     context used for the error reply and passed on to the login routines
 * @param message the inbound login message
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginMessage message) {
    String[] parts = message.body.split(BODY_SEPARATOR_STRING);

    if (parts.length < 2) {
        log.error("Wrong income message format.");
        ctx.writeAndFlush(illegalCommand(message.id), ctx.voidPromise());
        return;
    }

    // .trim() is deliberately not applied, for backward compatibility.
    String email = parts[0].toLowerCase();

    Version version;
    if (parts.length > 3) {
        version = new Version(parts[2], parts[3]);
    } else {
        version = Version.UNKNOWN_VERSION;
    }

    if (parts.length != 5) {
        // todo: kept for backward compatibility with clients that send no app name
        blynkLogin(ctx, message.id, email, parts[1], version, AppNameUtil.BLYNK);
        return;
    }

    if (AppNameUtil.FACEBOOK.equals(parts[4])) {
        facebookLogin(ctx, message.id, email, parts[1], version);
        return;
    }

    String appName = parts[4];
    blynkLogin(ctx, message.id, email, parts[1], version, appName);
}
 
源代码6 项目: grpc-nebula-java   文件: ProtocolNegotiators.java
/**
 * Kicks off the HTTP/1.1 plaintext upgrade when the channel becomes active.
 *
 * @param ctx context of the channel that has just become active
 * @throws Exception if the superclass handling fails
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void channelActive(ChannelHandlerContext ctx) throws Exception {
  // Issuing a plain GET request is what causes the upgrade headers to be
  // added, which in turn triggers the upgrade protocol.
  ctx.writeAndFlush(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
  super.channelActive(ctx);
}
 
源代码7 项目: timer   文件: NettyChannelRead.java
/**
 * Passes the inbound message to {@code handler} and writes the resulting
 * {@code RpcResponse} back on the same channel.
 *
 * @param ctx context used to write the response
 * @param msg the inbound RPC message
 * @throws Exception if the handler fails or its result is not an {@code RpcResponse}
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (log.isInfoEnabled()) {
        log.info("client rpc call");
    }

    Object handled = handler.handle(msg);
    ctx.writeAndFlush((RpcResponse) handled);
}
 
源代码8 项目: DDMQ   文件: AbstractSendMessageProcessor.java
/**
 * Writes the response for a request, unless the request is a one-way RPC.
 *
 * <p>Any failure raised while writing is logged together with the request and
 * response, and is not propagated to the caller.
 *
 * @param ctx      context used to write the response
 * @param request  the request being answered
 * @param response the response to send
 */
protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
    final RemotingCommand response) {
    // One-way requests expect no reply.
    if (request.isOnewayRPC()) {
        return;
    }

    try {
        ctx.writeAndFlush(response);
    } catch (Throwable failure) {
        log.error("SendMessageProcessor process request over, but response failed", failure);
        log.error(request.toString());
        log.error(response.toString());
    }
}
 
源代码9 项目: timely   文件: WSQueryRequestHandler.java
/**
 * Runs the query against {@code dataStore} and sends the JSON-serialized
 * result as a text frame.
 *
 * <p>When the query raises a {@code TimelyException}, the message is logged
 * (at trace level for "No matching tags", at error level otherwise) and a
 * close frame with status 1008 carrying that message is sent instead.
 *
 * @param ctx context used to write the reply
 * @param msg the query request
 * @throws Exception if serialization or writing fails
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, QueryRequest msg) throws Exception {
    try {
        final String json = JsonUtil.getObjectMapper().writeValueAsString(dataStore.query(msg));
        ctx.writeAndFlush(new TextWebSocketFrame(json));
    } catch (TimelyException e) {
        final String reason = e.getMessage();
        if (!reason.contains("No matching tags")) {
            LOG.error(reason, e);
        } else {
            LOG.trace(reason);
        }
        ctx.writeAndFlush(new CloseWebSocketFrame(1008, reason));
    }
}
 
源代码10 项目: blynk-server   文件: MobileUpdateDashLogic.java
/**
 * Updates one of the user's existing dashboards from a JSON message body.
 *
 * <p>The body must be non-empty and no longer than the configured profile size
 * limit. It is parsed into a dashboard, the stored dashboard with the same id
 * is looked up, its timers are replaced by the incoming ones, its fields are
 * updated, the pin storage is cleaned, and an OK response is written.
 *
 * @param holder  provides the limits and the timer worker
 * @param ctx     context used to write the OK response
 * @param state   state of the current mobile session (user and user key)
 * @param message message whose body carries the dashboard JSON
 * @throws IllegalCommandException if the body is empty or cannot be parsed
 * @throws NotAllowedException     if the body exceeds the profile size limit
 */
public static void messageReceived(Holder holder, ChannelHandlerContext ctx,
                                   MobileStateHolder state, StringMessage message) {
    String payload = message.body;

    if (payload == null || payload.isEmpty()) {
        throw new IllegalCommandException("Income create dash message is empty.");
    }
    if (payload.length() > holder.limits.profileSizeLimitBytes) {
        throw new NotAllowedException("User dashboard is larger then limit.", message.id);
    }

    log.debug("Trying to parse user dash : {}", payload);
    DashBoard incoming = JsonParser.parseDashboard(payload, message.id);
    if (incoming == null) {
        throw new IllegalCommandException("Project parsing error.");
    }

    log.debug("Saving dashboard.");

    User owner = state.user;
    DashBoard stored = owner.profile.getDashByIdOrThrow(incoming.id);

    // Swap the timers: drop those of the stored dashboard, then register the incoming ones.
    TimerWorker timers = holder.timerWorker;
    timers.deleteTimers(state.userKey, stored);
    incoming.addTimers(timers, state.userKey);

    stored.updateFields(incoming);
    owner.profile.cleanPinStorage(stored, false, true);

    owner.lastModifiedTs = stored.updatedAt;

    ctx.writeAndFlush(ok(message.id), ctx.voidPromise());
}
 
源代码11 项目: spring-boot-netty   文件: OnMessageController.java
/**
 * Records the invocation, then replies twice: {@code msg + 1} through the
 * handler context and {@code msg + 2} through the channel.
 *
 * @param ctx     context used for the first reply
 * @param channel channel used for the second reply
 * @param msg     the received message body
 */
@NettyOnMessage(serverName = "server1", priority = 2)
void onMessage1(final ChannelHandlerContext ctx, final Channel channel, @NettyMessageBody final Long msg) {
    calls.add(ON_MESSAGE1);
    counter.arrive();

    final long received = msg;
    ctx.writeAndFlush(received + 1);
    channel.writeAndFlush(received + 2);
}
 
源代码12 项目: bazel   文件: HttpCacheServerHandler.java
/**
 * Sends a plain-text error response with the given status.
 *
 * <p>The body is {@code "Failure: <status>\r\n"} in UTF-8. The channel is
 * closed after the write unless the request asked for keep-alive.
 *
 * @param ctx     context used to write the response
 * @param request the request being answered; consulted for keep-alive
 * @param status  the HTTP status to report
 */
private static void sendError(
    ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
  ByteBuf body = Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8);
  FullHttpResponse errorResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, body);
  errorResponse
      .headers()
      .set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8")
      .set(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());

  ChannelFuture written = ctx.writeAndFlush(errorResponse);
  if (HttpUtil.isKeepAlive(request)) {
    return;
  }
  written.addListener(ChannelFutureListener.CLOSE);
}
 
源代码13 项目: p2p   文件: PeerChannelHandler.java
/**
 * Registers a new {@code Connection} for the channel in its session attribute
 * and sends a handshake carrying this peer's name and the current leader name.
 *
 * @param ctx context of the channel that has just become active
 * @throws Exception declared by the overridden method
 */
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
    LOGGER.debug("Channel active {}", ctx.channel().remoteAddress());

    getSessionAttribute(ctx).set(new Connection(ctx));

    final Handshake handshake = new Handshake(config.getPeerName(), peer.getLeaderName());
    ctx.writeAndFlush(handshake);
}
 
源代码14 项目: incubator-hivemall   文件: MixServerHandler.java
/**
 * Folds one mix request into the shared partial result and, when the
 * requester's clock lags far enough behind, sends the current aggregate back.
 *
 * <p>A cancel request subtracts the contribution; otherwise it is added. If
 * the clock difference reported by {@code partial.diffClock} reaches
 * {@code syncThreshold}, a response with the aggregated weight, covariance and
 * clock is built and written after the lock has been released.
 *
 * <p>The lock is acquired before entering the {@code try} block, so
 * {@code unlock()} in {@code finally} only runs once {@code lock()} has
 * returned normally.
 *
 * @param ctx        context used to write the response
 * @param requestMsg the incoming mix request
 * @param partial    the shared partial result for the feature
 * @param session    session whose response counter is incremented on reply
 * @throws IllegalArgumentException if the request's deltaUpdates is not positive
 */
private void mix(final ChannelHandlerContext ctx, final MixMessage requestMsg,
        final PartialResult partial, final SessionObject session) {
    final MixEventName event = requestMsg.getEvent();
    final Object feature = requestMsg.getFeature();
    final float weight = requestMsg.getWeight();
    final float covar = requestMsg.getCovariance();
    final short localClock = requestMsg.getClock();
    final int deltaUpdates = requestMsg.getDeltaUpdates();
    final boolean cancelRequest = requestMsg.isCancelRequest();

    if (deltaUpdates <= 0) {
        throw new IllegalArgumentException("Illegal deltaUpdates received: " + deltaUpdates);
    }

    MixMessage responseMsg = null;
    // Acquire outside the try: if lock() itself fails, finally must not unlock.
    partial.lock();
    try {
        if (cancelRequest) {
            partial.subtract(weight, covar, deltaUpdates, scale);
        } else {
            // The clock difference is taken before this contribution is added.
            int diffClock = partial.diffClock(localClock);
            partial.add(weight, covar, deltaUpdates, scale);

            if (diffClock >= syncThreshold) {// sync model if clock DIFF is above threshold
                float averagedWeight = partial.getWeight(scale);
                float meanCovar = partial.getCovariance(scale);
                short globalClock = partial.getClock();
                responseMsg = new MixMessage(event, feature, averagedWeight, meanCovar,
                    globalClock, 0 /* deltaUpdates */);
            }
        }
    } finally {
        partial.unlock();
    }

    // The write happens after the lock has been released.
    if (responseMsg != null) {
        session.incrResponse();
        ctx.writeAndFlush(responseMsg);
    }
}
 
源代码15 项目: bistoury   文件: ResponseWriter.java
/**
 * Sends an exception-type response carrying the given error text.
 *
 * @param ctx           context used to write the response
 * @param content       error text placed in the response payload
 * @param requestHeader header of the request being answered
 */
public void writeError(ChannelHandlerContext ctx, String content, RemotingHeader requestHeader) {
    ctx.writeAndFlush(
            RemotingBuilder.buildResponseDatagram(
                    ResponseCode.RESP_TYPE_EXCEPTION.getCode(),
                    requestHeader,
                    new ErrorResponsePayloadHolder(content)));
}
 
源代码16 项目: reactor-netty   文件: WebsocketClientOperations.java
/**
 * Dispatches an inbound message of a client-side WebSocket exchange.
 *
 * <ul>
 *   <li>A {@code FullHttpResponse} is treated as the handshake response: the
 *       aggregator is removed from the pipeline, the response is stored, and
 *       the handshake is either finished or reported as a redirect.</li>
 *   <li>A ping frame is answered with a pong (unless {@code proxyPing} is set)
 *       and another read is requested.</li>
 *   <li>A final-fragment close frame is echoed back and the inbound side is
 *       completed.</li>
 *   <li>Anything else, except the shared empty last-content marker, is passed
 *       to the superclass.</li>
 * </ul>
 *
 * @param ctx context of the handler that received the message
 * @param msg the inbound message
 */
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onInboundNext(ChannelHandlerContext ctx, Object msg) {
	if (msg instanceof FullHttpResponse) {
		// Handshake response: mark the exchange as started and drop the
		// aggregator from the pipeline.
		started = true;
		channel().pipeline()
		         .remove(HttpObjectAggregator.class);
		FullHttpResponse response = (FullHttpResponse) msg;

		setNettyResponse(response);

		if (notRedirected(response)) {


			try {
				handshaker.finishHandshake(channel(), response);
				listener().onStateChange(this, HttpClientState.RESPONSE_RECEIVED);
			}
			catch (Exception e) {
				// A failed handshake is reported through the inbound error path.
				onInboundError(e);
			}
			finally {
				// Release unused content (101 status)
				response.content()
				        .release();
			}

		}
		else {
			// Redirect case: release the content and report the redirect
			// through the listener.
			response.content()
			        .release();
			listener().onUncaughtException(this, redirecting);
		}
		return;
	}
	if (!this.proxyPing && msg instanceof PingWebSocketFrame) {
		// The write future is deliberately ignored ("FutureReturnValueIgnored").
		// The pong reuses the ping's content buffer, then another read is requested.
		ctx.writeAndFlush(new PongWebSocketFrame(((PingWebSocketFrame) msg).content()));
		ctx.read();
		return;
	}
	if (msg instanceof CloseWebSocketFrame &&
			((CloseWebSocketFrame)msg).isFinalFragment()) {
		if (log.isDebugEnabled()) {
			log.debug(format(channel(), "CloseWebSocketFrame detected. Closing Websocket"));
		}
		// Echo a close frame with the same rsv bits and content, then
		// complete the inbound side.
		CloseWebSocketFrame close = (CloseWebSocketFrame) msg;
		sendCloseNow(new CloseWebSocketFrame(true,
				close.rsv(),
				close.content()));
		onInboundComplete();
	}
	else if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {
		// Everything except the shared empty last-content marker goes to the superclass.
		super.onInboundNext(ctx, msg);
	}
}
 
源代码17 项目: bistoury   文件: ResponseWriter.java
/**
 * Sends a full response whose payload is the given bytes.
 *
 * @param ctx            context used to write the response
 * @param bytes          payload of the response
 * @param responseHeader header to attach to the response
 */
public void writeFullResponse(ChannelHandlerContext ctx, final byte[] bytes, RemotingHeader responseHeader) {
    final ResponsePayloadHolder payload = new ResponsePayloadHolder(bytes);
    ctx.writeAndFlush(RemotingBuilder.buildFullResponseDatagram(responseHeader, payload));
}
 
源代码18 项目: aesh-readline   文件: HttpRequestHandler.java
/**
 * Serves static resources over HTTP and passes WebSocket requests on.
 *
 * <p>A request whose URI matches {@code wsUri} is retained and forwarded to the
 * next handler. Any other URI is resolved as a classpath resource under
 * {@code /org/aesh/terminal/http} ({@code "/"} maps to {@code /index.html}).
 * A found resource is returned with status 200 and, for {@code .html} and
 * {@code .js}, a matching content type; a missing resource yields 404; an
 * exception while loading leaves the default 500 response. The channel is
 * closed after the response has been written.
 *
 * <p>The resource stream is opened in a try-with-resources statement so it is
 * closed on every path.
 *
 * @param ctx     context used to forward the request or write the response
 * @param request the aggregated HTTP request
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
    if (wsUri.equalsIgnoreCase(request.getUri())) {
        // retain() because the request is handed on to the next handler.
        ctx.fireChannelRead(request.retain());
    } else {
        if (HttpHeaders.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }

        // Start from 500; replaced on success, downgraded to 404 when the resource is missing.
        HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR);

        String path = request.getUri();
        if ("/".equals(path)) {
            path = "/index.html";
        }
        URL res = HttpTtyConnection.class.getResource("/org/aesh/terminal/http" + path);
        try {
            if (res != null) {
                DefaultFullHttpResponse fullResp = new DefaultFullHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
                // Copy the resource into the response body; the stream is closed automatically.
                try (InputStream in = res.openStream()) {
                    byte[] tmp = new byte[256];
                    for (int l = in.read(tmp); l != -1; l = in.read(tmp)) {
                        fullResp.content().writeBytes(tmp, 0, l);
                    }
                }
                // Derive the content type from the file extension, when there is one.
                int li = path.lastIndexOf('.');
                if (li != -1 && li != path.length() - 1) {
                    String ext = path.substring(li + 1);
                    String contentType;
                    switch (ext) {
                        case "html":
                            contentType = "text/html";
                            break;
                        case "js":
                            contentType = "application/javascript";
                            break;
                        default:
                            contentType = null;
                            break;
                    }
                    if (contentType != null) {
                        fullResp.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);
                    }
                }
                response = fullResp;
            } else {
                response.setStatus(HttpResponseStatus.NOT_FOUND);
            }
        } catch (Exception e) {
            // NOTE(review): no logger is visible in this snippet, so the stack trace is
            // still printed; the client receives the 500 response prepared above.
            e.printStackTrace();
        } finally {
            // Always answer and close, whatever happened while loading.
            ctx.write(response);
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
 
/**
 * Builds a response from the given data and message id and writes it, but
 * only while the channel reports itself writable; otherwise nothing is sent.
 *
 * @param ctx   context used to write the response
 * @param data  payload passed to {@code makeResponse}
 * @param msgId message id passed to {@code makeResponse}
 */
public static void write(ChannelHandlerContext ctx, byte[] data, int msgId) {
    if (!ctx.channel().isWritable()) {
        return;
    }
    ctx.writeAndFlush(makeResponse(data, msgId), ctx.voidPromise());
}
 
源代码20 项目: x-pipe   文件: EchoServerHandler.java
/**
 * Echoes every inbound message back on the same channel.
 *
 * @param ctx context used to write the message back
 * @param msg the inbound message, written out unchanged
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // The received object itself is written and flushed; no copy is made here.
    ctx.writeAndFlush(msg);
}