下面列出了 io.netty.channel.ChannelHandlerContext#write() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Intercepts an outbound message, publishes an {@code OutboundPacketEvent} for it and, unless
 * the event was cancelled, forwards the packet to the next outbound handler.
 *
 * <p>Any exception raised while processing is logged and used to fail {@code promise}, so a
 * caller listening on the promise is notified instead of waiting forever.
 *
 * @param ctx     context of this handler
 * @param msg     outbound message; cast to {@code Packet}
 * @param promise promise passed on with the write, or failed if processing throws
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    try {
        Packet packet = (Packet) msg;
        Connection connection = ctx.channel().attr(AttributeKey.<Connection>valueOf("connection")).get();
        if (packet.getType().getDirection() == PacketDirection.INBOUND) {
            throw new DecoderException("Outbound packet has invalid direction: " + packet.getType());
        }
        if (CleanstoneServer.publishEvent(
                new OutboundPacketEvent<>(packet, connection, networking)).isCancelled()) {
            // NOTE(review): the promise is left uncompleted when the event is cancelled;
            // confirm whether callers expect success, cancellation or nothing here.
            return;
        }
        log.trace("Sending " + packet.getType() + " (" + networking.getProtocol().translateOutboundPacketID(packet.getType(), connection) + ") packet to " + connection.getAddress().getHostAddress());
        ctx.write(packet, promise);
    } catch (Exception e) {
        log.error("Error occurred while handling outbound packet", e);
        // tryFailure (not setFailure) because the promise may already have been completed
        // by a downstream handler before the exception surfaced.
        promise.tryFailure(e);
    }
}
/**
 * Replies to an {@link HttpRequest} with a plain-text "Hello World" body that includes the
 * current date. Messages of any other type are ignored.
 *
 * <p>The response is only written, not flushed, by this method.
 *
 * @param ctx context of this handler
 * @param msg inbound message
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof HttpRequest)) {
        return;
    }
    final HttpRequest request = (HttpRequest) msg;

    // Acknowledge "Expect: 100-continue" before producing the real response.
    if (is100ContinueExpected(request)) {
        ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
    }

    final boolean keepAlive = isKeepAlive(request);
    final FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, OK, Unpooled.copiedBuffer("Hello World " + new Date(), CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
    response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

    if (keepAlive) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        ctx.write(response);
    } else {
        // Without keep-alive the channel is closed once the response write completes.
        ctx.write(response).addListener(ChannelFutureListener.CLOSE);
    }
}
/**
 * Sends the greeting {@code "Hello netty"} as soon as the channel becomes active and prints
 * the sent text to standard output.
 *
 * <p>The text is encoded explicitly as UTF-8 so the bytes on the wire do not depend on the
 * JVM's default charset.
 *
 * @param ctx context of this handler
 */
@Override
public void channelActive(ChannelHandlerContext ctx) {
    String sendMessage = "Hello netty";
    ByteBuf messageBuffer = Unpooled.buffer();
    messageBuffer.writeBytes(sendMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    System.out.println("전송한 문자열 [" + sendMessage + "]");
    ctx.write(messageBuffer);
    ctx.flush();
}
/**
 * Dispatches a WebSocket frame by type: close frames go to the handshaker, ping frames are
 * answered with a pong carrying the same content, and text/binary frames are echoed back.
 * Any other frame type is ignored.
 *
 * <p>Each frame (or its content) is retained before being passed on.
 *
 * @param ctx   context of this handler
 * @param frame the received frame
 */
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        // Closing handshake.
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) {
        // Echo the frame unchanged.
        ctx.write(frame.retain());
    }
}
/**
 * Answers an {@link HttpRequest} with the fixed {@code HELLO_WORLD} payload as
 * {@code text/plain}. The response is written but not flushed here.
 *
 * <p>NOTE(review): messages that are not an {@code HttpRequest} are neither released nor
 * forwarded by this method; confirm that this is intended for the enclosing handler.
 *
 * @param ctx context of this handler
 * @param msg inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        final HttpRequest request = (HttpRequest) msg;

        if (HttpUtil.is100ContinueExpected(request)) {
            ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
        }

        final boolean keepAlive = HttpUtil.isKeepAlive(request);
        final FullHttpResponse response =
                new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(HELLO_WORLD));
        response.headers().set(CONTENT_TYPE, "text/plain");
        response.headers().set(CONTENT_LENGTH, response.content().readableBytes());

        if (keepAlive) {
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            ctx.write(response);
        } else {
            // Close the connection once the response write has completed.
            ctx.write(response).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
/**
 * Writes every queued message of {@code perChannel} whose {@code relativeTimeAction} is not
 * later than {@code now}, in queue order, then flushes the context.
 *
 * <p>Draining stops at the first message that is not yet due; that message is put back at the
 * head of the queue. When the queue ends up empty, {@code releaseWriteSuspended} is called.
 *
 * @param ctx        context to write to
 * @param perChannel per-channel state holding the message queue and counters
 * @param now        timestamp compared against each message's {@code relativeTimeAction}
 */
