下面列出了io.netty.util.CharsetUtil#UTF_8 ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private FileMsg getMsgHeader(ByteBuf in) {
long sumCountPackage = in.readLong();
short fileNameLength = in.readShort();
byte[] fileNameData = new byte[fileNameLength];
in.readBytes(fileNameData);
String filename = new String(fileNameData, CharsetUtil.UTF_8);
// 2、数据源node节点名称
short srcNodeLength = in.readShort();
byte[] srcNodeData = new byte[srcNodeLength];
in.readBytes(srcNodeData);
String srcNode = new String(srcNodeData, CharsetUtil.UTF_8);
//3、数据文件md5值
short fileMd5Length = in.readShort();
byte[] fileMd5Data = new byte[fileMd5Length];
in.readBytes(fileMd5Data);
String fileMd5 = new String(fileMd5Data, CharsetUtil.UTF_8);
FileMsg msg = new FileMsg();
msg.setSumCountPackage(sumCountPackage);
msg.setFileName(filename);
msg.setSrcNode(srcNode);
msg.setFileMd5(fileMd5);
return msg;
}
/**
* 获取请求参数的Map
* @param request http请求
* @return 参数map
*/
public static Map<String, List<String>> getParameterMap(HttpRequest request){
Map<String, List<String>> paramMap = new HashMap<>();
HttpMethod method = request.method();
if(HttpMethod.GET.equals(method)){
String uri = request.uri();
QueryStringDecoder queryDecoder = new QueryStringDecoder(uri, CharsetUtil.UTF_8);
paramMap = queryDecoder.parameters();
}else if(HttpMethod.POST.equals(method)){
FullHttpRequest fullRequest = (FullHttpRequest) request;
paramMap = getPostParamMap(fullRequest);
}
return paramMap;
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final ByteBuf byteBuf = (ByteBuf) msg;
final byte[] bytes = ByteBufUtil.getBytes(byteBuf);
final String str = new String(bytes, CharsetUtil.UTF_8);
strFuture.set(str);
}
@Test
public void testMultiFileUploadInHtml5Mode() throws Exception {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, "http://localhost");
DefaultHttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(factory,
request, true, CharsetUtil.UTF_8, EncoderMode.HTML5);
File file1 = new File(getClass().getResource("/file-01.txt").toURI());
encoder.addBodyAttribute("foo", "bar");
encoder.addBodyFileUpload("quux", file1, "text/plain", false);
String multipartDataBoundary = encoder.multipartDataBoundary;
String content = getRequestBody(encoder);
String expected = "--" + multipartDataBoundary + "\r\n" +
CONTENT_DISPOSITION + ": form-data; name=\"foo\"" + "\r\n" +
CONTENT_LENGTH + ": 3" + "\r\n" +
CONTENT_TYPE + ": text/plain; charset=UTF-8" + "\r\n" +
"\r\n" +
"bar" +
"\r\n" +
"--" + multipartDataBoundary + "\r\n" +
CONTENT_DISPOSITION + ": form-data; name=\"quux\"; filename=\"file-01.txt\"" + "\r\n" +
CONTENT_LENGTH + ": " + file1.length() + "\r\n" +
CONTENT_TYPE + ": text/plain" + "\r\n" +
CONTENT_TRANSFER_ENCODING + ": binary" + "\r\n" +
"\r\n" +
"File 01" + StringUtil.NEWLINE +
"\r\n" +
"--" + multipartDataBoundary + "--" + "\r\n";
assertEquals(expected, content);
}
private String decode(final String encodedCredentials) {
try {
return new String(Base64.getDecoder().decode(encodedCredentials), CharsetUtil.UTF_8);
} catch (final Exception e) {
logger.error("Error decoding credentials " + encodedCredentials, e);
return null;
}
}
@Test
public void uber_constructor_for_full_response_sets_fields_as_expected() {
// given
String content = UUID.randomUUID().toString();
int httpStatusCode = 200;
HttpHeaders headers = new DefaultHttpHeaders();
String mimeType = "text/text";
Charset contentCharset = CharsetUtil.UTF_8;
Set<Cookie> cookies = Sets.newHashSet(new DefaultCookie("key1", "val1"), new DefaultCookie("key2", "val2"));
boolean preventCompressedResponse = true;
// when
FullResponseInfo<String> responseInfo = new FullResponseInfo<>(content, httpStatusCode, headers, mimeType, contentCharset, cookies, preventCompressedResponse);
// then
assertThat(responseInfo.getContentForFullResponse(), is(content));
assertThat(responseInfo.getHttpStatusCode(), is(httpStatusCode));
assertThat(responseInfo.getHeaders(), is(headers));
assertThat(responseInfo.getDesiredContentWriterMimeType(), is(mimeType));
assertThat(responseInfo.getDesiredContentWriterEncoding(), is(contentCharset));
assertThat(responseInfo.getCookies(), is(cookies));
assertThat(responseInfo.getUncompressedRawContentLength(), nullValue());
assertThat(responseInfo.getFinalContentLength(), nullValue());
assertThat(responseInfo.isPreventCompressedOutput(), is(preventCompressedResponse));
assertThat(responseInfo.isChunkedResponse(), is(false));
assertThat(responseInfo.isResponseSendingStarted(), is(false));
assertThat(responseInfo.isResponseSendingLastChunkSent(), is(false));
}
public static Configuration createConfiguration(final String host, final int port, final Option<String> database, final String userName, final Option<String> password,
final long connectTimeoutMilliSeconds, final long queryTimeoutMilliSeconds) {
final Option<Duration> queryTimeout = queryTimeoutMilliSeconds == -1 ?
Option.<Duration>apply(null) :
Option.<Duration>apply(Duration.apply(queryTimeoutMilliSeconds, TimeUnit.MILLISECONDS));
return new Configuration(userName, host, port, password, database, CharsetUtil.UTF_8, MAXIMUM_MESSAGE_SIZE,
PooledByteBufAllocator.DEFAULT, Duration.apply(connectTimeoutMilliSeconds, TimeUnit.MILLISECONDS), Duration.apply(4, TimeUnit.SECONDS),
queryTimeout);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
Integer streamId = msg.headers()
.getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId == null) {
logger.error("HttpResponseHandler unexpected message received: " + msg);
return;
}
MapValues value = streamidMap.get(streamId);
if (value == null) {
logger.error("Message received for unknown stream id " + streamId);
ctx.close();
} else {
ByteBuf content = msg.content();
if (content.isReadable()) {
int contentLength = content.readableBytes();
byte[] arr = new byte[contentLength];
content.readBytes(arr);
String response = new String(arr, 0, contentLength, CharsetUtil.UTF_8);
logger.info("Response from Server: "+ (response));
value.setResponse(response);
}
value.getPromise()
.setSuccess();
}
}
private String dotStuffUsingByteBuf(String testString, boolean atStartOfLine, boolean appendCRLF) {
ByteBuf sourceBuffer = ALLOCATOR.buffer();
sourceBuffer.writeBytes(testString.getBytes(StandardCharsets.UTF_8));
byte[] previousBytes = atStartOfLine ? null : new byte[] { 'x', 'y' };
ByteBuf destBuffer = DotStuffing.createDotStuffedBuffer(ALLOCATOR, sourceBuffer, previousBytes,
appendCRLF ? MessageTermination.ADD_CRLF : MessageTermination.DO_NOT_TERMINATE);
byte[] bytes = new byte[destBuffer.readableBytes()];
destBuffer.getBytes(0, bytes);
return new String(bytes, CharsetUtil.UTF_8);
}
private static String getOpeningText() {
final InputStream inputStream = NettyServer.class.getResourceAsStream("/litany-against-fear.txt");
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, CharsetUtil.UTF_8))) {
String line;
final StringBuilder buffer = new StringBuilder();
while ((line = reader.readLine()) != null) {
buffer.append(line).append("\n");
}
return buffer.toString();
} catch (final IOException e) {
logger.warn("can't read opening text resource");
return "";
}
}
/**
* processConnect
* @param channel
* @param msg
*/
public void processConnect(Channel channel, MqttConnectMessage msg) {
// 消息解码器出现异常
if (msg.decoderResult().isFailure()) {
Throwable cause = msg.decoderResult().cause();
writeBackConnect(channel, ProtocolUtil.connectReturnCodeForException(cause), false, true);
return;
}
String deviceid = msg.payload().clientIdentifier();
// clientId为空或null的情况, 这里要求客户端必须提供clientId, 不管cleanSession是否为1, 此处没有参考标准协议实现
if (deviceid == null || deviceid.trim().length() == 0) {
writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false, true);
return;
}
// 用户名和密码验证, 这里要求客户端连接时必须提供用户名和密码, 不管是否设置用户名标志和密码标志为1, 此处没有参考标准协议实现
String username = msg.payload().userName();
String password = msg.payload().passwordInBytes() == null ? null
: new String(msg.payload().passwordInBytes(), CharsetUtil.UTF_8);
if (!authService.checkValid(deviceid, username, password)) {
writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false, true);
return;
}
boolean isCleanSession = msg.variableHeader().isCleanSession();
// 如果会话中已存储这个新连接的clientId, 就关闭之前该clientId的连接
if (sessionService.containsKey(deviceid)) {
if (isCleanSession) {
sessionService.remove(deviceid);
topicProcess.removeByCleanSession(deviceid);
procedureProcess.removeByCleanSession(deviceid);
consumerProcess.removeByCleanSession(deviceid);
}
sessionService.closeSession(deviceid);
}
// 处理遗嘱信息
MqttSession sessionStore = new MqttSession(deviceid, channel, isCleanSession, null);
if (msg.variableHeader().isWillFlag()) {
MqttPublishMessage willMessage = ProtocolUtil.publishMessage(msg.payload().willTopic(), false,
msg.variableHeader().willQos(), msg.variableHeader().isWillRetain(), 0,
msg.payload().willMessageInBytes());
sessionStore.setWillMessage(willMessage);
}
// 处理连接心跳包
int idelTimes = msg.variableHeader().keepAliveTimeSeconds();
if (idelTimes <= 0) {
idelTimes = 60;
}
if (idelTimes> 0) {
String idelStr = NettyConstant.HANDLER_NAME_HEARTCHECK;
if (channel.pipeline().names().contains(idelStr)) {
channel.pipeline().remove(idelStr);
}
channel.pipeline().addFirst(idelStr,
new IdleStateHandler(0, 0, Math.round(idelTimes * 1.5f)));
}
// 至此存储会话信息及返回接受客户端连接
sessionService.put(deviceid, sessionStore);
channel.attr(NettyConstant.CLIENTID_KEY).set(deviceid);
Boolean sessionPresent = sessionService.containsKey(deviceid) && !isCleanSession;
writeBackConnect(channel, MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent, false);
NettyLog.debug("CONNECT - clientId: {}, cleanSession: {}", deviceid, isCleanSession);
// 如果cleanSession为0, 需要重发同一clientId存储的未完成的QoS1和QoS2的DUP消息
if (!isCleanSession) {
this.consumerProcess.processHistoryPub(channel);
this.procedureProcess.processHistoryPubRel(channel);
}
}
/**
* @deprecated use {@link MqttConnectPayload#willMessageInBytes()} instead
*/
@Deprecated
public String willMessage() {
return willMessage == null ? null : new String(willMessage, CharsetUtil.UTF_8);
}
/**
* @deprecated use {@link MqttConnectPayload#passwordInBytes()} instead
*/
@Deprecated
public String password() {
return password == null ? null : new String(password, CharsetUtil.UTF_8);
}
/**
* Creates a new instance with the current system line separator and UTF-8 charset encoding.使用当前系统行分隔符和UTF-8字符集编码创建一个新实例。
*/
public LineEncoder() {
this(LineSeparator.DEFAULT, CharsetUtil.UTF_8);
}
public StringCodec() {
this(CharsetUtil.UTF_8);
}
/**
* Handles the connect packet. See spec for details on each of parameters.
*/
void connect(String cId,
String username,
byte[] passwordInBytes,
boolean will,
byte[] willMessage,
String willTopic,
boolean willRetain,
int willQosLevel,
boolean cleanSession) throws Exception {
String clientId = validateClientId(cId, cleanSession);
if (clientId == null) {
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
session.getProtocolHandler().disconnect(true);
return;
}
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8);
session.getConnection().setClientID(clientId);
ServerSessionImpl serverSession = createServerSession(username, password);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);
if (cleanSession) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean();
session.setClean(true);
}
if (will) {
isWill = true;
this.willMessage = ByteBufAllocator.DEFAULT.buffer(willMessage.length);
this.willMessage.writeBytes(willMessage);
this.willQoSLevel = willQosLevel;
this.willRetain = willRetain;
this.willTopic = willTopic;
}
session.getConnection().setConnected(true);
session.start();
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
}
}
@Override
public void handleRequest(Request request, Output output) {
throw new RuntimeException(new String(request.getRequestData(), CharsetUtil.UTF_8));
}
public static Charset defaultUtf8() {
return CharsetUtil.UTF_8;
}
@Override
public String toString() {
return new String(name, CharsetUtil.UTF_8);
}
private static void assertQueryString(String expected, String actual) {
QueryStringDecoder ed = new QueryStringDecoder(expected, CharsetUtil.UTF_8);
QueryStringDecoder ad = new QueryStringDecoder(actual, CharsetUtil.UTF_8);
Assert.assertEquals(ed.path(), ad.path());
Assert.assertEquals(ed.parameters(), ad.parameters());
}