下面列出了怎么用com.amazonaws.services.s3.model.S3ObjectInputStream的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public byte[] serialize(String topic, S3ObjectInputStream data) {
InputStream is = data.getDelegateStream();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nRead;
byte[] byteArray = new byte[16384];
try {
while ((nRead = is.read(byteArray, 0, byteArray.length)) != -1) {
buffer.write(byteArray, 0, nRead);
}
} catch (IOException e) {
LOG.warn("I/O error while serializing data from or to topic {}: {} | {}", topic, e.getMessage(), e);
}
return buffer.toByteArray();
}
@Test
void fetchCsvFileTest() throws IOException
{
String objectKey = S3_OBJECT_KEY + ".csv";
byte[] csv = ResourceUtils.loadResourceAsByteArray(CSV_FILE_PATH);
S3Object s3Object = mock(S3Object.class);
S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(new ByteArrayInputStream(csv), null);
when(s3Object.getObjectContent()).thenReturn(s3ObjectInputStream);
when(amazonS3Client.getObject(S3_BUCKET_NAME, objectKey)).thenReturn(s3Object);
Set<VariableScope> scopes = Set.of(VariableScope.SCENARIO);
String variableName = "varName";
steps.fetchCsvObject(objectKey, S3_BUCKET_NAME, scopes, variableName);
verify(amazonS3Client).getObject(S3_BUCKET_NAME, objectKey);
verify(bddVariableContext).putVariable(scopes, variableName, List.of(Map.of("id", "1")));
}
/**
* convert the S3 Object to String
*/
public static String convertS3Obj2Str(S3Object s3Obj) throws IOException {
S3ObjectInputStream s3is = s3Obj.getObjectContent();
ByteArrayOutputStream fos = new ByteArrayOutputStream();
byte[] read_buf = new byte[1024];
int read_len = 0;
try {
while ((read_len = s3is.read(read_buf)) > 0) {
fos.write(read_buf, 0, read_len);
}
return fos.toString(ConstantsUnicode.UTF8);
} finally {
s3is.close();
fos.close();
}
}
@Test
public void loadGetsObjectsAndReturnsTrueIfItExistsInS3() throws Exception {
/** Setup **/
buildCacheService = new AwsS3BuildCacheService(s3, "bucketName", null, true);
doReturn(true).when(s3).doesObjectExist("bucketName", "abcdefghijkl123456789");
S3Object s3Object = mock(S3Object.class);
doReturn(s3Object).when(s3).getObject("bucketName", "abcdefghijkl123456789");
S3ObjectInputStream s3ObjectInputStream = mock(S3ObjectInputStream.class);
doReturn(s3ObjectInputStream).when(s3Object).getObjectContent();
/** Run **/
boolean result = buildCacheService.load(key, reader);
/** Check **/
assertTrue(result);
verify(reader).readFrom(s3ObjectInputStream);
}
@Test
public void contentTest() throws Exception {
URL url = this.getClass().getResource("../../../../amazon-aws-logo.jpg");
String tmpFileName = url.getFile();
File file = new File(tmpFileName);
String fileName = file.getName();
InputStream is = url.openStream();
String contentType = URLConnection.guessContentTypeFromStream(is);
contentHelper.uploadContent(contentType, file.length(), bucketName, fileName, is);
Thread.sleep(500);
boolean doesObjectExist = s3Client.doesObjectExist(bucketName, fileName);
Assert.assertTrue(doesObjectExist);
S3ObjectInputStream inputStream = contentHelper.downloadContent(bucketName, fileName);
Assert.assertNotNull(inputStream);
contentHelper.deleteContent(bucketName, fileName);
Thread.sleep(500);
doesObjectExist = s3Client.doesObjectExist(bucketName, fileName);
Assert.assertFalse(doesObjectExist);
}
@Test
public void getInventReportSuccess() throws Exception {
testLocator.setMD5checksum(testMD5);
testManifest.setFileSchema("storageClass, size");
reportRetriever = new InventoryReportRetriever(mockS3Client, testLocator, testManifest);
String expectedInventoryReportString = "testString";
byte[] expectedInventoryReportBytes = inventReportBytes(expectedInventoryReportString);
when(mockS3Object.getObjectContent()).thenReturn(new S3ObjectInputStream(
new ByteArrayInputStream(expectedInventoryReportBytes), null));
when(mockS3Client.getObject(getObjectRequestCaptor.capture())).thenReturn(mockS3Object);
String result = reportRetriever.getInventoryReportToString();
assertThat(result, is(expectedInventoryReportString));
GetObjectRequest request = getObjectRequestCaptor.getValue();
assertThat(request.getBucketName(), is("testBucket"));
assertThat(request.getKey(), is("testInventReportKey"));
}
@Test
public void getInventoryManifestSuccess() throws Exception {
InventoryManifest expectedManifest = manifest();
byte[] expectedManifestBytes = manifestBytes(expectedManifest);
when(mockS3JsonObject.getObjectContent()).thenReturn(new S3ObjectInputStream(
new ByteArrayInputStream(expectedManifestBytes), null));
String expectedChecksum = "a6121a6a788be627a68d7e9def9f6968";
byte[] expectedChecksumBytes = expectedChecksum.getBytes(StandardCharsets.UTF_8);
when(mockS3ChecksumObject.getObjectContent()).thenReturn(new S3ObjectInputStream(
new ByteArrayInputStream(expectedChecksumBytes), null));
when(mockS3Client.getObject(getObjectRequestCaptor.capture()))
.thenReturn(mockS3JsonObject)
.thenReturn(mockS3ChecksumObject);
InventoryManifest result = retriever.getInventoryManifest();
assertThat(result, is(expectedManifest));
List<GetObjectRequest> request = getObjectRequestCaptor.getAllValues();
assertThat(request.get(0).getBucketName(), is("testBucketName"));
assertThat(request.get(0).getKey(), is("testBucketKey/manifest.json"));
assertThat(request.get(1).getBucketName(), is("testBucketName"));
assertThat(request.get(1).getKey(), is("testBucketKey/manifest.checksum"));
}
@Test (expected = ChecksumMismatchException.class)
public void getInventoryManifestMD5Mismatch() throws Exception {
InventoryManifest expectedManifest = manifest();
byte[] expectedManifestBytes = manifestBytes(expectedManifest);
byte[] errorBytes = "ERROR".getBytes();
byte[] wrongManifestBytes = ArrayUtils.addAll(expectedManifestBytes, errorBytes);
when(mockS3JsonObject.getObjectContent()).thenReturn(new S3ObjectInputStream(
new ByteArrayInputStream(wrongManifestBytes), null));
String expectedChecksum = "37289f10a76751046658f6c5e0ab41d9";
byte[] expectedChecksumBytes = expectedChecksum.getBytes(StandardCharsets.UTF_8);
when(mockS3ChecksumObject.getObjectContent()).thenReturn(new S3ObjectInputStream(
new ByteArrayInputStream(expectedChecksumBytes), null));
when(mockS3Client.getObject(getObjectRequestCaptor.capture())).
thenReturn(mockS3JsonObject)
.thenReturn(mockS3ChecksumObject);
retriever.getInventoryManifest();
}
private static void writeObjectToFile(Path filePath, S3Object object) {
// Writes the contents of the S3 object to a file.
File endpointsFile = new File(filePath.toAbsolutePath().toString());
try (FileOutputStream fos = new FileOutputStream(endpointsFile);
S3ObjectInputStream s3is = object.getObjectContent()) {
byte[] read_buf = new byte[1024];
int read_len = 0;
while ((read_len = s3is.read(read_buf)) > 0) {
fos.write(read_buf, 0, read_len);
}
} catch (IOException e) {
System.err.println(e.getMessage());
System.exit(1);
}
}
public void GetFromS3(){
new Thread(new Runnable() {
@Override
public void run() {
try {
S3ObjectInputStream content = s3Client.getObject(BUCKET, KEY).getObjectContent();
byte[] bytes = IOUtils.toByteArray(content);
bitmap = BitmapFactory.decodeByteArray(bytes, 0, bytes.length);
setBitmap(bitmap);
runOnUiThread(new Runnable() {
@Override
public void run() {
imageFromS3.setScaleType(ImageView.ScaleType.FIT_XY);
imageFromS3.setImageBitmap(getBitmap());
}
});
} catch (Exception e) {
Log.e(LOG_TAG,
"Exception occurred when retrieving image from S3",
e);
}
}
}).start();
}
SignedDomain getSignedDomain(AmazonS3 s3, String domainName) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("getSignedDomain with S3: {}", domainName);
}
SignedDomain signedDomain = null;
try {
S3Object object = s3.getObject(s3BucketName, domainName);
try (S3ObjectInputStream s3is = object.getObjectContent()) {
signedDomain = jsonMapper.readValue(s3is, SignedDomain.class);
}
} catch (Exception ex) {
LOGGER.error("AWSS3ChangeLog: getSignedDomain - unable to get domain {} error: {}",
domainName, ex.getMessage());
}
return signedDomain;
}
@Override
public void run() {
SignedDomain signedDomain = null;
try {
S3Object object = s3.getObject(s3BucketName, domainName);
try (S3ObjectInputStream s3is = object.getObjectContent()) {
signedDomain = jsonMapper.readValue(s3is, SignedDomain.class);
}
} catch (Exception ex) {
LOGGER.error("AWSS3ChangeLogThread: ObjectS3Thread- getSignedDomain - unable to get domain {} error: {}",
domainName, ex.getMessage());
}
if (signedDomain != null) {
signedDomainMap.put(domainName, signedDomain);
}
}
private static void streamReadAndDownloadObject(
final File workspace,
final S3Object sessionObject,
final String downloadedFileName) throws IOException {
final File outputFile = new File(workspace, downloadedFileName);
try (final S3ObjectInputStream objectContents = sessionObject.getObjectContent();
final OutputStream outputStream = new FileOutputStream(outputFile)) {
final int BUFFER_SIZE = 8192;
final byte[] buffer = new byte[BUFFER_SIZE];
int i;
while ((i = objectContents.read(buffer)) != -1) {
outputStream.write(buffer, 0, i);
}
}
}
/**
* S3 block read would be achieved through the AmazonS3 client. Following
* are the steps to achieve: (1) Create the objectRequest from bucketName
* and filePath. (2) Set the range to the above created objectRequest. (3)
* Get the object portion through AmazonS3 client API. (4) Get the object
* content from the above object portion.
*
* @param bytesFromCurrentOffset
* bytes read till now from current offset
* @param bytesToFetch
* the number of bytes to be fetched
* @return the number of bytes read, -1 if 0 bytes read
* @throws IOException
*/
@Override
protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
{
GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
rangeObjectRequest.setRange(offset + bytesFromCurrentOffset, offset + bytesFromCurrentOffset + bytesToFetch - 1);
S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
buffer = ByteStreams.toByteArray(wrappedStream);
wrappedStream.close();
int bufferLength = buffer.length;
if (bufferLength <= 0) {
return -1;
}
return bufferLength;
}
/**
* S3 block read would be achieved through the AmazonS3 client. Following are the steps to achieve:
* (1) Create the objectRequest from bucketName and filePath.
* (2) Set the range to the above created objectRequest.
* (3) Get the object portion through AmazonS3 client API.
* (4) Get the object content from the above object portion.
* @return the block entity
* @throws IOException
*/
@Override
protected Entity readEntity() throws IOException
{
entity.clear();
GetObjectRequest rangeObjectRequest = new GetObjectRequest(
bucketName, filePath);
rangeObjectRequest.setRange(offset, blockMetadata.getLength() - 1);
S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
byte[] record = ByteStreams.toByteArray(wrappedStream);
entity.setUsedBytes(record.length);
entity.setRecord(record);
wrappedStream.close();
return entity;
}
@Test
public void testCollectWithOneSocketTimeout() throws Throwable {
S3ObjectInputStream inputStream = mock(S3ObjectInputStream.class);
when(inputStream.read(any(byte[].class), anyInt(), anyInt()))
.thenAnswer(new WriteBufferAnswer(new byte[]{102, 111, 111, 10})) // first line: foo
.thenThrow(new SocketTimeoutException()) // exception causes retry
.thenAnswer(new WriteBufferAnswer(new byte[]{102, 111, 111, 10})) // first line again, because of retry
.thenAnswer(new WriteBufferAnswer(new byte[]{98, 97, 114, 10})) // second line: bar
.thenReturn(-1);
TestingRowConsumer consumer = getObjects(Collections.singletonList("s3://fakebucket/foo"), null, inputStream, false);
Bucket rows = consumer.getBucket();
assertThat(rows.size(), is(2));
assertThat(TestingHelpers.printedTable(rows), is("foo\nbar\n"));
}
@Override
public S3Object getObject(final GetObjectRequest request) throws AmazonClientException {
assertThat(request.getBucketName(), equalTo(bucket));
final String blobName = request.getKey();
final byte[] content = blobs.get(blobName);
if (content == null) {
AmazonS3Exception exception = new AmazonS3Exception("[" + blobName + "] does not exist.");
exception.setStatusCode(404);
throw exception;
}
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(content.length);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(content), null, false));
s3Object.setKey(blobName);
s3Object.setObjectMetadata(metadata);
return s3Object;
}
private static void abortStream(InputStream in)
{
try {
if (in instanceof S3ObjectInputStream) {
((S3ObjectInputStream) in).abort();
}
else {
in.close();
}
}
catch (IOException | AbortedException ignored) {
// thrown if the current thread is in the interrupted state
}
}
/**
* Download an object data as a file
*
* @param remoteObjectName the name of object/key which contents should be downloaded
* @param localFileName the location and file name on the local machine, where the file will be downloaded
* @throws S3OperationException if there is an error during data transfer
*/
@PublicAtsApi
public void download( String remoteObjectName, String localFileName ) throws S3OperationException,
IllegalArgumentException {
localFileName = IoUtils.normalizeFilePath(localFileName);
String localDirName = IoUtils.getFilePath(localFileName);
String localFileOnlyName = IoUtils.getFileName(localFileName);
File localDir = new File(localDirName);
if (localDir.exists()) {
if (localDir.isFile()) {
throw new IllegalArgumentException("Could not create file " + localFileOnlyName + " into existing file "
+ localDirName);
}
// else dir exists
} else {
LOG.debug("Creating target directory path " + localDirName);
if (!localDir.mkdirs()) {
throw new S3OperationException("Could not create local directory path '" + localDirName
+ "' for local file specified '" + localFileName + "'");
}
}
S3Object obj = s3Client.getObject(bucketName, remoteObjectName);
try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(new File(localFileName)));
S3ObjectInputStream s3is = obj.getObjectContent();) {
byte[] readBuffArr = new byte[4096];
int readBytes = 0;
while ( (readBytes = s3is.read(readBuffArr)) >= 0) {
bos.write(readBuffArr, 0, readBytes);
}
} catch (Exception e) {
handleExeption(e, "Error while downloading object " + remoteObjectName + " to local file " + localFileName
+ ". If error persists check your endpoint, credentials and permissions.");
}
LOG.info("S3 object '" + remoteObjectName + "; is downloaded successfully from bucket '" + bucketName
+ "' to file " + localFileName);
}
@Test (expected = ChecksumMismatchException.class)
public void getInventReportMD5Mismatch() throws Exception {
testLocator.setMD5checksum("badChecksum");
testManifest.setFileSchema("storageClass, size");
reportRetriever = new InventoryReportRetriever(mockS3Client, testLocator, testManifest);
String expectedInventoryReportString = "testString";
byte[] expectedInventReportBytes = inventReportBytes(expectedInventoryReportString);
when(mockS3Object.getObjectContent()).thenReturn(new S3ObjectInputStream(
new ByteArrayInputStream(expectedInventReportBytes), null));
when(mockS3Client.getObject(getObjectRequestCaptor.capture())).thenReturn(mockS3Object);
reportRetriever.getInventoryReportToString();
}
public String getString(String bucket, String key) {
GetObjectRequest request = new GetObjectRequest(bucket, key);
S3Object response = client.getObject(request);
try (S3ObjectInputStream is = response.getObjectContent()) {
return CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
public static void main(String arg[]) throws Exception {
// The S3 bucket and document
String document = "";
String bucket = "";
AmazonS3 s3client = AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(
new EndpointConfiguration("https://s3.amazonaws.com","us-east-1"))
.build();
// Get the document from S3
com.amazonaws.services.s3.model.S3Object s3object = s3client.getObject(bucket, document);
S3ObjectInputStream inputStream = s3object.getObjectContent();
BufferedImage image = ImageIO.read(inputStream);
// Call DetectDocumentText
EndpointConfiguration endpoint = new EndpointConfiguration(
"https://textract.us-east-1.amazonaws.com", "us-east-1");
AmazonTextract client = AmazonTextractClientBuilder.standard()
.withEndpointConfiguration(endpoint).build();
DetectDocumentTextRequest request = new DetectDocumentTextRequest()
.withDocument(new Document().withS3Object(new S3Object().withName(document).withBucket(bucket)));
DetectDocumentTextResult result = client.detectDocumentText(request);
// Create frame and panel.
JFrame frame = new JFrame("RotateImage");
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
DocumentText panel = new DocumentText(result, image);
panel.setPreferredSize(new Dimension(image.getWidth() , image.getHeight() ));
frame.setContentPane(panel);
frame.pack();
frame.setVisible(true);
}
public String getString(String bucket, String key) {
GetObjectRequest request = new GetObjectRequest(bucket, key);
S3Object response = client.getObject(request);
try (S3ObjectInputStream is = response.getObjectContent()) {
return CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
private String getDecryptedData(final String bucketName, final String keyName) {
String keyValue = "";
S3Object s3Object = getS3().getObject(bucketName, keyName);
if (LOG.isDebugEnabled()) {
LOG.debug("retrieving appName {}, key {}", bucketName, keyName);
}
if (null == s3Object) {
LOG.error("error retrieving key {}, from bucket {}", keyName, bucketName);
return keyValue;
}
try (S3ObjectInputStream s3InputStream = s3Object.getObjectContent();
ByteArrayOutputStream result = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int length;
///CLOVER:OFF
while ((length = s3InputStream.read(buffer)) != -1) {
result.write(buffer, 0, length);
}
///CLOVER:ON
// if key should be decrypted, do so with KMS
if (kmsDecrypt) {
DecryptRequest req = new DecryptRequest().withCiphertextBlob(ByteBuffer.wrap(result.toByteArray()));
ByteBuffer plainText = getKMS().decrypt(req).getPlaintext();
keyValue = new String(plainText.array());
} else {
keyValue = result.toString();
}
} catch (IOException e) {
LOG.error("error getting application secret.", e);
}
return keyValue.trim();
}
@Test
public void testAwsPrivateKeyStore() {
System.setProperty("athenz.aws.s3.region", "us-east-1");
System.setProperty(ATHENZ_AWS_KMS_REGION, "us-east-1");
String bucketName = "my_bucket";
String keyName = "my_key";
String expected = "my_value";
System.setProperty(ATHENZ_PROP_ZTS_BUCKET_NAME, bucketName);
System.setProperty("athenz.aws.zts.key_name", keyName);
AmazonS3 s3 = mock(AmazonS3.class);
AWSKMS kms = mock(AWSKMS.class);
S3Object s3Object = mock(S3Object.class);
Mockito.when(s3.getObject(bucketName, keyName)).thenReturn(s3Object);
InputStream is = new ByteArrayInputStream( expected.getBytes() );
S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(is, null);
Mockito.when(s3Object.getObjectContent()).thenReturn(s3ObjectInputStream);
ByteBuffer buffer = ByteBuffer.wrap(expected.getBytes());
DecryptResult decryptResult = mock(DecryptResult.class);
Mockito.when(kms.decrypt(Mockito.any(DecryptRequest.class))).thenReturn(decryptResult);
Mockito.when(decryptResult.getPlaintext()).thenReturn(buffer);
AwsPrivateKeyStore awsPrivateKeyStore = new AwsPrivateKeyStore(s3, kms);
String actual = awsPrivateKeyStore.getApplicationSecret(bucketName, keyName);
StringBuilder privateKeyId = new StringBuilder(keyName);
awsPrivateKeyStore.getPrivateKey("zts", "testServerHostName", privateKeyId);
Assert.assertEquals(actual, expected);
Mockito.when(s3Object.getObjectContent()).thenAnswer(invocation -> { throw new IOException("test IOException"); });
awsPrivateKeyStore.getPrivateKey("zts", "testServerHostName", privateKeyId);
System.clearProperty("athenz.aws.s3.region");
System.clearProperty(ATHENZ_AWS_KMS_REGION);
}
@Test
public void testGetApplicationSecret() {
System.setProperty("athenz.aws.s3.region", "us-east-1");
System.setProperty(ATHENZ_AWS_KMS_REGION, "us-east-1");
String bucketName = "my_bucket";
String keyName = "my_key";
String expected = "my_value";
AmazonS3 s3 = mock(AmazonS3.class);
AWSKMS kms = mock(AWSKMS.class);
S3Object s3Object = mock(S3Object.class);
Mockito.when(s3.getObject(bucketName, keyName)).thenReturn(s3Object);
InputStream is = new ByteArrayInputStream( expected.getBytes() );
S3ObjectInputStream s3ObjectInputStream = new S3ObjectInputStream(is, null);
Mockito.when(s3Object.getObjectContent()).thenReturn(s3ObjectInputStream);
ByteBuffer buffer = ByteBuffer.wrap(expected.getBytes());
DecryptResult decryptResult = mock(DecryptResult.class);
Mockito.when(kms.decrypt(Mockito.any(DecryptRequest.class))).thenReturn(decryptResult);
Mockito.when(decryptResult.getPlaintext()).thenReturn(buffer);
System.setProperty("athenz.aws.store_kms_decrypt", "true");
AwsPrivateKeyStore awsPrivateKeyStore = new AwsPrivateKeyStore();
AwsPrivateKeyStore spyAWS = Mockito.spy(awsPrivateKeyStore);
doReturn(s3).when(spyAWS).getS3();
doReturn(kms).when(spyAWS).getKMS();
String actual = spyAWS.getApplicationSecret(bucketName, keyName);
Assert.assertEquals(actual, expected);
System.clearProperty("athenz.aws.s3.region");
System.clearProperty(ATHENZ_AWS_KMS_REGION);
}
@Override
public StorageObject open(String key)
throws StorageFileNotFoundException
{
checkArgument(key != null, "key is null");
String errorMessage = "opening file bucket " + bucket + " key " + key;
GetObjectRequest req = new GetObjectRequest(bucket, key);
S3Object obj = getWithRetry(errorMessage, () -> client.getObject(req));
final long actualSize = obj.getObjectMetadata().getContentLength();
// override close to call abort instead because close skips all remaining bytes so that
// s3 client can reuse the TCP connection. but close of a fully opened file is occasionally
// used to skip remaining work (e.g. finally block when exception is thrown). Unlike openRange,
// performance impact could be significantly large.
InputStream stream = overrideCloseToAbort(obj.getObjectContent());
InputStream resumable = new ResumableInputStream(stream, (offset, closedCause) -> {
try {
S3ObjectInputStream raw = getWithRetry(errorMessage, () -> {
req.setRange(offset, actualSize - offset - 1);
return client.getObject(req);
})
.getObjectContent();
return overrideCloseToAbort(raw);
}
catch (StorageFileNotFoundException ex) {
throw new IOException(ex);
}
});
return new StorageObject(resumable, actualSize);
}
private InputStream overrideCloseToAbort(final S3ObjectInputStream raw)
{
return new FilterInputStream(raw)
{
@Override
public void close() throws IOException
{
raw.abort();
}
};
}
@Test
public void testCreateObject() throws Exception {
AmazonS3ExecutorConfig config = getConfig();
config.taskConfig.taskType = TaskType.CREATE_NEW_OBJECT;
config.taskConfig.content = "${record:value('/content')}";
AmazonS3Executor executor = new AmazonS3Executor(config);
TargetRunner runner = new TargetRunner.Builder(AmazonS3DExecutor.class, executor)
.build();
runner.runInit();
try {
runner.runWrite(ImmutableList.of(getTestRecord()));
//Make sure the prefix is empty
ObjectListing objectListing = s3client.listObjects(BUCKET_NAME, objectName);
Assert.assertEquals(1, objectListing.getObjectSummaries().size());
S3Object object = s3client.getObject(BUCKET_NAME, objectName);
S3ObjectInputStream objectContent = object.getObjectContent();
List<String> stringList = IOUtils.readLines(objectContent);
Assert.assertEquals(1, stringList.size());
Assert.assertEquals("Secret", stringList.get(0));
Assert.assertEquals(1, runner.getEventRecords().size());
assertEvent(runner.getEventRecords().get(0), objectName);
} finally {
runner.runDestroy();
}
}
@Test
public void testCopyObject() throws Exception {
String newName = UUID.randomUUID().toString();
AmazonS3ExecutorConfig config = getConfig();
config.taskConfig.taskType = TaskType.COPY_OBJECT;
config.taskConfig.copyTargetLocation = newName;
AmazonS3Executor executor = new AmazonS3Executor(config);
TargetRunner runner = new TargetRunner.Builder(AmazonS3DExecutor.class, executor)
.build();
runner.runInit();
try {
s3client.putObject(new PutObjectRequest(BUCKET_NAME, objectName, IOUtils.toInputStream("content"), new ObjectMetadata()));
runner.runWrite(ImmutableList.of(getTestRecord()));
S3Object object = s3client.getObject(BUCKET_NAME, newName);
S3ObjectInputStream objectContent = object.getObjectContent();
List<String> stringList = IOUtils.readLines(objectContent);
Assert.assertEquals(1, stringList.size());
Assert.assertEquals("content", stringList.get(0));
Assert.assertTrue(s3client.doesObjectExist(BUCKET_NAME, objectName));
Assert.assertEquals(1, runner.getEventRecords().size());
assertEvent(runner.getEventRecords().get(0), newName);
} finally {
runner.runDestroy();
}
}