private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
    // Queue and counter updates are guarded by the per-channel monitor.
    synchronized (perChannel) {
        ToSend pending;
        while ((pending = perChannel.messagesQueue.pollFirst()) != null) {
            if (pending.relativeTimeAction > now) {
                // Not due yet: restore it at the head and stop draining.
                perChannel.messagesQueue.addFirst(pending);
                break;
            }
            final long size = pending.size;
            trafficCounter.bytesRealWriteFlowControl(size);
            perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
            perChannel.queueSize -= size;
            queuesSize.addAndGet(-size);
            ctx.write(pending.toSend, pending.promise);
            perChannel.lastWriteTimestamp = now;
        }
        if (perChannel.messagesQueue.isEmpty()) {
            releaseWriteSuspended(ctx);
        }
    }
    ctx.flush();
}
/**
 * Handles a fully aggregated HTTP request: ignores requests for {@code FAVICON_ICO}, answers
 * {@code Expect: 100-continue}, prints the method and URI, and replies with an empty-bodied
 * {@code 200 OK} response.
 *
 * <p>Background on 100-continue: a client that has an entity body to send may first want to
 * know whether the server will accept it, so it sends a request carrying an
 * {@code Expect: 100-continue} header before sending the body. The server should answer with
 * {@code 100 Continue} or an error status. For example, when curl POSTs more than 1024 bytes
 * it does not send the data right away but proceeds in two steps:
 * <ol>
 *   <li>send a request containing {@code Expect: 100-continue} to ask whether the server is
 *       willing to accept the data;</li>
 *   <li>POST the data only after the server's 100-continue reply has arrived.</li>
 * </ol>
 *
 * @param ctx context of this handler
 * @param req the received request
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
    final String uri = req.uri();
    if (uri.equals(FAVICON_ICO)) {
        return;
    }

    if (HttpUtil.is100ContinueExpected(req)) {
        ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
    }

    final boolean keepAlive = HttpUtil.isKeepAlive(req);
    System.out.println("【method】" + req.method() + "【uri】" + req.uri());

    // The body buffer is freshly allocated and nothing is written into it.
    final FullHttpResponse response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, HttpResponseStatus.OK, ctx.alloc().buffer());
    response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
    response.headers().set(CONTENT_LENGTH, response.content().readableBytes());

    if (keepAlive) {
        response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        ctx.writeAndFlush(response);
    } else {
        // Close the connection after the response has been written.
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
/**
 * Writes {@code response}, encoded with {@code WireProtocol.CHARSET}, followed by the first
 * line delimiter from {@code Delimiters.lineDelimiter()}, and flushes.
 *
 * @param ctx      context to write to
 * @param response text to send
 * @return the future of the final write-and-flush (the delimiter write)
 */
private static ChannelFuture sendResponse(final ChannelHandlerContext ctx, final String response) {
    // Payload first (unflushed), then the delimiter, which also triggers the flush.
    ctx.write(ctx.alloc().buffer().writeBytes(response.getBytes(WireProtocol.CHARSET)));
    return ctx.writeAndFlush(Delimiters.lineDelimiter()[0]);
}
/**
 * Converts {@code msg} (cast to {@code Buffer}) to a {@code ByteBuf} and writes it as a
 * non-final HTTP/2 DATA frame. If the conversion yields {@code null}, the promise is failed
 * with an {@link IllegalArgumentException} and the context is closed.
 *
 * @param ctx     context to write to
 * @param msg     the buffer to send
 * @param promise promise for the write
 */
