com.amazonaws.services.dynamodbv2.model.WriteRequest Code Examples

The following examples show how to use the com.amazonaws.services.dynamodbv2.model.WriteRequest API; each is taken from an open-source project, and you can follow the links to view the original source on GitHub.
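
As a quick orientation before the project examples, here is a minimal, self-contained sketch (AWS SDK for Java v1) showing the two kinds of WriteRequest and how they are submitted in one batch; the table name, key names, and values are placeholders:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.DeleteRequest;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;

public class WriteRequestDemo {
    public static void main(String[] args) {
        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.defaultClient();

        // A WriteRequest wraps either a PutRequest...
        WriteRequest put = new WriteRequest(new PutRequest(
                Collections.singletonMap("id", new AttributeValue("item-1"))));
        // ...or a DeleteRequest, never both at once.
        WriteRequest delete = new WriteRequest(new DeleteRequest(
                Collections.singletonMap("id", new AttributeValue("item-2"))));

        // batchWriteItem takes a map of table name -> up to 25 WriteRequests.
        Map<String, List<WriteRequest>> requestItems =
                Collections.singletonMap("myTable", Arrays.asList(put, delete));
        BatchWriteItemResult result = client.batchWriteItem(requestItems);

        // Writes DynamoDB could not apply come back in getUnprocessedItems()
        // and should be retried, typically with exponential backoff.
        System.out.println("Unprocessed tables: " + result.getUnprocessedItems().size());
    }
}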

/**
 * Helper method to handle unprocessed items
 * @param session process session
 * @param keysToFlowFileMap map of flow db primary key to flow file
 * @param table dynamodb table
 * @param hashKeyName the hash key name
 * @param hashKeyValueType the hash key value type
 * @param rangeKeyName the range key name
 * @param rangeKeyValueType the range key value type
 * @param outcome the write outcome
 */
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
        final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
    BatchWriteItemResult result = outcome.getBatchWriteItemResult();

    // Handle unprocessed items
    List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
    if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
        for ( WriteRequest request : unprocessedItems) {
            Map<String,AttributeValue> item = getRequestItem(request);
            Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
            Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);

            sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
        }
    }
}
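
The getRequestItem and sendUnprocessedToUnprocessedRelationship calls above are hooks supplied elsewhere in the processor class hierarchy. As an illustration, plausible getRequestItem implementations (assumptions, not shown in this source) would unwrap whichever half of the WriteRequest the processor populated:

// Hypothetical overrides of the getRequestItem hook used above.
// In a put-style processor, the item lives on the PutRequest:
@Override
protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
    return writeRequest.getPutRequest().getItem();
}

// In a delete-style processor, the key lives on the DeleteRequest:
@Override
protected Map<String, AttributeValue> getRequestItem(WriteRequest writeRequest) {
    return writeRequest.getDeleteRequest().getKey();
}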
 
Example 2 (project: localization_nifi, file: PutDynamoDBTest.java)
@Before
public void setUp() {
    outcome = new BatchWriteItemOutcome(result);
    result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
    final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
        @Override
        public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
            return outcome;
        }
    };

    putDynamoDB = new PutDynamoDB() {
        @Override
        protected DynamoDB getDynamoDB() {
            return mockDynamoDB;
        }
    };

}
 
Example 3 (project: localization_nifi, file: DeleteDynamoDBTest.java)
@Before
public void setUp() {
    outcome = new BatchWriteItemOutcome(result);
    result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
    final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
        @Override
        public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
            return outcome;
        }
    };

    deleteDynamoDB = new DeleteDynamoDB() {
        @Override
        protected DynamoDB getDynamoDB() {
            return mockDynamoDB;
        }
    };

}
 
Example 4 (project: beam, file: DynamoDBIOTest.java)
@Test
public void testWriteDataToDynamo() {
  final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);

  final PCollection<Void> output =
      pipeline
          .apply(Create.of(writeRequests))
          .apply(
              DynamoDBIO.<WriteRequest>write()
                  .withWriteRequestMapperFn(
                      (SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
                          writeRequest -> KV.of(tableName, writeRequest))
                  .withRetryConfiguration(
                      DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
                  .withAwsClientsProvider(
                      AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));

  final PCollection<Long> publishedResultsSize = output.apply(Count.globally());
  PAssert.that(publishedResultsSize).containsInAnyOrder(0L);

  pipeline.run().waitUntilFinish();
}
 
Example 5 (project: Doradus, file: DynamoDBService2.java)
private void commitPartial(List<WriteRequest> list) {
    Timer t = new Timer();
    Map<String, List<WriteRequest>> map = new HashMap<>();
    map.put(getTenant().getName(), list);
    BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
    int retry = 0;
    while (result.getUnprocessedItems().size() > 0) {
        if (retry == RETRY_SLEEPS.length) {
            throw new RuntimeException("All retries failed");
        }
        m_logger.debug("Committing {} unprocessed items, retry: {}",
                result.getUnprocessedItems().size(), retry + 1);
        try {
            Thread.sleep(RETRY_SLEEPS[retry++]);
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the exception.
            Thread.currentThread().interrupt();
        }
        result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
    }
    m_logger.debug("Committed {} writes in {}", list.size(), t);
    list.clear();
}
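
RETRY_SLEEPS is a constant defined elsewhere in this class. A hypothetical definition consistent with how commitPartial uses it (milliseconds to sleep before each retry, with the array length capping the number of retries) might be:

// Hypothetical values; the actual Doradus constants may differ.
private static final int[] RETRY_SLEEPS = { 250, 500, 1000, 2000, 4000, 8000 };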
 
Example 6 (project: geowave, file: DynamoDBOperations.java)
public void deleteRowsFromDataIndex(
    final byte[][] dataIds,
    final short adapterId,
    final String typeName) {
  final String tableName =
      typeName + "_" + getQualifiedTableName(DataIndexUtils.DATA_ID_INDEX.getName());
  final Iterator<byte[]> dataIdIterator = Arrays.stream(dataIds).iterator();
  while (dataIdIterator.hasNext()) {
    final List<WriteRequest> deleteRequests = new ArrayList<>();
    int i = 0;
    while (dataIdIterator.hasNext() && (i < MAX_ROWS_FOR_BATCHWRITER)) {
      deleteRequests.add(
          new WriteRequest(
              new DeleteRequest(
                  Collections.singletonMap(
                      DynamoDBRow.GW_PARTITION_ID_KEY,
                      new AttributeValue().withB(ByteBuffer.wrap(dataIdIterator.next()))))));
      i++;
    }

    client.batchWriteItem(Collections.singletonMap(tableName, deleteRequests));
  }
}
 
Example 7 (project: nifi, file: PutDynamoDBTest.java)
@Before
public void setUp() {
    outcome = new BatchWriteItemOutcome(result);
    result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
    final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
        @Override
        public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
            return outcome;
        }
    };

    putDynamoDB = new PutDynamoDB() {
        @Override
        protected DynamoDB getDynamoDB() {
            return mockDynamoDB;
        }
    };

}
 
Example 8 (project: nifi, file: DeleteDynamoDBTest.java)
@Before
public void setUp() {
    outcome = new BatchWriteItemOutcome(result);
    result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
    final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
        @Override
        public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) {
            return outcome;
        }
    };

    deleteDynamoDB = new DeleteDynamoDB() {
        @Override
        protected DynamoDB getDynamoDB() {
            return mockDynamoDB;
        }
    };

}
 
Example 9 (project: nifi, file: AbstractWriteDynamoDBProcessor.java)
/**
 * Helper method to handle unprocessed items
 * @param session process session
 * @param keysToFlowFileMap map of flow db primary key to flow file
 * @param table dynamodb table
 * @param hashKeyName the hash key name
 * @param hashKeyValueType the hash key value type
 * @param rangeKeyName the range key name
 * @param rangeKeyValueType the range key value type
 * @param outcome the write outcome
 */
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
        final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
    BatchWriteItemResult result = outcome.getBatchWriteItemResult();

    // Handle unprocessed items
    List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
    if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
        for ( WriteRequest request : unprocessedItems) {
            Map<String,AttributeValue> item = getRequestItem(request);
            Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
            Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);

            sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
        }
    }
}
 
Example 10 (project: localization_nifi, file: PutDynamoDBTest.java)
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
    Map<String, List<WriteRequest>> unprocessed =
            new HashMap<String, List<WriteRequest>>();
    PutRequest put = new PutRequest();
    put.addItemEntry("hashS", new AttributeValue("h1"));
    put.addItemEntry("rangeS", new AttributeValue("r1"));
    WriteRequest write = new WriteRequest(put);
    List<WriteRequest> writes = new ArrayList<>();
    writes.add(write);
    unprocessed.put(stringHashStringRangeTableName, writes);
    result.setUnprocessedItems(unprocessed);
    final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);

    putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
    putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
    putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
    putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
    putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
    putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
    putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
    putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
    putRunner.enqueue("{\"hello\":\"world\"}".getBytes());

    putRunner.run(1);

    putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);

}
 
Example 11 (project: localization_nifi, file: DeleteDynamoDBTest.java)
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
    Map<String, List<WriteRequest>> unprocessed =
            new HashMap<String, List<WriteRequest>>();
    DeleteRequest delete = new DeleteRequest();
    delete.addKeyEntry("hashS", new AttributeValue("h1"));
    delete.addKeyEntry("rangeS", new AttributeValue("r1"));
    WriteRequest write = new WriteRequest(delete);
    List<WriteRequest> writes = new ArrayList<>();
    writes.add(write);
    unprocessed.put(stringHashStringRangeTableName, writes);
    result.setUnprocessedItems(unprocessed);
    final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);

    deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
    deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
    deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
    deleteRunner.enqueue(new byte[] {});

    deleteRunner.run(1);

    deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);

}
 
@Override
public void write(K key, V value) throws IOException {
  if (value == null) {
    throw new RuntimeException("Null record encountered. At least the key columns must be "
        + "specified.");
  }

  verifyInterval();
  if (progressable != null) {
    progressable.progress();
  }

  DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
  BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
      permissibleWritesPerSecond - writesPerSecond, reporter, deletionMode);

  batchSize++;
  totalItemsWritten++;

  if (result != null) {
    if (result.getConsumedCapacity() != null) {
      for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
        double consumedUnits = consumedCapacity.getCapacityUnits();
        totalIOPSConsumed += consumedUnits;
      }
    }

    int unprocessedItems = 0;
    for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
      unprocessedItems += requests.size();
    }
    writesPerSecond += batchSize - unprocessedItems;
    batchSize = unprocessedItems;
  }
}
 
@Test
public void testPutBatchDeletionModeSuccessful() throws Exception {
  Map<String, AttributeValue> item = ImmutableMap.of("",
          new AttributeValue(Strings.repeat("a", (int) DEFAULT_MAX_ITEM_SIZE)));

  client.putBatch("dummyTable", item, 1, null, true);

  for (Map.Entry<String, List<WriteRequest>> entry: client.getWriteBatchMap().entrySet()) {
    for (WriteRequest req: entry.getValue()) {
      Assert.assertNotNull(req.getDeleteRequest());
      Assert.assertNull(req.getPutRequest());
    }
  }
}
 
Example 14 (project: kafka-connect-dynamodb, file: DynamoDbSinkTask.java)
private Map<String, List<WriteRequest>> toWritesByTable(Iterator<SinkRecord> recordIterator) {
    final Map<String, List<WriteRequest>> writesByTable = new HashMap<>();
    for (int count = 0; recordIterator.hasNext() && count < config.batchSize; count++) {
        final SinkRecord record = recordIterator.next();
        final WriteRequest writeRequest = new WriteRequest(toPutRequest(record));
        writesByTable.computeIfAbsent(tableName(record), k -> new ArrayList<>(config.batchSize)).add(writeRequest);
    }
    return writesByTable;
}
 
private static String makeMessage(Map<String, List<WriteRequest>> unprocessedItems) {
    final StringBuilder msg = new StringBuilder("Unprocessed writes: {");
    for (Map.Entry<String, List<WriteRequest>> e : unprocessedItems.entrySet()) {
        msg.append(" ").append(e.getKey()).append("(").append(e.getValue().size()).append(")").append(" ");
    }
    msg.append("}");
    return msg.toString();
}
 
Example 16 (project: beam, file: DynamoDBIO.java)
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  final KV<String, WriteRequest> writeRequest =
      (KV<String, WriteRequest>) spec.getWriteItemMapperFn().apply(context.element());
  batch.add(writeRequest);
  if (batch.size() >= BATCH_SIZE) {
    flushBatch();
  }
}
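
flushBatch() is defined elsewhere in this WriteFn and is not shown here. A simplified sketch of what such a method might do, assuming batch is a List<KV<String, WriteRequest>> and client is the AmazonDynamoDB client (the real Beam implementation additionally applies the configured RetryConfiguration and resubmits unprocessed items):

// Simplified sketch (not Beam's actual implementation): group the buffered
// KV pairs by table name and submit them in one batchWriteItem call.
private void flushBatch() {
    if (batch.isEmpty()) {
        return;
    }
    Map<String, List<WriteRequest>> byTable = new HashMap<>();
    for (KV<String, WriteRequest> kv : batch) {
        byTable.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    client.batchWriteItem(new BatchWriteItemRequest().withRequestItems(byTable));
    batch.clear();
}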
 
Example 17 (project: beam, file: DynamoDBIOTestHelper.java)
static List<WriteRequest> generateWriteRequests(int numOfItem) {
  List<WriteRequest> writeRequests = new ArrayList<>();
  for (int i = 1; i <= numOfItem; i++) {
    WriteRequest writeRequest = new WriteRequest();
    writeRequest.setPutRequest(generatePutRequest("hashKeyDataStr_" + i, "1000" + i));
    writeRequests.add(writeRequest);
  }
  return writeRequests;
}
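
generatePutRequest is a private helper in the same test class. A minimal sketch consistent with the calls above follows; the attribute names are assumptions, not the actual Beam test constants:

// Hypothetical helper (attribute names are assumptions): builds a PutRequest
// with one string attribute and one numeric attribute.
private static PutRequest generatePutRequest(String hashKeyData, String number) {
    Map<String, AttributeValue> item = new HashMap<>();
    item.put("hashKey", new AttributeValue(hashKeyData));
    item.put("rangeKey", new AttributeValue().withN(number));
    return new PutRequest().withItem(item);
}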
 
Example 18 (project: beam, file: DynamoDBIOTest.java)
@Test
public void testRetries() throws Throwable {
  thrown.expectMessage("Error writing to DynamoDB");

  final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);

  AmazonDynamoDB amazonDynamoDBMock = Mockito.mock(AmazonDynamoDB.class);
  Mockito.when(amazonDynamoDBMock.batchWriteItem(Mockito.any(BatchWriteItemRequest.class)))
      .thenThrow(new AmazonDynamoDBException("Service unavailable"));

  pipeline
      .apply(Create.of(writeRequests))
      .apply(
          DynamoDBIO.<WriteRequest>write()
              .withWriteRequestMapperFn(
                  (SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
                      writeRequest -> KV.of(tableName, writeRequest))
              .withRetryConfiguration(
                  DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10)))
              .withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock)));

  try {
    pipeline.run().waitUntilFinish();
  } catch (final Pipeline.PipelineExecutionException e) {
    // check 3 retries were initiated by inspecting the log before passing on the exception
    expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
    expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 2));
    expectedLogs.verifyWarn(String.format(DynamoDBIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 3));
    throw e.getCause();
  }
  fail("Pipeline is expected to fail because we were unable to write to DynamoDB.");
}
 
/**
 * Writes to a DynamoDB table using exponential backoff. If batchWriteItem
 * returns unprocessed items, it backs off exponentially and retries the
 * unprocessed items.
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    BatchWriteItemResult writeItemResult = null;
    List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    Map<String, List<WriteRequest>> unprocessedItems = null;
    boolean interrupted = false;
    try {
        do {
            writeItemResult = client.batchWriteItem(req);
            unprocessedItems = writeItemResult.getUnprocessedItems();
            consumedCapacities
                    .addAll(writeItemResult.getConsumedCapacity());

            if (unprocessedItems != null) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
 
/**
 * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25
 * items or fewer each.
 */
public static List<BatchWriteItemRequest> splitResultIntoBatches(
        ScanResult result, String tableName) {
    List<BatchWriteItemRequest> batches = new LinkedList<BatchWriteItemRequest>();
    Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();

    BatchWriteItemRequest req = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
    List<WriteRequest> writeRequests = new LinkedList<WriteRequest>();
    int i = 0;
    while (it.hasNext()) {
        PutRequest put = new PutRequest(it.next());
        writeRequests.add(new WriteRequest(put));

        i++;
        if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) {
            req.addRequestItemsEntry(tableName, writeRequests);
            batches.add(req);
            req = new BatchWriteItemRequest()
                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
            writeRequests = new LinkedList<WriteRequest>();
            i = 0;
        }
    }
    if (i > 0) {
        req.addRequestItemsEntry(tableName, writeRequests);
        batches.add(req);
    }
    return batches;
}
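
Paired with runWithBackoff above, a hypothetical driver for copying a table could page through a scan and write each batch; client, sourceTable, and destinationTable are assumed to be in scope:

// Hypothetical usage of the two helpers above: scan the source table page by
// page, split each page into <=25-item batches, and write each with backoff.
Map<String, AttributeValue> lastKey = null;
do {
    ScanResult page = client.scan(new ScanRequest()
            .withTableName(sourceTable)
            .withExclusiveStartKey(lastKey));
    for (BatchWriteItemRequest batch : splitResultIntoBatches(page, destinationTable)) {
        runWithBackoff(batch);
    }
    lastKey = page.getLastEvaluatedKey();
} while (lastKey != null);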
 
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
  createTable();

  String TEST_ATTRIBUTE_2 = "Attribute2";
  String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";

  Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
  List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();

  Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
  item1.put(TEST_ATTRIBUTE, new AttributeValue()
    .withS(TEST_ATTRIBUTE_VALUE));
  WriteRequest writeRequest1 = new WriteRequest()
    .withPutRequest(new PutRequest()
      .withItem(item1));
  writeRequests.add(writeRequest1);

  Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
  item2.put(TEST_ATTRIBUTE_2, new AttributeValue()
    .withS(TEST_ATTRIBUTE_VALUE_2));
  WriteRequest writeRequest2 = new WriteRequest()
    .withPutRequest(new PutRequest()
      .withItem(item2));
  writeRequests.add(writeRequest2);

  requestItems.put(TEST_TABLE_NAME, writeRequests);

  BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
  List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();

  assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
 
Example 22 (project: geowave, file: DynamoDBWriter.java)
@Override
public void write(final GeoWaveRow[] rows) {
  final List<WriteRequest> mutations = new ArrayList<>();

  for (final GeoWaveRow row : rows) {
    mutations.addAll(rowToMutations(row, isDataIndex));
  }

  write(mutations);
}
 
Example 23 (project: geowave, file: DynamoDBWriter.java)
public void write(final WriteRequest item) {
  synchronized (batchedItems) {
    batchedItems.add(item);
    if (batchedItems.size() >= NUM_ITEMS) {
      do {
        writeBatch(ASYNC_WRITE);
      } while (batchedItems.size() >= NUM_ITEMS);
    }
  }
}
 
Example 24 (project: geowave, file: DynamoDBWriter.java)
private void retry(final Map<String, List<WriteRequest>> map) {
  for (final Entry<String, List<WriteRequest>> requests : map.entrySet()) {
    for (final WriteRequest r : requests.getValue()) {
      if (r.getPutRequest() != null) {
        client.putItem(requests.getKey(), r.getPutRequest().getItem());
      }
    }
  }
}
 
Example 25 (project: geowave, file: DynamoDBWriter.java)
private void retryAsync(final Map<String, List<WriteRequest>> map) {
  for (final Entry<String, List<WriteRequest>> requests : map.entrySet()) {
    for (final WriteRequest r : requests.getValue()) {
      if (r.getPutRequest() != null) {

        // Nearly identical to retry(); the only difference is that retryAsync
        // uses putItemAsync instead of putItem.
        final PutItemRequest putRequest =
            new PutItemRequest(requests.getKey(), r.getPutRequest().getItem());
        final Future<PutItemResult> future =
            client.putItemAsync(putRequest, new AsyncHandler<PutItemRequest, PutItemResult>() {

              @Override
              public void onError(final Exception exception) {
                LOGGER.warn("Putitem Async failed in Dynamo");
                futureMap.remove(putRequest);
              }

              @Override
              public void onSuccess(final PutItemRequest request, final PutItemResult result) {
                if (futureMap.remove(request) == null) {
                  LOGGER.warn("Unable to delete PutItemRequest from futuresMap ");
                }

                return;
              }
            });

        futureMap.put(putRequest, future);
      }
    }
  }
}
 
Example 26 (project: nifi, file: PutDynamoDBTest.java)
@Test
public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() {
    Map<String, List<WriteRequest>> unprocessed =
            new HashMap<String, List<WriteRequest>>();
    PutRequest put = new PutRequest();
    put.addItemEntry("hashS", new AttributeValue("h1"));
    put.addItemEntry("rangeS", new AttributeValue("r1"));
    WriteRequest write = new WriteRequest(put);
    List<WriteRequest> writes = new ArrayList<>();
    writes.add(write);
    unprocessed.put(stringHashStringRangeTableName, writes);
    result.setUnprocessedItems(unprocessed);
    final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);

    putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
    putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
    putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
    putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
    putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
    putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
    putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
    putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
    putRunner.enqueue("{\"hello\":\"world\"}".getBytes());

    putRunner.run(1);

    putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);

}
 
Example 27 (project: nifi, file: DeleteDynamoDBTest.java)
@Test
public void testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
    Map<String, List<WriteRequest>> unprocessed =
            new HashMap<String, List<WriteRequest>>();
    DeleteRequest delete = new DeleteRequest();
    delete.addKeyEntry("hashS", new AttributeValue("h1"));
    delete.addKeyEntry("rangeS", new AttributeValue("r1"));
    WriteRequest write = new WriteRequest(delete);
    List<WriteRequest> writes = new ArrayList<>();
    writes.add(write);
    unprocessed.put(stringHashStringRangeTableName, writes);
    result.setUnprocessedItems(unprocessed);
    final TestRunner deleteRunner = TestRunners.newTestRunner(deleteDynamoDB);

    deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
    deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName);
    deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS");
    deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
    deleteRunner.enqueue(new byte[] {});

    deleteRunner.run(1);

    deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1);

}
 
Example 28 (project: dynamodb-geo, file: DynamoDBManager.java)
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
	BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
	List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
	for (PutPointRequest putPointRequest : putPointRequests) {
		long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
		long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
		String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());

		PutRequest putRequest = putPointRequest.getPutRequest();
		AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
		putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
		putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
		AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
		putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
		AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
		putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);			
		
		WriteRequest writeRequest = new WriteRequest(putRequest);
		writeRequests.add(writeRequest);
	}
	Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
	requestItems.put(config.getTableName(), writeRequests);
	batchItemRequest.setRequestItems(requestItems);
	BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
	BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
	return batchWritePointResult;
}
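
Note that DynamoDB allows at most 25 write requests per table in a single batchWriteItem call, so callers of batchWritePoints must keep putPointRequests at or below that size; the method also does not retry any unprocessed items reported in the BatchWriteItemResult.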
 
@Override
public List<Map<String, AttributeValue>> emit(final UnmodifiableBuffer<Map<String, AttributeValue>> buffer)
    throws IOException {
    // Map of WriteRequests to records for reference
    Map<WriteRequest, Map<String, AttributeValue>> requestMap =
            new HashMap<WriteRequest, Map<String, AttributeValue>>();
    List<Map<String, AttributeValue>> unproc = new ArrayList<Map<String, AttributeValue>>();
    // Build a batch request with a record list
    List<WriteRequest> rList = new ArrayList<WriteRequest>();
    List<Map<String, AttributeValue>> resultList;
    // Amazon DynamoDB only allows one operation per item in a bulk insertion (no duplicate items)
    Set<Map<String, AttributeValue>> uniqueItems = uniqueItems(buffer.getRecords());
    for (Map<String, AttributeValue> item : uniqueItems) {
        WriteRequest wr = new WriteRequest().withPutRequest(new PutRequest().withItem(item));
        // add to the map
        requestMap.put(wr, item);
        // add to the list of requests
        rList.add(wr);
        // Max of sixteen not to exceed maximum request size
        if (rList.size() == 16) {
            resultList = performBatchRequest(rList, requestMap);
            unproc.addAll(resultList);
            rList.clear();
        }
    }
    resultList = performBatchRequest(rList, requestMap);
    unproc.addAll(resultList);
    LOG.info("Successfully emitted " + (buffer.getRecords().size() - unproc.size()) + " records into DynamoDB.");
    return unproc;
}
 
private List<Map<String, AttributeValue>> unproccessedItems(BatchWriteItemResult result,
        Map<WriteRequest, Map<String, AttributeValue>> requestMap) {
    Collection<List<WriteRequest>> items = result.getUnprocessedItems().values();
    List<Map<String, AttributeValue>> unprocessed = new ArrayList<Map<String, AttributeValue>>();
    // retrieve the unprocessed items
    for (List<WriteRequest> list : items) {
        for (WriteRequest request : list) {
            unprocessed.add(requestMap.get(request));
        }
    }

    return unprocessed;
}
 