下面列出了com.amazonaws.services.s3.model.SelectObjectContentEvent#com.amazonaws.services.s3.model.SelectObjectContentEventVisitor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Runs an S3 Select SQL query over a non-compressed CSV file saved in an S3 object and returns
 * the selected records as one trimmed string.
 *
 * <p>The record bytes are decoded explicitly as UTF-8 (the encoding S3 Select uses for its
 * output) instead of the JVM's platform-default charset, so non-ASCII data is decoded the same
 * way on every machine.
 *
 * @param s3client client used to issue the select-object-content request
 * @param bucket bucket argument forwarded to {@code generateBaseCSVRequest}
 * @param s3file object argument forwarded to {@code generateBaseCSVRequest}
 * @param query SQL expression forwarded to {@code generateBaseCSVRequest}
 * @return the text of all returned records, with leading and trailing whitespace trimmed
 * @throws RuntimeException if issuing the request, reading the records or closing the resources
 *     fails (the original failure is attached as the cause), or if the End Event was never
 *     received and the result may therefore be incomplete
 */
static String queryFile(
    AmazonS3 s3client, String bucket, String s3file, @SuppressWarnings("SameParameterValue") String query) {
  SelectObjectContentRequest request = generateBaseCSVRequest(bucket, s3file, query);
  // Flipped by the visitor below once the service signals the end of the record stream.
  final AtomicBoolean isResultComplete = new AtomicBoolean(false);
  String res;
  try (SelectObjectContentResult result = s3client.selectObjectContent(request);
      SelectObjectContentEventStream payload = result.getPayload();
      ByteArrayOutputStream out = new ByteArrayOutputStream()) {
    InputStream resultInputStream = payload.getRecordsInputStream(
        new SelectObjectContentEventVisitor() {
          @Override
          public void visit(SelectObjectContentEvent.EndEvent event) {
            isResultComplete.set(true);
          }
        }
    );
    copy(resultInputStream, out);
    // Decode with an explicit charset; the no-arg toString() would use the platform default.
    // Fully qualified so the fix does not depend on an import being present in the file.
    res = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8).trim();
  } catch (Throwable e) {
    // Also covers failures raised while the try-with-resources statement closes its resources.
    System.out.println("SQL query failure");
    throw new RuntimeException("SQL query failure", e);
  }
  /*
   * The End Event indicates all matching records have been transmitted.
   * If the End Event is not received, the results may be incomplete.
   */
  if (!isResultComplete.get()) {
    throw new RuntimeException("S3 Select request was incomplete as End Event was not received.");
  }
  return res;
}
/**
 * Issues the given select-object-content request and returns the stream of record bytes from
 * its payload.
 *
 * <p>The request and the service result are stored in the {@code selectObjectRequest} and
 * {@code selectObjectContentResult} fields. {@code requestComplete} is set to {@code true} when
 * the End Event is visited on the returned stream.
 *
 * @param selectObjectRequest the request to execute; must not be null
 * @return the records input stream obtained from the result payload
 * @throws NullPointerException if {@code selectObjectRequest} is null
 */
public InputStream getRecordsContent(SelectObjectContentRequest selectObjectRequest)
{
    // Validate before touching any state or calling the service.
    requireNonNull(selectObjectRequest, "selectObjectRequest is null");
    this.selectObjectRequest = selectObjectRequest;
    this.selectObjectContentResult = s3Client.selectObjectContent(selectObjectRequest);

    // Records that the End Event was seen while the caller drains the stream.
    SelectObjectContentEventVisitor endEventTracker = new SelectObjectContentEventVisitor()
    {
        @Override
        public void visit(EndEvent endEvent)
        {
            requestComplete = true;
        }
    };

    return selectObjectContentResult.getPayload().getRecordsInputStream(endEventTracker);
}
/**
 * Runs an S3 Select SQL query over a non-compressed CSV file saved in an S3 object and returns
 * the selected records as one trimmed string.
 *
 * <p>The record bytes are decoded explicitly as UTF-8 (the encoding S3 Select uses for its
 * output) instead of the JVM's platform-default charset, so non-ASCII data is decoded the same
 * way on every machine.
 *
 * @param s3client client used to issue the select-object-content request
 * @param bucket bucket argument forwarded to {@code generateBaseCSVRequest}
 * @param s3file object argument forwarded to {@code generateBaseCSVRequest}
 * @param query SQL expression forwarded to {@code generateBaseCSVRequest}
 * @return the text of all returned records, with leading and trailing whitespace trimmed
 * @throws RuntimeException if issuing the request, reading the records or closing the resources
 *     fails (the original failure is attached as the cause), or if the End Event was never
 *     received and the result may therefore be incomplete
 */
static String queryFile(
    AmazonS3 s3client, String bucket, String s3file, @SuppressWarnings("SameParameterValue") String query) {
  SelectObjectContentRequest request = generateBaseCSVRequest(bucket, s3file, query);
  // Flipped by the visitor below once the service signals the end of the record stream.
  final AtomicBoolean isResultComplete = new AtomicBoolean(false);
  String res;
  try (SelectObjectContentResult result = s3client.selectObjectContent(request);
      SelectObjectContentEventStream payload = result.getPayload();
      ByteArrayOutputStream out = new ByteArrayOutputStream()) {
    InputStream resultInputStream = payload.getRecordsInputStream(
        new SelectObjectContentEventVisitor() {
          @Override
          public void visit(SelectObjectContentEvent.EndEvent event) {
            isResultComplete.set(true);
          }
        }
    );
    copy(resultInputStream, out);
    // Decode with an explicit charset; the no-arg toString() would use the platform default.
    // Fully qualified so the fix does not depend on an import being present in the file.
    res = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8).trim();
  } catch (Throwable e) {
    // Also covers failures raised while the try-with-resources statement closes its resources.
    System.out.println("SQL query failure");
    throw new RuntimeException("SQL query failure", e);
  }
  /*
   * The End Event indicates all matching records have been transmitted.
   * If the End Event is not received, the results may be incomplete.
   */
  if (!isResultComplete.get()) {
    throw new RuntimeException("S3 Select request was incomplete as End Event was not received.");
  }
  return res;
}