final void writeBuffer(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    final ByteBuf converted = toByteBufNoThrow((Buffer) msg);
    if (converted != null) {
        // The buffer is retained before being wrapped in the frame.
        ctx.write(new DefaultHttp2DataFrame(converted.retain(), false), promise);
        return;
    }
    promise.setFailure(new IllegalArgumentException("unsupported Buffer type:" + msg));
    ctx.close();
}
/**
 * Decodes the inbound bytes as UTF-8, upper-cases the text and writes the result back.
 *
 * <p>The reply is written from a newly allocated buffer rather than by overwriting and
 * re-sending {@code in}. This avoids handing the outbound path a buffer that the enclosing
 * handler may release after this method returns (presumably a
 * {@code SimpleChannelInboundHandler} with auto-release; confirm), and it stays correct when
 * upper-casing changes the encoded length or when {@code in}'s reader index is not zero.
 * Upper-casing uses {@code Locale.ROOT} so the result does not depend on the default locale.
 *
 * @param ctx context of this handler
 * @param in  the received bytes; not modified by this method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    String received = in.toString(CharsetUtil.UTF_8);
    String upper = received.toUpperCase(java.util.Locale.ROOT);
    if (logger.isInfoEnabled()) {
        logger.info("[READ] read " + received + ", writing " + upper);
    }
    byte[] encoded = upper.getBytes(CharsetUtil.UTF_8);
    ByteBuf out = ctx.alloc().buffer(encoded.length);
    out.writeBytes(encoded);
    ctx.write(out);
}
/**
 * Drains {@code responseQueue} to the channel while the channel is writable, flushing once if
 * anything was written.
 *
 * <p>The {@code busy} flag lets only one caller drain at a time; a caller that fails to
 * acquire it returns immediately. After the flag is cleared the queue is checked again and,
 * if the channel is writable and responses are waiting, the method calls itself.
 *
 * @param ctx context to write to
 */
protected void sendResponse(ChannelHandlerContext ctx) {
    if (busy.compareAndSet(false, true)) {
        final Channel channel = ctx.channel();
        try {
            boolean wroteAnything = false;
            Response next;
            while (channel.isWritable() && (next = responseQueue.poll()) != null) {
                ctx.write(next, channel.voidPromise());
                wroteAnything = true;
            }
            if (wroteAnything) {
                ctx.flush();
            }
        } finally {
            busy.set(false);
        }
        // Responses may have been queued while the flag was held; pick them up now.
        if (channel.isWritable() && !responseQueue.isEmpty()) {
            sendResponse(ctx);
        }
    }
}
/**
 * Writes a file response.
 *
 * <p>Sends a {@code 200 OK} header with the file's length, plus content type and an
 * attachment disposition when the entity provides them, followed by the file content. When
 * the pipeline has no {@code SslHandler} the content is sent as a {@code DefaultFileRegion}
 * followed by an explicit end marker; otherwise it is sent as chunked input.
 *
 * @param responseEntity entity whose body is cast to a {@link RandomAccessFile}
 * @return the future of the last content write
 * @throws IOException if the file length cannot be read or the file name cannot be re-encoded
 */
private ChannelFuture writeFileResponse(ResponseEntity<?> responseEntity) throws IOException {
    final RandomAccessFile file = (RandomAccessFile) responseEntity.getBody();
    final long length = file.length();

    final HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    HttpUtil.setContentLength(response, length);

    if (responseEntity.getMimetype() != null && !responseEntity.getMimetype().trim().equals("")) {
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, responseEntity.getMimetype());
    }
    if (responseEntity.getFileName() != null && !responseEntity.getFileName().trim().equals("")) {
        // Re-encode the name from gb2312 bytes into an ISO8859-1 string for the header value.
        final String headerFileName =
                new String(responseEntity.getFileName().getBytes("gb2312"), "ISO8859-1");
        response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=" + headerFileName);
    }
    if (HttpUtil.isKeepAlive(HttpContextHolder.getRequest())) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    final ChannelHandlerContext ctx = HttpContextHolder.getResponse().getChannelHandlerContext();
    ctx.write(response);

    if (ctx.pipeline().get(SslHandler.class) != null) {
        // HttpChunkedInput will write the end marker (LastHttpContent) for us.
        return ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(file, 0, length, 8192)),
                ctx.newProgressivePromise());
    }

    ctx.write(new DefaultFileRegion(file.getChannel(), 0, length), ctx.newProgressivePromise());
    // Write the end marker.
    return ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
