下面列出了怎么用 io.netty.handler.codec.http.multipart.HttpPostRequestEncoder 的API类实例代码及写法,或者点击链接到github查看源代码。
public boolean predict(
String modelName, DefaultFullHttpRequest req, HttpPostRequestEncoder requestEncoder)
throws InterruptedException, HttpPostRequestEncoder.ErrorDataEncoderException,
IOException {
Channel channel = connect(bootstrap, inferencePort);
req.setUri("/predictions/" + URLEncoder.encode(modelName, StandardCharsets.UTF_8.name()));
channel.writeAndFlush(requestEncoder.finalizeRequest());
if (requestEncoder.isChunked()) {
channel.writeAndFlush(requestEncoder).sync();
}
channel.closeFuture().sync();
int statusCode = handler.getStatusCode();
String ret = handler.getContent();
if (statusCode == 200) {
logger.info("predict: {} success.", modelName);
logger.trace(ret);
return true;
}
logger.warn("predict: {} failed: {}", modelName, ret);
return false;
}
private void runTest(HttpClient client, ModelInfo info, Logger logger)
throws HttpPostRequestEncoder.ErrorDataEncoderException, InterruptedException,
IOException {
String modelName = info.getModelName();
String url = info.getUrl();
int type = info.getType();
logger.info("Testing model: {}={}", modelName, url);
if (!client.registerModel(modelName, url)) {
failedModels.add(url);
return;
}
try {
if (!predict(client, type, modelName)) {
failedModels.add(url);
}
} finally {
if (!client.unregisterModel(modelName)) {
failedModels.add(url);
}
}
}
private void testInvocationsMultipart(Channel channel)
throws InterruptedException, HttpPostRequestEncoder.ErrorDataEncoderException,
IOException {
result = null;
latch = new CountDownLatch(1);
DefaultFullHttpRequest req =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/invocations");
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
encoder.addBodyAttribute("model_name", "noop_v0.1");
MemoryFileUpload body =
new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 4);
body.setContent(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8));
encoder.addBodyHttpData(body);
channel.writeAndFlush(encoder.finalizeRequest());
if (encoder.isChunked()) {
channel.writeAndFlush(encoder).sync();
}
latch.await();
Assert.assertEquals(result, "OK");
}
private void testModelsInvokeMultipart(Channel channel)
throws InterruptedException, HttpPostRequestEncoder.ErrorDataEncoderException,
IOException {
result = null;
latch = new CountDownLatch(1);
DefaultFullHttpRequest req =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, "/models/noop/invoke");
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
MemoryFileUpload body =
new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 4);
body.setContent(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8));
encoder.addBodyHttpData(body);
channel.writeAndFlush(encoder.finalizeRequest());
if (encoder.isChunked()) {
channel.writeAndFlush(encoder).sync();
}
latch.await();
Assert.assertEquals(result, "OK");
}
/**
* Tests multipart POST and verifies it via GET operations.
* @throws Exception
*/
@Test
public void multipartPostGetHeadUpdateDeleteUndeleteTest() throws Exception {
Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount();
Container refContainer = refAccount.getContainerById(Container.DEFAULT_PUBLIC_CONTAINER_ID);
doPostGetHeadUpdateDeleteUndeleteTest(0, refAccount, refContainer, refAccount.getName(),
!refContainer.isCacheable(), refAccount.getName(), refContainer.getName(), true);
doPostGetHeadUpdateDeleteUndeleteTest(FRONTEND_CONFIG.chunkedGetResponseThresholdInBytes * 3, refAccount,
refContainer, refAccount.getName(), !refContainer.isCacheable(), refAccount.getName(), refContainer.getName(),
true);
// failure case
// size of content being POSTed is higher than what is allowed via multipart/form-data
long maxAllowedSizeBytes = new NettyConfig(FRONTEND_VERIFIABLE_PROPS).nettyMultipartPostMaxSizeBytes;
ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes((int) maxAllowedSizeBytes + 1));
HttpHeaders headers = new DefaultHttpHeaders();
setAmbryHeadersForPut(headers, TTL_SECS, !refContainer.isCacheable(), refAccount.getName(),
"application/octet-stream", null, refAccount.getName(), refContainer.getName());
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/", headers);
HttpPostRequestEncoder encoder = createEncoder(httpRequest, content, ByteBuffer.allocate(0));
ResponseParts responseParts = nettyClient.sendRequest(encoder.finalizeRequest(), encoder, null).get();
HttpResponse response = getHttpResponse(responseParts);
assertEquals("Unexpected response status", HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, response.status());
assertTrue("No Date header", response.headers().getTimeMillis(HttpHeaderNames.DATE, -1) != -1);
assertFalse("Channel should not be active", HttpUtil.isKeepAlive(response));
}
/**
* Creates a {@link NettyMultipartRequest} with the given {@code headers} and {@code parts}.
* @param headers the {@link HttpHeaders} that need to be added to the request.
* @param parts the files that will form the parts of the request.
* @return a {@link NettyMultipartRequest} containing all the {@code headers} and {@code parts}.
* @throws Exception
*/
private NettyMultipartRequest createRequest(HttpHeaders headers, InMemoryFile[] parts) throws Exception {
HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
if (headers != null) {
httpRequest.headers().set(headers);
}
HttpPostRequestEncoder encoder = createEncoder(httpRequest, parts);
NettyMultipartRequest request =
new NettyMultipartRequest(encoder.finalizeRequest(), new MockChannel(), NETTY_METRICS, Collections.emptySet(),
Long.MAX_VALUE);
assertTrue("Request channel is not open", request.isOpen());
while (!encoder.isEndOfInput()) {
// Sending null for ctx because the encoder is OK with that.
request.addContent(encoder.readChunk(PooledByteBufAllocator.DEFAULT));
}
return request;
}
/**
* Creates a {@link HttpPostRequestEncoder} that encodes the given {@code request} and {@code parts}.
* @param request the {@link HttpRequest} containing headers and other metadata about the request.
* @param parts the {@link InMemoryFile}s that will form the parts of the request.
* @return a {@link HttpPostRequestEncoder} that can encode the {@code request} and {@code parts}.
* @throws HttpPostRequestEncoder.ErrorDataEncoderException
* @throws IOException
*/
private HttpPostRequestEncoder createEncoder(HttpRequest request, InMemoryFile[] parts)
throws HttpPostRequestEncoder.ErrorDataEncoderException, IOException {
HttpDataFactory httpDataFactory = new DefaultHttpDataFactory(false);
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(httpDataFactory, request, true);
if (parts != null) {
for (InMemoryFile part : parts) {
FileUpload fileUpload =
new MemoryFileUpload(part.name, part.name, "application/octet-stream", "", Charset.forName("UTF-8"),
part.content.remaining());
fileUpload.setContent(Unpooled.wrappedBuffer(part.content));
encoder.addBodyHttpData(fileUpload);
}
}
return encoder;
}
/**
* Tests the case where multipart upload is used.
* @throws Exception
*/
@Test
public void multipartPostTest() throws Exception {
Random random = new Random();
ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes(random.nextInt(128) + 128));
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/", null);
httpRequest.headers().set(RestUtils.Headers.SERVICE_ID, "rawBytesPostTest");
HttpPostRequestEncoder encoder = createEncoder(httpRequest, content);
HttpRequest postRequest = encoder.finalizeRequest();
List<ByteBuffer> contents = new ArrayList<ByteBuffer>();
while (!encoder.isEndOfInput()) {
// Sending null for ctx because the encoder is OK with that.
contents.add(encoder.readChunk(PooledByteBufAllocator.DEFAULT).content().nioBuffer());
}
ByteBuffer receivedContent = doPostTest(postRequest, contents);
compareContent(receivedContent, Collections.singletonList(content));
}
@Test(
alwaysRun = true,
dependsOnMethods = {"testInvocationsJson"})
public void testInvocationsMultipart()
throws InterruptedException, HttpPostRequestEncoder.ErrorDataEncoderException,
IOException {
Channel channel = TestUtils.getInferenceChannel(configManager);
TestUtils.setResult(null);
TestUtils.setLatch(new CountDownLatch(1));
DefaultFullHttpRequest req =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/invocations");
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
encoder.addBodyAttribute("model_name", "noop_v1.0");
MemoryFileUpload body =
new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 4);
body.setContent(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8));
encoder.addBodyHttpData(body);
channel.writeAndFlush(encoder.finalizeRequest());
if (encoder.isChunked()) {
channel.writeAndFlush(encoder).sync();
}
TestUtils.getLatch().await();
Assert.assertEquals(TestUtils.getResult(), "OK");
}
@Test(
alwaysRun = true,
dependsOnMethods = {"testModelsInvokeJson"})
public void testModelsInvokeMultipart()
throws InterruptedException, HttpPostRequestEncoder.ErrorDataEncoderException,
IOException {
Channel channel = TestUtils.getInferenceChannel(configManager);
TestUtils.setResult(null);
TestUtils.setLatch(new CountDownLatch(1));
DefaultFullHttpRequest req =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, "/models/noop/invoke");
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(req, true);
MemoryFileUpload body =
new MemoryFileUpload("data", "test.txt", "text/plain", null, null, 4);
body.setContent(Unpooled.copiedBuffer("test", StandardCharsets.UTF_8));
encoder.addBodyHttpData(body);
channel.writeAndFlush(encoder.finalizeRequest());
if (encoder.isChunked()) {
channel.writeAndFlush(encoder).sync();
}
TestUtils.getLatch().await();
Assert.assertEquals(TestUtils.getResult(), "OK");
}
public MultipartFormUpload(Context context,
MultipartForm parts,
boolean multipart,
HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception {
this.context = context;
this.pending = new InboundBuffer<>(context)
.handler(this::handleChunk)
.drainHandler(v -> run()).pause();
this.request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
io.netty.handler.codec.http.HttpMethod.POST,
"/");
this.encoder = new HttpPostRequestEncoder(
new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE),
request,
multipart,
HttpConstants.DEFAULT_CHARSET,
encoderMode);
for (FormDataPart formDataPart : parts) {
if (formDataPart.isAttribute()) {
encoder.addBodyAttribute(formDataPart.name(), formDataPart.value());
} else {
encoder.addBodyFileUpload(formDataPart.name(),
formDataPart.filename(), new File(formDataPart.pathname()),
formDataPart.mediaType(), formDataPart.isText());
}
}
encoder.finalizeRequest();
}
@Test
public void testFileUploadWhenFileDoesNotExist() {
HttpRequest<Buffer> builder = webClient.post("somepath");
MultipartForm form = MultipartForm.create()
.textFileUpload("file", "nonexistentFilename", "nonexistentPathname", "text/plain");
builder.sendMultipartForm(form, onFailure(err -> {
assertEquals(err.getClass(), StreamResetException.class);
assertEquals(err.getCause().getClass(), HttpPostRequestEncoder.ErrorDataEncoderException.class);
complete();
}));
await();
}
@Test
public void testFileUploads() {
HttpRequest<Buffer> builder = webClient.post("somepath");
MultipartForm form = MultipartForm.create()
.textFileUpload("file", "nonexistentFilename", "nonexistentPathname", "text/plain");
builder.sendMultipartForm(form, onFailure(err -> {
assertEquals(err.getClass(), StreamResetException.class);
assertEquals(err.getCause().getClass(), HttpPostRequestEncoder.ErrorDataEncoderException.class);
complete();
}));
await();
}
@Test
public void testSimpleAttribute(TestContext ctx) throws Exception {
Async async = ctx.async();
Buffer result = Buffer.buffer();
MultipartFormUpload upload = new MultipartFormUpload(vertx.getOrCreateContext(), MultipartForm.create().attribute("foo", "bar"), false, HttpPostRequestEncoder.EncoderMode.RFC1738);
upload.run();
upload.endHandler(v -> {
assertEquals("foo=bar", result.toString());
async.complete();
});
upload.handler(result::appendBuffer);
upload.resume();
}
private void testFileUpload(TestContext ctx, boolean paused) throws Exception {
File file = testFolder.newFile();
Files.write(file.toPath(), TestUtils.randomByteArray(32 * 1024));
String filename = file.getName();
String pathname = file.getAbsolutePath();
Async async = ctx.async();
Context context = vertx.getOrCreateContext();
context.runOnContext(v1 -> {
try {
MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().textFileUpload(
"the-file",
filename,
pathname,
"text/plain"), true, HttpPostRequestEncoder.EncoderMode.RFC1738);
List<Buffer> buffers = Collections.synchronizedList(new ArrayList<>());
AtomicInteger end = new AtomicInteger();
upload.endHandler(v2 -> {
assertEquals(0, end.getAndIncrement());
ctx.assertTrue(buffers.size() > 0);
async.complete();
});
upload.handler(buffer -> {
assertEquals(0, end.get());
buffers.add(buffer);
});
if (!paused) {
upload.resume();
}
upload.run();
if (paused) {
context.runOnContext(v3 -> upload.resume());
}
} catch (Exception e) {
ctx.fail(e);
throw new AssertionError(e);
}
});
}
/**
* Gets the encoded size for a set of bytes
* @param bytes the bytes to encode.
* @return the encoded size
* @throws Exception
*/
private int getEncodedSize(byte[] bytes) throws Exception {
int encodedSize = 0;
InMemoryFile[] files = {new InMemoryFile(RestUtils.MultipartPost.BLOB_PART, ByteBuffer.wrap(bytes))};
HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
HttpPostRequestEncoder encoder = createEncoder(httpRequest, files);
encoder.finalizeRequest();
while (!encoder.isEndOfInput()) {
HttpContent httpContent = encoder.readChunk(PooledByteBufAllocator.DEFAULT);
encodedSize += httpContent.content().readableBytes();
}
return encodedSize;
}
/**
* Creates a {@link HttpPostRequestEncoder} that encodes the given {@code request} and {@code blobContent}.
* @param request the {@link HttpRequest} containing headers and other metadata about the request.
* @param blobContent the {@link ByteBuffer} that represents the content of the blob.
* @return a {@link HttpPostRequestEncoder} that can encode the {@code request} and {@code blobContent}.
* @throws HttpPostRequestEncoder.ErrorDataEncoderException
* @throws IOException
*/
private HttpPostRequestEncoder createEncoder(HttpRequest request, ByteBuffer blobContent)
throws HttpPostRequestEncoder.ErrorDataEncoderException, IOException {
HttpDataFactory httpDataFactory = new DefaultHttpDataFactory(false);
HttpPostRequestEncoder encoder = new HttpPostRequestEncoder(httpDataFactory, request, true);
FileUpload fileUpload = new MemoryFileUpload(RestUtils.MultipartPost.BLOB_PART, RestUtils.MultipartPost.BLOB_PART,
"application/octet-stream", "", Charset.forName("UTF-8"), blobContent.remaining());
fileUpload.setContent(Unpooled.wrappedBuffer(blobContent));
encoder.addBodyHttpData(fileUpload);
return encoder;
}
/**
* Standard post without multipart but already support on Factory (memory management)
*
* @return the list of HttpData object (attribute and file) to be reused on next post
*/
private static List<InterfaceHttpData> formpost(
Bootstrap bootstrap,
String host, int port, URI uriSimple, File file, HttpDataFactory factory,
List<Entry<String, String>> headers) throws Exception {
// XXX /formpost
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(SocketUtils.socketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriSimple.toASCIIString());
// Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder =
new HttpPostRequestEncoder(factory, request, false); // false => not multipart
// it is legal to add directly header or cookie into the request until finalize
for (Entry<String, String> entry : headers) {
request.headers().set(entry.getKey(), entry.getValue());
}
// add Form attribute
bodyRequestEncoder.addBodyAttribute("getform", "POST");
bodyRequestEncoder.addBodyAttribute("info", "first value");
bodyRequestEncoder.addBodyAttribute("secondinfo", "secondvalue ���&");
bodyRequestEncoder.addBodyAttribute("thirdinfo", textArea);
bodyRequestEncoder.addBodyAttribute("fourthinfo", textAreaLong);
bodyRequestEncoder.addBodyFileUpload("myfile", file, "application/x-zip-compressed", false);
// finalize request
request = bodyRequestEncoder.finalizeRequest();
// Create the bodylist to be reused on the last version with Multipart support
List<InterfaceHttpData> bodylist = bodyRequestEncoder.getBodyListAttributes();
// send request
channel.write(request);
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) { // could do either request.isChunked()
// either do it through ChunkedWriteHandler
channel.write(bodyRequestEncoder);
}
channel.flush();
// Do not clear here since we will reuse the InterfaceHttpData on the next request
// for the example (limit action on client side). Take this as a broadcast of the same
// request on both Post actions.
//
// On standard program, it is clearly recommended to clean all files after each request
// bodyRequestEncoder.cleanFiles();
// Wait for the server to close the connection.
channel.closeFuture().sync();
return bodylist;
}
/**
* Multipart example
*/
private static void formpostmultipart(
Bootstrap bootstrap, String host, int port, URI uriFile, HttpDataFactory factory,
Iterable<Entry<String, String>> headers, List<InterfaceHttpData> bodylist) throws Exception {
// XXX /formpostmultipart
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(SocketUtils.socketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriFile.toASCIIString());
// Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder =
new HttpPostRequestEncoder(factory, request, true); // true => multipart
// it is legal to add directly header or cookie into the request until finalize
for (Entry<String, String> entry : headers) {
request.headers().set(entry.getKey(), entry.getValue());
}
// add Form attribute from previous request in formpost()
bodyRequestEncoder.setBodyHttpDatas(bodylist);
// finalize request
bodyRequestEncoder.finalizeRequest();
// send request
channel.write(request);
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) {
channel.write(bodyRequestEncoder);
}
channel.flush();
// Now no more use of file representation (and list of HttpData)
bodyRequestEncoder.cleanFiles();
// Wait for the server to close the connection.
channel.closeFuture().sync();
}
public RequestHolder(Route route, URL url, HttpRequest request, HttpPostRequestEncoder bodyRequestEncoder) {
this.route = route;
this.url = url;
this.request = request;
this.bodyRequestEncoder = bodyRequestEncoder;
}
@Test
public void test()
throws InterruptedException, HttpPostRequestEncoder.ErrorDataEncoderException,
IOException, NoSuchFieldException, IllegalAccessException {
Channel channel = null;
Channel managementChannel = null;
for (int i = 0; i < 5; ++i) {
channel = connect(false);
if (channel != null) {
break;
}
Thread.sleep(100);
}
for (int i = 0; i < 5; ++i) {
managementChannel = connect(true);
if (managementChannel != null) {
break;
}
Thread.sleep(100);
}
Assert.assertNotNull(channel, "Failed to connect to inference port.");
Assert.assertNotNull(managementChannel, "Failed to connect to management port.");
testPing(channel);
testRoot(channel, listInferenceApisResult);
testRoot(managementChannel, listManagementApisResult);
testApiDescription(channel, listInferenceApisResult);
testDescribeApi(channel);
testUnregisterModel(managementChannel);
testLoadModel(managementChannel);
testSyncScaleModel(managementChannel);
testScaleModel(managementChannel);
testListModels(managementChannel);
testDescribeModel(managementChannel);
testLoadModelWithInitialWorkers(managementChannel);
testLoadModelWithInitialWorkersWithJSONReqBody(managementChannel);
testPredictions(channel);
testPredictionsBinary(channel);
testPredictionsJson(channel);
testInvocationsJson(channel);
testInvocationsMultipart(channel);
testModelsInvokeJson(channel);
testModelsInvokeMultipart(channel);
testLegacyPredict(channel);
testPredictionsInvalidRequestSize(channel);
testPredictionsValidRequestSize(channel);
testPredictionsDecodeRequest(channel, managementChannel);
testPredictionsDoNotDecodeRequest(channel, managementChannel);
testPredictionsModifyResponseHeader(channel, managementChannel);
testPredictionsNoManifest(channel, managementChannel);
testModelRegisterWithDefaultWorkers(managementChannel);
testLogging(channel, managementChannel);
testLoggingUnload(channel, managementChannel);
testLoadingMemoryError();
testPredictionMemoryError();
testMetricManager();
testErrorBatch();
channel.close();
managementChannel.close();
// negative test case, channel will be closed by server
testInvalidRootRequest();
testInvalidInferenceUri();
testInvalidPredictionsUri();
testInvalidDescribeModel();
testPredictionsModelNotFound();
testInvalidManagementUri();
testInvalidModelsMethod();
testInvalidModelMethod();
testDescribeModelNotFound();
testRegisterModelMissingUrl();
testRegisterModelInvalidRuntime();
testRegisterModelNotFound();
testRegisterModelConflict();
testRegisterModelMalformedUrl();
testRegisterModelConnectionFailed();
testRegisterModelHttpError();
testRegisterModelInvalidPath();
testScaleModelNotFound();
testScaleModelFailure();
testUnregisterModelNotFound();
testUnregisterModelTimeout();
testInvalidModel();
}
/**
* Standard post without multipart but already support on Factory (memory management)
*
* @return the list of HttpData object (attribute and file) to be reused on next post
*/
private static List<InterfaceHttpData> formpost(
Bootstrap bootstrap,
String host, int port, URI uriSimple, File file, HttpDataFactory factory,
List<Entry<String, String>> headers) throws Exception {
// XXX /formpost
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(SocketUtils.socketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriSimple.toASCIIString());
// Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder =
new HttpPostRequestEncoder(factory, request, false); // false => not multipart
// it is legal to add directly header or cookie into the request until finalize
for (Entry<String, String> entry : headers) {
request.headers().set(entry.getKey(), entry.getValue());
}
// add Form attribute
bodyRequestEncoder.addBodyAttribute("getform", "POST");
bodyRequestEncoder.addBodyAttribute("info", "first value");
bodyRequestEncoder.addBodyAttribute("secondinfo", "secondvalue ���&");
bodyRequestEncoder.addBodyAttribute("thirdinfo", textArea);
bodyRequestEncoder.addBodyAttribute("fourthinfo", textAreaLong);
bodyRequestEncoder.addBodyFileUpload("myfile", file, "application/x-zip-compressed", false);
// finalize request
request = bodyRequestEncoder.finalizeRequest();
// Create the bodylist to be reused on the last version with Multipart support
List<InterfaceHttpData> bodylist = bodyRequestEncoder.getBodyListAttributes();
// send request
channel.write(request);
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) { // could do either request.isChunked()
// either do it through ChunkedWriteHandler
channel.write(bodyRequestEncoder);
}
channel.flush();
// Do not clear here since we will reuse the InterfaceHttpData on the next request
// for the example (limit action on client side). Take this as a broadcast of the same
// request on both Post actions.
//
// On standard program, it is clearly recommended to clean all files after each request
// bodyRequestEncoder.cleanFiles();
// Wait for the server to close the connection.
channel.closeFuture().sync();
return bodylist;
}
/**
* Multipart example
*/
private static void formpostmultipart(
Bootstrap bootstrap, String host, int port, URI uriFile, HttpDataFactory factory,
Iterable<Entry<String, String>> headers, List<InterfaceHttpData> bodylist) throws Exception {
// XXX /formpostmultipart
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(SocketUtils.socketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriFile.toASCIIString());
// Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder =
new HttpPostRequestEncoder(factory, request, true); // true => multipart
// it is legal to add directly header or cookie into the request until finalize
for (Entry<String, String> entry : headers) {
request.headers().set(entry.getKey(), entry.getValue());
}
// add Form attribute from previous request in formpost()
bodyRequestEncoder.setBodyHttpDatas(bodylist);
// finalize request
bodyRequestEncoder.finalizeRequest();
// send request
channel.write(request);
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) {
channel.write(bodyRequestEncoder);
}
channel.flush();
// Now no more use of file representation (and list of HttpData)
bodyRequestEncoder.cleanFiles();
// Wait for the server to close the connection.
channel.closeFuture().sync();
}
@SuppressWarnings("FutureReturnValueIgnored")
void _subscribe(CoreSubscriber<? super Void> s) {
HttpDataFactory df = DEFAULT_FACTORY;
try {
HttpClientFormEncoder encoder = new HttpClientFormEncoder(df,
parent.nettyRequest,
false,
HttpConstants.DEFAULT_CHARSET,
HttpPostRequestEncoder.EncoderMode.RFC1738);
formCallback.accept(parent, encoder);
encoder = encoder.applyChanges(parent.nettyRequest);
df = encoder.newFactory;
if (!encoder.isMultipart()) {
parent.requestHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
}
// Returned value is deliberately ignored
parent.addHandlerFirst(NettyPipeline.ChunkedWriter, new ChunkedWriteHandler());
boolean chunked = HttpUtil.isTransferEncodingChunked(parent.nettyRequest);
HttpRequest r = encoder.finalizeRequest();
if (!chunked) {
HttpUtil.setTransferEncodingChunked(r, false);
HttpUtil.setContentLength(r, encoder.length());
}
ChannelFuture f = parent.channel()
.writeAndFlush(r);
Flux<Long> tail = encoder.progressFlux.onBackpressureLatest();
if (encoder.cleanOnTerminate) {
tail = tail.doOnCancel(encoder)
.doAfterTerminate(encoder);
}
if (encoder.isChunked()) {
if (progressCallback != null) {
progressCallback.accept(tail);
}
//"FutureReturnValueIgnored" this is deliberate
parent.channel()
.writeAndFlush(encoder);
}
else {
if (progressCallback != null) {
progressCallback.accept(FutureMono.from(f)
.cast(Long.class)
.switchIfEmpty(Mono.just(encoder.length()))
.flux());
}
}
s.onComplete();
}
catch (Throwable e) {
Exceptions.throwIfJvmFatal(e);
df.cleanRequestHttpData(parent.nettyRequest);
s.onError(Exceptions.unwrap(e));
}
}
/**
* Standard post without multipart but already support on Factory (memory management)
*
* @return the list of HttpData object (attribute and file) to be reused on next post
*/
private static List<InterfaceHttpData> formpost(
Bootstrap bootstrap,
String host, int port, URI uriSimple, File file, HttpDataFactory factory,
List<Entry<String, String>> headers) throws Exception {
// XXX /formpost
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriSimple.toASCIIString());
// Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder =
new HttpPostRequestEncoder(factory, request, false); // false => not multipart
// it is legal to add directly header or cookie into the request until finalize
for (Entry<String, String> entry : headers) {
request.headers().set(entry.getKey(), entry.getValue());
}
// add Form attribute
bodyRequestEncoder.addBodyAttribute("getform", "POST");
bodyRequestEncoder.addBodyAttribute("info", "first value");
bodyRequestEncoder.addBodyAttribute("secondinfo", "secondvalue ���&");
bodyRequestEncoder.addBodyAttribute("thirdinfo", textArea);
bodyRequestEncoder.addBodyAttribute("fourthinfo", textAreaLong);
bodyRequestEncoder.addBodyFileUpload("myfile", file, "application/x-zip-compressed", false);
// finalize request
request = bodyRequestEncoder.finalizeRequest();
// Create the bodylist to be reused on the last version with Multipart support
List<InterfaceHttpData> bodylist = bodyRequestEncoder.getBodyListAttributes();
// send request
channel.write(request);
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) { // could do either request.isChunked()
// either do it through ChunkedWriteHandler
channel.write(bodyRequestEncoder);
}
channel.flush();
// Do not clear here since we will reuse the InterfaceHttpData on the next request
// for the example (limit action on client side). Take this as a broadcast of the same
// request on both Post actions.
//
// On standard program, it is clearly recommended to clean all files after each request
// bodyRequestEncoder.cleanFiles();
// Wait for the server to close the connection.
channel.closeFuture().sync();
return bodylist;
}
/**
* Multipart example
*/
private static void formpostmultipart(
Bootstrap bootstrap, String host, int port, URI uriFile, HttpDataFactory factory,
List<Entry<String, String>> headers, List<InterfaceHttpData> bodylist) throws Exception {
// XXX /formpostmultipart
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.sync().channel();
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriFile.toASCIIString());
// Use the PostBody encoder
HttpPostRequestEncoder bodyRequestEncoder =
new HttpPostRequestEncoder(factory, request, true); // true => multipart
// it is legal to add directly header or cookie into the request until finalize
for (Entry<String, String> entry : headers) {
request.headers().set(entry.getKey(), entry.getValue());
}
// add Form attribute from previous request in formpost()
bodyRequestEncoder.setBodyHttpDatas(bodylist);
// finalize request
bodyRequestEncoder.finalizeRequest();
// send request
channel.write(request);
// test if request was chunked and if so, finish the write
if (bodyRequestEncoder.isChunked()) {
channel.write(bodyRequestEncoder);
}
channel.flush();
// Now no more use of file representation (and list of HttpData)
bodyRequestEncoder.cleanFiles();
// Wait for the server to close the connection.
channel.closeFuture().sync();
}
/**
* Tests to make sure the max allowed size for multipart requests is enforced.
* @throws Exception
*/
// Disabling test because the encoded size is different at different times and it is hard to predict. Need a better
// test
//@Test
public void sizeLimitationTest() throws Exception {
int blobPartSize = 1024;
byte[] bytes = TestUtils.getRandomBytes(blobPartSize);
int encodedSize = getEncodedSize(bytes);
long[] maxSizesAllowed = {encodedSize + 1, encodedSize, encodedSize - 1, 0};
for (long maxSizeAllowed : maxSizesAllowed) {
InMemoryFile[] files = {new InMemoryFile(RestUtils.MultipartPost.BLOB_PART, ByteBuffer.wrap(bytes))};
HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
HttpPostRequestEncoder encoder = createEncoder(httpRequest, files);
NettyMultipartRequest request =
new NettyMultipartRequest(encoder.finalizeRequest(), new MockChannel(), NETTY_METRICS, Collections.emptySet(),
maxSizeAllowed);
assertTrue("Request channel is not open", request.isOpen());
long currentSizeAdded = 0;
boolean failedToAdd = false;
while (!encoder.isEndOfInput()) {
HttpContent httpContent = encoder.readChunk(PooledByteBufAllocator.DEFAULT);
int readableBytes = httpContent.content().readableBytes();
if (currentSizeAdded + readableBytes <= maxSizeAllowed) {
request.addContent(httpContent);
} else {
assertTrue("Max size [" + maxSizeAllowed + "] must be lesser than content size: " + encodedSize,
maxSizeAllowed < encodedSize);
try {
request.addContent(httpContent);
fail("Should have failed to add content of size [" + encodedSize
+ "] because it is over the max size allowed: " + maxSizeAllowed);
} catch (RestServiceException e) {
failedToAdd = true;
assertEquals("Unexpected RestServiceErrorCode", RestServiceErrorCode.RequestTooLarge, e.getErrorCode());
break;
}
}
currentSizeAdded += readableBytes;
}
assertEquals(
"Success state not as expected. maxSizeAllowed=[" + maxSizeAllowed + "], encodedSize expected=[" + encodedSize
+ "], actual size added=[" + currentSizeAdded + "]", maxSizeAllowed < encodedSize, failedToAdd);
}
}
/**
* Posts a blob with the given {@code headers} and {@code content}.
* @param headers the headers required.
* @param content the content of the blob.
* @param usermetadata the {@link ByteBuffer} that represents user metadata
* @return the blob ID of the blob.
* @throws Exception
*/
private String multipartPostBlobAndVerify(HttpHeaders headers, ByteBuffer content, ByteBuffer usermetadata)
throws Exception {
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/", headers);
HttpPostRequestEncoder encoder = createEncoder(httpRequest, content, usermetadata);
ResponseParts responseParts = nettyClient.sendRequest(encoder.finalizeRequest(), encoder, null).get();
return verifyPostAndReturnBlobId(responseParts, content.capacity());
}
/**
* Set Form encoding
*
* @param mode the encoding mode for this form encoding
* @return this builder
*/
HttpClientForm encoding(HttpPostRequestEncoder.EncoderMode mode);