下面列出了io.netty.channel.ChannelHandlerContext#writeAndFlush ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void channelActive(final ChannelHandlerContext ctx) {
//channelActive 在建立连接,准备通信的时候被调用
// 构建一个 4 字节, 32 位整数的消息
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 123456789L));
// 因为 netty 里面所有的操作都是异步的,所有的操作不是立即执行的,这里返回一个 还没有发生 I/O 操作
final ChannelFuture f = ctx.writeAndFlush(time);
// 由于 Netty 里面所有逇操作都是异步的, 直接 close channel 会导致消息还没发送,就关闭连接了,我们需要一个监听者,在它写操作完成后,通知我们去关闭通信连接
f.addListener(ChannelFutureListener.CLOSE);
// 下面是自定义的监听器
/*f.addListener((ChannelFutureListener) future -> {
assert f == future;
ctx.close();
});*/
}
protected void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof PingWebSocketFrame) {
if (logger.isTraceEnabled()) {
logger.trace("Ping with payload [{}]", ByteBufUtil.hexDump(frame.content()));
}
PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
ctx.writeAndFlush(pong);
}
else if (frame instanceof PongWebSocketFrame) {
PingPong pingPongSession = PingPong.get(ctx.channel());
if (pingPongSession != null) {
pingPongSession.recordPong();
}
}
else {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass()
.getName()));
}
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
// Don't let the write request go to the server-side channel - just swallow.
boolean swallow = this == ctx.pipeline().first();
ByteBuf m = (ByteBuf) msg;
int count = m.readableBytes() / 4;
for (int j = 0; j < count; j ++) {
int actual = m.readInt();
int expected = outCnt ++;
Assert.assertEquals(expected, actual);
if (!swallow) {
ctx.write(actual);
}
}
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
m.release();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Assert.assertSame(t, Thread.currentThread());
// Don't let the write request go to the server-side channel - just swallow.
boolean swallow = this == ctx.pipeline().first();
ByteBuf m = (ByteBuf) msg;
int count = m.readableBytes() / 4;
for (int j = 0; j < count; j ++) {
int actual = m.readInt();
int expected = outCnt ++;
Assert.assertEquals(expected, actual);
if (!swallow) {
ctx.write(actual);
}
}
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
m.release();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginMessage message) {
String[] messageParts = message.body.split(BODY_SEPARATOR_STRING);
if (messageParts.length < 2) {
log.error("Wrong income message format.");
ctx.writeAndFlush(illegalCommand(message.id), ctx.voidPromise());
return;
}
///.trim() is not used for back compatibility
String email = messageParts[0].toLowerCase();
Version version = messageParts.length > 3
? new Version(messageParts[2], messageParts[3])
: Version.UNKNOWN_VERSION;
if (messageParts.length == 5) {
if (AppNameUtil.FACEBOOK.equals(messageParts[4])) {
facebookLogin(ctx, message.id, email, messageParts[1], version);
} else {
String appName = messageParts[4];
blynkLogin(ctx, message.id, email, messageParts[1], version, appName);
}
} else {
//todo this is for back compatibility
blynkLogin(ctx, message.id, email, messageParts[1], version, AppNameUtil.BLYNK);
}
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
// which causes the upgrade headers to be added
DefaultHttpRequest upgradeTrigger =
new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
ctx.writeAndFlush(upgradeTrigger);
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println(IPUtils.parseChannelRemoteAddr1(ctx.channel()));
if(log.isInfoEnabled())log.info("client rpc call");
RpcResponse rpcResponse= (RpcResponse) handler.handle(msg);
ctx.writeAndFlush(rpcResponse);
}
protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
final RemotingCommand response) {
if (!request.isOnewayRPC()) {
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("SendMessageProcessor process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, QueryRequest msg) throws Exception {
try {
String response = JsonUtil.getObjectMapper().writeValueAsString(dataStore.query(msg));
ctx.writeAndFlush(new TextWebSocketFrame(response));
} catch (TimelyException e) {
if (e.getMessage().contains("No matching tags")) {
LOG.trace(e.getMessage());
} else {
LOG.error(e.getMessage(), e);
}
ctx.writeAndFlush(new CloseWebSocketFrame(1008, e.getMessage()));
}
}
public static void messageReceived(Holder holder, ChannelHandlerContext ctx,
MobileStateHolder state, StringMessage message) {
String dashString = message.body;
if (dashString == null || dashString.isEmpty()) {
throw new IllegalCommandException("Income create dash message is empty.");
}
if (dashString.length() > holder.limits.profileSizeLimitBytes) {
throw new NotAllowedException("User dashboard is larger then limit.", message.id);
}
log.debug("Trying to parse user dash : {}", dashString);
DashBoard updatedDash = JsonParser.parseDashboard(dashString, message.id);
if (updatedDash == null) {
throw new IllegalCommandException("Project parsing error.");
}
log.debug("Saving dashboard.");
User user = state.user;
DashBoard existingDash = user.profile.getDashByIdOrThrow(updatedDash.id);
TimerWorker timerWorker = holder.timerWorker;
timerWorker.deleteTimers(state.userKey, existingDash);
updatedDash.addTimers(timerWorker, state.userKey);
existingDash.updateFields(updatedDash);
user.profile.cleanPinStorage(existingDash, false, true);
user.lastModifiedTs = existingDash.updatedAt;
ctx.writeAndFlush(ok(message.id), ctx.voidPromise());
}
@NettyOnMessage(serverName = "server1", priority = 2)
void onMessage1(final ChannelHandlerContext ctx, final Channel channel, @NettyMessageBody final Long msg) {
calls.add(ON_MESSAGE1);
counter.arrive();
ctx.writeAndFlush(msg + 1);
channel.writeAndFlush(msg + 2);
}
private static void sendError(
ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
ByteBuf data = Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, data);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, data.readableBytes());
ChannelFuture future = ctx.writeAndFlush(response);
if (!HttpUtil.isKeepAlive(request)) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
LOGGER.debug("Channel active {}", ctx.channel().remoteAddress());
final Connection connection = new Connection(ctx);
getSessionAttribute(ctx).set(connection);
ctx.writeAndFlush(new Handshake(config.getPeerName(), peer.getLeaderName()));
}
private void mix(final ChannelHandlerContext ctx, final MixMessage requestMsg,
final PartialResult partial, final SessionObject session) {
final MixEventName event = requestMsg.getEvent();
final Object feature = requestMsg.getFeature();
final float weight = requestMsg.getWeight();
final float covar = requestMsg.getCovariance();
final short localClock = requestMsg.getClock();
final int deltaUpdates = requestMsg.getDeltaUpdates();
final boolean cancelRequest = requestMsg.isCancelRequest();
if (deltaUpdates <= 0) {
throw new IllegalArgumentException("Illegal deltaUpdates received: " + deltaUpdates);
}
MixMessage responseMsg = null;
try {
partial.lock();
if (cancelRequest) {
partial.subtract(weight, covar, deltaUpdates, scale);
} else {
int diffClock = partial.diffClock(localClock);
partial.add(weight, covar, deltaUpdates, scale);
if (diffClock >= syncThreshold) {// sync model if clock DIFF is above threshold
float averagedWeight = partial.getWeight(scale);
float meanCovar = partial.getCovariance(scale);
short globalClock = partial.getClock();
responseMsg = new MixMessage(event, feature, averagedWeight, meanCovar,
globalClock, 0 /* deltaUpdates */);
}
}
} finally {
partial.unlock();
}
if (responseMsg != null) {
session.incrResponse();
ctx.writeAndFlush(responseMsg);
}
}
public void writeError(ChannelHandlerContext ctx, String content, RemotingHeader requestHeader) {
Datagram datagram = RemotingBuilder.buildResponseDatagram(ResponseCode.RESP_TYPE_EXCEPTION.getCode(), requestHeader, new ErrorResponsePayloadHolder(content));
ctx.writeAndFlush(datagram);
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse) {
started = true;
channel().pipeline()
.remove(HttpObjectAggregator.class);
FullHttpResponse response = (FullHttpResponse) msg;
setNettyResponse(response);
if (notRedirected(response)) {
try {
handshaker.finishHandshake(channel(), response);
listener().onStateChange(this, HttpClientState.RESPONSE_RECEIVED);
}
catch (Exception e) {
onInboundError(e);
}
finally {
//Release unused content (101 status)
response.content()
.release();
}
}
else {
response.content()
.release();
listener().onUncaughtException(this, redirecting);
}
return;
}
if (!this.proxyPing && msg instanceof PingWebSocketFrame) {
//"FutureReturnValueIgnored" this is deliberate
ctx.writeAndFlush(new PongWebSocketFrame(((PingWebSocketFrame) msg).content()));
ctx.read();
return;
}
if (msg instanceof CloseWebSocketFrame &&
((CloseWebSocketFrame)msg).isFinalFragment()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "CloseWebSocketFrame detected. Closing Websocket"));
}
CloseWebSocketFrame close = (CloseWebSocketFrame) msg;
sendCloseNow(new CloseWebSocketFrame(true,
close.rsv(),
close.content()));
onInboundComplete();
}
else if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {
super.onInboundNext(ctx, msg);
}
}
public void writeFullResponse(ChannelHandlerContext ctx, final byte[] bytes, RemotingHeader responseHeader) {
Datagram datagram = RemotingBuilder.buildFullResponseDatagram(responseHeader, new ResponsePayloadHolder(bytes));
ctx.writeAndFlush(datagram);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain());
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx);
}
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
String path = request.getUri();
if ("/".equals(path)) {
path = "/index.html";
}
URL res = HttpTtyConnection.class.getResource("/org/aesh/terminal/http" + path);
try {
if (res != null) {
DefaultFullHttpResponse fullResp = new DefaultFullHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
InputStream in = res.openStream();
byte[] tmp = new byte[256];
for (int l = 0; l != -1; l = in.read(tmp)) {
fullResp.content().writeBytes(tmp, 0, l);
}
int li = path.lastIndexOf('.');
if (li != -1 && li != path.length() - 1) {
String ext = path.substring(li + 1, path.length());
String contentType;
switch (ext) {
case "html":
contentType = "text/html";
break;
case "js":
contentType = "application/javascript";
break;
default:
contentType = null;
break;
}
if (contentType != null) {
fullResp.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);
}
}
response = fullResp;
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ctx.write(response);
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
public static void write(ChannelHandlerContext ctx, byte[] data, int msgId) {
if (ctx.channel().isWritable()) {
var outputMsg = makeResponse(data, msgId);
ctx.writeAndFlush(outputMsg, ctx.voidPromise());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.writeAndFlush(msg);
}