/**
 * Builds a plain-text response from the accumulated {@code buf} and writes it (without
 * flushing).
 *
 * <p>The status is {@code OK} when {@code currentObj} decoded successfully and
 * {@code BAD_REQUEST} otherwise. Cookies sent by the client are echoed back as
 * {@code Set-Cookie} headers; if the client sent none, two sample cookies are added.
 *
 * @param currentObj the HTTP object whose decoder result determines the status
 * @param ctx        context to write to
 * @return {@code true} if the connection is keep-alive according to the stored request
 */
private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) {
    // Keep-alive decides both the extra headers below and the return value.
    final boolean keepAlive = HttpUtil.isKeepAlive(request);

    final FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1,
            currentObj.decoderResult().isSuccess() ? OK : BAD_REQUEST,
            Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // 'Content-Length' is only added for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Keep-alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    final String cookieHeader = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieHeader == null) {
        // Browser sent no cookie. Add some.
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key1", "value1"));
        response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    } else {
        // Send every received cookie back to the client.
        final Set<Cookie> received = ServerCookieDecoder.STRICT.decode(cookieHeader);
        for (Cookie cookie : received) {
            response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(cookie));
        }
    }

    ctx.write(response);
    return keepAlive;
}
/**
 * Passes the outbound message on unchanged; when the message is a {@code ThriftFrame},
 * auto-read is switched back on for the channel first.
 *
 * @param context context of this handler
 * @param message outbound message
 * @param promise promise for the write
 */
@Override
public void write(ChannelHandlerContext context, Object message, ChannelPromise promise)
{
    final boolean isThriftFrame = message instanceof ThriftFrame;
    if (isThriftFrame) {
        // Always re-enable auto read.
        context.channel().config().setAutoRead(true);
    }
    context.write(message, promise);
}
/**
 * Writes every element of {@code pendings} to the context in iteration order, removing each
 * element after it has been written. Runs while holding the monitor of {@code pendings}.
 *
 * <p>Only {@code write} is called here; no flush is issued by this method.
 *
 * @param ctx context to write to
 */
private void flushPendings(ChannelHandlerContext ctx) {
    synchronized (pendings) {
        for (Iterator<Object> it = pendings.iterator(); it.hasNext(); ) {
            ctx.write(it.next());
            it.remove();
        }
    }
}
/**
 * On channel activation, writes the {@code CONNECTION_HEADER} bytes followed by
 * {@code frame}, flushing both with the second write.
 *
 * @param ctx context of this handler
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // The header is queued first; the flush happens together with the frame below.
    ctx.write(
            Unpooled.wrappedBuffer(CONNECTION_HEADER));
    ctx.writeAndFlush(
            frame);
}
/**
 * Serves a registered static resource when the request path matches an entry in
 * {@code resources}; every other message is passed on via {@code super.channelRead}.
 *
 * <p>For a matching request this handler answers with 304 (not modified), 404 (no input
 * stream) or a 200 response whose body is streamed through a {@code ChunkedWriteHandler};
 * the channel is closed once the body write completes. The request message is released on
 * every path that this handler consumes it, including early returns and exceptions.
 *
 * @param ctx context of this handler
 * @param msg inbound message
 * @throws Exception if opening the resource or forwarding the message fails
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequest) {
        HttpRequest req = (HttpRequest) msg;
        QueryStringDecoder queryDecoder = new QueryStringDecoder(req.uri());
        String requestPath = queryDecoder.path();
        URL resUrl = resources.get(requestPath);
        if (resUrl != null) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Received HTTP resource request: {} {} from channel: {}", req.method(), requestPath, ctx.channel());
                }
                URLConnection fileUrl = resUrl.openConnection();
                long lastModified = fileUrl.getLastModified();
                // check if file has been modified since last request
                if (isNotModified(req, lastModified)) {
                    sendNotModified(ctx);
                    return;
                }
                // create resource input-stream and check existence
                final InputStream is = fileUrl.getInputStream();
                if (is == null) {
                    sendError(ctx, HttpResponseStatus.NOT_FOUND);
                    return;
                }
                // Until the stream is handed to the pipeline this method owns it and must
                // close it if anything below throws.
                boolean streamHandedOff = false;
                try {
                    // create ok response
                    HttpResponse res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                    // set Content-Length header
                    HttpUtil.setContentLength(res, fileUrl.getContentLengthLong());
                    // set Content-Type header
                    setContentTypeHeader(res, fileUrl);
                    // set Date, Expires, Cache-Control and Last-Modified headers
                    setDateAndCacheHeaders(res, lastModified);
                    // write initial response header
                    ctx.write(res);
                    // Install the chunked writer only once; adding a second handler under the
                    // same name would be rejected by the pipeline.
                    if (ctx.pipeline().get("chunked-writer-handler") == null) {
                        ctx.pipeline().addBefore(ctx.name(), "chunked-writer-handler", new ChunkedWriteHandler());
                    }
                    // Use the default chunk size: the constructor's int argument is a chunk
                    // size, not a total length, and the content length may be -1 (unknown).
                    ChannelFuture writeFuture = ctx.writeAndFlush(new ChunkedStream(is));
                    streamHandedOff = true;
                    // close the channel once the content has been written
                    writeFuture.addListener(ChannelFutureListener.CLOSE);
                } finally {
                    if (!streamHandedOff) {
                        is.close();
                    }
                }
                return;
            } finally {
                // This handler consumed the request, so it is responsible for releasing it.
                ReferenceCountUtil.release(msg);
            }
        }
    }
    super.channelRead(ctx, msg);
}
/**
 * Logs the received buffer at trace level and writes a copy of it back (without flushing).
 *
 * @param ctx context of this handler
 * @param msg the received bytes
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
    LOG.trace("Channel read: {}", msg);
    // A separate copy is written, so the outbound data does not share msg's memory.
    final ByteBuf echoed = msg.copy();
    ctx.write(echoed);
}
/**
 * Prints the received message to standard output and writes the string {@code "ok"} in
 * reply (without flushing).
 *
 * <p>NOTE(review): {@code msg} is neither released nor forwarded here; confirm that no
 * reference-counted message reaches this handler.
 *
 * @param ctx context of this handler
 * @param msg inbound message
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println(msg);
    final String acknowledgement = "ok";
    ctx.write(acknowledgement);
}
/**
 * Repeatedly wraps an empty source buffer with the SSL engine and writes whatever bytes the
 * engine produces, reacting to the handshake status after every wrap.
 *
 * <p>The loop ends when a wrap produces no bytes. An {@link SSLException} is reported through
 * {@code setHandshakeFailure} and then rethrown.
 *
 * @param ctx      context used for allocation and writing
 * @param inUnwrap {@code true} when called from within an unwrap; in that case written data
 *                 only sets {@code needsFlush} and no nested {@code unwrapNonAppData} call
 *                 is made
 * @throws SSLException if wrapping fails
 */
private void wrapNonAppData(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
ByteBuf out = null;
ByteBufAllocator alloc = ctx.alloc();
try {
for (;;) {
// Allocate an output buffer lazily; it is reused across iterations until it is written.
if (out == null) {
out = allocateOutNetBuf(ctx, 0);
}
// The source is an empty buffer, so only engine-generated data can be produced.
SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
if (result.bytesProduced() > 0) {
ctx.write(out);
if (inUnwrap) {
needsFlush = true;
}
// The buffer has been passed to write(); clearing the reference keeps the finally
// block from releasing it and makes the next iteration allocate a new one.
out = null;
}
switch (result.getHandshakeStatus()) {
case FINISHED:
setHandshakeSuccess();
break;
case NEED_TASK:
runDelegatedTasks();
break;
case NEED_UNWRAP:
// Skipped when already inside an unwrap, so the two methods do not call each other
// recursively.
if (!inUnwrap) {
unwrapNonAppData(ctx);
}
break;
case NEED_WRAP:
// Nothing to do here; the loop wraps again if bytes were produced.
break;
case NOT_HANDSHAKING:
setHandshakeSuccessIfStillHandshaking();
// Workaround for TLS False Start problem reported at:
// https://github.com/netty/netty/issues/1108#issuecomment-14266970
if (!inUnwrap) {
unwrapNonAppData(ctx);
}
break;
default:
throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
}
// A wrap that produced nothing ends the loop.
if (result.bytesProduced() == 0) {
break;
}
}
} catch (SSLException e) {
setHandshakeFailure(ctx, e);
throw e;
} finally {
// Release a buffer that was allocated but never written.
if (out != null) {
out.release();
}
}
}