Source-code examples for the class com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult

The following examples show how to use the API class com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; you can also follow the project links to view the full source code on GitHub.
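
Before the project samples, here is a minimal, self-contained sketch of the core pattern behind most of them: submit a batch, then resubmit whatever comes back from getUnprocessedItems() until the map is empty. The table name "MyTable" and the "Id" attribute are placeholders for this sketch, and real code should back off between retries, as several of the examples below do.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
import com.amazonaws.services.dynamodbv2.model.PutRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;

public class BatchWriteItemResultDemo {
    public static void main(String[] args) {
        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.defaultClient();

        // Placeholder table and attribute names for this sketch
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", new AttributeValue().withS("demo-1"));

        Map<String, List<WriteRequest>> requestItems = new HashMap<>();
        requestItems.put("MyTable", Collections.singletonList(
                new WriteRequest().withPutRequest(new PutRequest().withItem(item))));

        // BatchWriteItem may process only part of the batch; resubmit the
        // unprocessed remainder until the result map comes back empty.
        BatchWriteItemResult result = client.batchWriteItem(new BatchWriteItemRequest(requestItems));
        while (!result.getUnprocessedItems().isEmpty()) {
            result = client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
        }
    }
}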

Example 2  Project: Doradus   File: DynamoDBService2.java
private void commitPartial(List<WriteRequest> list) {
    Timer t = new Timer();
    Map<String, List<WriteRequest>> map = new HashMap<>();
    map.put(getTenant().getName(), list);
    BatchWriteItemResult result = m_client.batchWriteItem(new BatchWriteItemRequest(map));
    int retry = 0;
    while (result.getUnprocessedItems().size() > 0) {
        if (retry == RETRY_SLEEPS.length) throw new RuntimeException("All retries failed");
        m_logger.debug("Committing {} unprocessed items, retry: {}", result.getUnprocessedItems().size(), retry + 1);
        try {
            Thread.sleep(RETRY_SLEEPS[retry++]);
        } catch (InterruptedException e) {
            // Preserve the interrupt status rather than swallowing the exception
            Thread.currentThread().interrupt();
        }
        result = m_client.batchWriteItem(new BatchWriteItemRequest(result.getUnprocessedItems()));
    }
    m_logger.debug("Committed {} writes in {}", list.size(), t);
    list.clear();
}
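
RETRY_SLEEPS is a constant defined elsewhere in DynamoDBService2.java and is not shown in the snippet. A plausible shape (the values below are illustrative, not Doradus's actual numbers) is an escalating sleep schedule in milliseconds:

// Illustrative only: an escalating backoff schedule in milliseconds.
// The real values live elsewhere in Doradus's DynamoDBService2.java.
private static final long[] RETRY_SLEEPS = { 250, 500, 1000, 2000, 4000, 8000 };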
 
Example 3  Project: nifi   File: AbstractWriteDynamoDBProcessor.java
/**
 * Helper method to handle unprocessed items
 * @param session process session
 * @param keysToFlowFileMap map of DynamoDB primary keys to flow files
 * @param table DynamoDB table name
 * @param hashKeyName the hash key name
 * @param hashKeyValueType the hash key value type
 * @param rangeKeyName the range key name
 * @param rangeKeyValueType the range key value type
 * @param outcome the write outcome
 */
protected void handleUnprocessedItems(final ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String hashKeyName, final String hashKeyValueType,
        final String rangeKeyName, final String rangeKeyValueType, BatchWriteItemOutcome outcome) {
    BatchWriteItemResult result = outcome.getBatchWriteItemResult();

    // Handle unprocessed items
    List<WriteRequest> unprocessedItems = result.getUnprocessedItems().get(table);
    if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
        for ( WriteRequest request : unprocessedItems) {
            Map<String,AttributeValue> item = getRequestItem(request);
            Object hashKeyValue = getValue(item, hashKeyName, hashKeyValueType);
            Object rangeKeyValue = getValue(item, rangeKeyName, rangeKeyValueType);

            sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue);
        }
    }
}
 
@Override
public void write(K key, V value) throws IOException {
  if (value == null) {
    throw new RuntimeException("Null record encountered. At least the key columns must be "
        + "specified.");
  }

  verifyInterval();
  if (progressable != null) {
    progressable.progress();
  }

  DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
  BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
      permissibleWritesPerSecond - writesPerSecond, reporter, deletionMode);

  batchSize++;
  totalItemsWritten++;

  if (result != null) {
    if (result.getConsumedCapacity() != null) {
      for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
        double consumedUnits = consumedCapacity.getCapacityUnits();
        totalIOPSConsumed += consumedUnits;
      }
    }

    int unprocessedItems = 0;
    for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
      unprocessedItems += requests.size();
    }
    writesPerSecond += batchSize - unprocessedItems;
    batchSize = unprocessedItems;
  }
}
 
/**
 * Writes to DynamoDBTable using an exponential backoff. If the
 * batchWriteItem returns unprocessed items then it will exponentially
 * backoff and retry the unprocessed items.
 */
public List<ConsumedCapacity> runWithBackoff(BatchWriteItemRequest req) {
    BatchWriteItemResult writeItemResult = null;
    List<ConsumedCapacity> consumedCapacities = new LinkedList<ConsumedCapacity>();
    Map<String, List<WriteRequest>> unprocessedItems = null;
    boolean interrupted = false;
    try {
        do {
            writeItemResult = client.batchWriteItem(req);
            unprocessedItems = writeItemResult.getUnprocessedItems();
            consumedCapacities
                    .addAll(writeItemResult.getConsumedCapacity());

            if (unprocessedItems != null) {
                req.setRequestItems(unprocessedItems);
                try {
                    Thread.sleep(exponentialBackoffTime);
                } catch (InterruptedException ie) {
                    interrupted = true;
                } finally {
                    exponentialBackoffTime *= 2;
                    if (exponentialBackoffTime > BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME) {
                        exponentialBackoffTime = BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME;
                    }
                }
            }
        } while (unprocessedItems != null && unprocessedItems.get(tableName) != null);
        return consumedCapacities;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
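
A hedged usage sketch: DynamoDB caps BatchWriteItem at 25 requests per call, so a caller would typically chunk a large write set before handing each chunk to runWithBackoff. The writeRequests list and tableName below are assumed to exist in the caller.

// Hypothetical caller: split writes into chunks of at most 25 requests.
List<ConsumedCapacity> consumed = new LinkedList<ConsumedCapacity>();
for (int i = 0; i < writeRequests.size(); i += 25) {
    Map<String, List<WriteRequest>> chunk = new HashMap<String, List<WriteRequest>>();
    chunk.put(tableName, writeRequests.subList(i, Math.min(i + 25, writeRequests.size())));
    consumed.addAll(runWithBackoff(new BatchWriteItemRequest().withRequestItems(chunk)));
}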
 
@Test
public void test_batchWriteItem_WithAllParameters() throws Exception {
  createTable();

  String TEST_ATTRIBUTE_2 = "Attribute2";
  String TEST_ATTRIBUTE_VALUE_2 = "AttributeValue2";

  Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
  List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();

  Map<String, AttributeValue> item1 = new HashMap<String, AttributeValue>();
  item1.put(TEST_ATTRIBUTE, new AttributeValue()
    .withS(TEST_ATTRIBUTE_VALUE));
  WriteRequest writeRequest1 = new WriteRequest()
    .withPutRequest(new PutRequest()
      .withItem(item1));
  writeRequests.add(writeRequest1);

  Map<String, AttributeValue> item2 = new HashMap<String, AttributeValue>();
  item2.put(TEST_ATTRIBUTE_2, new AttributeValue()
    .withS(TEST_ATTRIBUTE_VALUE_2));
  WriteRequest writeRequest2 = new WriteRequest()
    .withPutRequest(new PutRequest()
      .withItem(item2));
  writeRequests.add(writeRequest2);

  requestItems.put(TEST_TABLE_NAME, writeRequests);

  BatchWriteItemResult result = dynamoDb.batchWriteItem(requestItems);
  List<ConsumedCapacity> consumedCapacities = result.getConsumedCapacity();

  assertThat(consumedCapacities.size(), equalTo(writeRequests.size()));
}
 
Example 7  Project: dynamodb-geo   File: DynamoDBManager.java
public BatchWritePointResult batchWritePoints(List<PutPointRequest> putPointRequests) {
	BatchWriteItemRequest batchItemRequest = new BatchWriteItemRequest();
	List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
	for (PutPointRequest putPointRequest : putPointRequests) {
		long geohash = S2Manager.generateGeohash(putPointRequest.getGeoPoint());
		long hashKey = S2Manager.generateHashKey(geohash, config.getHashKeyLength());
		String geoJson = GeoJsonMapper.stringFromGeoObject(putPointRequest.getGeoPoint());

		PutRequest putRequest = putPointRequest.getPutRequest();
		AttributeValue hashKeyValue = new AttributeValue().withN(String.valueOf(hashKey));
		putRequest.getItem().put(config.getHashKeyAttributeName(), hashKeyValue);
		putRequest.getItem().put(config.getRangeKeyAttributeName(), putPointRequest.getRangeKeyValue());
		AttributeValue geohashValue = new AttributeValue().withN(Long.toString(geohash));
		putRequest.getItem().put(config.getGeohashAttributeName(), geohashValue);
		AttributeValue geoJsonValue = new AttributeValue().withS(geoJson);
		putRequest.getItem().put(config.getGeoJsonAttributeName(), geoJsonValue);			
		
		WriteRequest writeRequest = new WriteRequest(putRequest);
		writeRequests.add(writeRequest);
	}
	Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
	requestItems.put(config.getTableName(), writeRequests);
	batchItemRequest.setRequestItems(requestItems);
	BatchWriteItemResult batchWriteItemResult = config.getDynamoDBClient().batchWriteItem(batchItemRequest);
	BatchWritePointResult batchWritePointResult = new BatchWritePointResult(batchWriteItemResult);
	return batchWritePointResult;
}
 
private List<Map<String, AttributeValue>> unproccessedItems(BatchWriteItemResult result,
        Map<WriteRequest, Map<String, AttributeValue>> requestMap) {
    Collection<List<WriteRequest>> items = result.getUnprocessedItems().values();
    List<Map<String, AttributeValue>> unprocessed = new ArrayList<Map<String, AttributeValue>>();
    // retrieve the unprocessed items
    for (List<WriteRequest> list : items) {
        for (WriteRequest request : list) {
            unprocessed.add(requestMap.get(request));
        }
    }

    return unprocessed;
}
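
The requestMap parameter above pairs each submitted WriteRequest with its source item; the SDK's model classes implement value-based equals() and hashCode(), which is what allows the unprocessed requests echoed back by the service to be used as map keys. A hedged sketch of how such a map might be built while assembling the batch (the items collection is an assumed input):

// Hedged sketch: record a WriteRequest -> item mapping while building the batch.
Map<WriteRequest, Map<String, AttributeValue>> requestMap =
        new HashMap<WriteRequest, Map<String, AttributeValue>>();
List<WriteRequest> writeRequests = new ArrayList<WriteRequest>();
for (Map<String, AttributeValue> item : items) {  // 'items' is an assumed input
    WriteRequest request = new WriteRequest().withPutRequest(new PutRequest().withItem(item));
    writeRequests.add(request);
    requestMap.put(request, item);
}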
 
Example 9  Project: podyn   File: PostgresDynamoDB.java
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest batchWriteItemRequest) {
	throw new UnsupportedOperationException();
}
 
Example 10  Project: podyn   File: PostgresDynamoDB.java
@Override
public BatchWriteItemResult batchWriteItem(Map<String, List<WriteRequest>> requestItems) {
	throw new UnsupportedOperationException();
}
 
Example 11  Project: emr-dynamodb-connector   File: DynamoDBClient.java
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item,
    long maxItemsPerBatch, Reporter reporter, boolean deletionMode)
    throws UnsupportedEncodingException {

  int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item);
  if (itemSizeBytes > maxItemByteSize) {
    throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize
        + ". Item with size of " + itemSizeBytes + " was given.");
  }
  maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch);
  BatchWriteItemResult result = null;
  if (writeBatchMap.containsKey(tableName)) {

    boolean writeRequestsForTableAtLimit =
        writeBatchMap.get(tableName).size() >= maxItemsPerBatch;

    boolean totalSizeOfWriteBatchesOverLimit =
        writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize;

    if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) {
        result = writeBatch(reporter, itemSizeBytes);
    }
  }
  // writeBatchMap could be cleared from writeBatch()
  List<WriteRequest> writeBatchList;
  if (!writeBatchMap.containsKey(tableName)) {
    writeBatchList = new ArrayList<>((int) maxItemsPerBatch);
    writeBatchMap.put(tableName, writeBatchList);
  } else {
    writeBatchList = writeBatchMap.get(tableName);
  }

  log.debug("BatchWriteItem deletionMode " + deletionMode);

  if (deletionMode) {
    writeBatchList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(item)));
  } else {
    writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
  }

  writeBatchMapSizeBytes += itemSizeBytes;

  return result;
}
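
Note that putBatch buffers writes internally and returns null until one of the size limits triggers an actual flush via writeBatch(). A hedged caller sketch (dynamoDBClient and the other arguments are assumed), mirroring how the write() method earlier on this page consumes the result:

// Hedged sketch: a null result means the item was only buffered, not flushed.
BatchWriteItemResult flushed = dynamoDBClient.putBatch(tableName, item, maxItemsPerBatch, reporter, false);
if (flushed != null && flushed.getConsumedCapacity() != null) {
    for (ConsumedCapacity cc : flushed.getConsumedCapacity()) {
        totalConsumed += cc.getCapacityUnits();  // totalConsumed is a hypothetical counter
    }
}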
 
Example 12  Project: aws-doc-sdk-examples   File: LowLevelBatchWrite.java
private static void writeMultipleItemsBatchWrite() {
    try {

        // Create a map for the requests in the batch
        Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();

        // Create a PutRequest for a new Forum item
        Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
        forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
        forumItem.put("Threads", new AttributeValue().withN("0"));

        List<WriteRequest> forumList = new ArrayList<WriteRequest>();
        forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
        requestItems.put(table1Name, forumList);

        // Create a PutRequest for a new Thread item
        Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
        threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
        threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
        threadItem.put("Message", new AttributeValue().withS("ElastiCache Thread 1 message"));
        threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));

        List<WriteRequest> threadList = new ArrayList<WriteRequest>();
        threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));

        // Create a DeleteRequest for a Thread item
        Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>();
        threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
        threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));

        threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
        requestItems.put(table2Name, threadList);

        BatchWriteItemResult result;
        BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

        do {
            System.out.println("Making the request.");

            batchWriteItemRequest.withRequestItems(requestItems);
            result = client.batchWriteItem(batchWriteItemRequest);

            // Print consumed capacity units
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
            }

            // Check for unprocessed keys which could happen if you exceed
            // provisioned throughput
            System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
            requestItems = result.getUnprocessedItems();
        } while (result.getUnprocessedItems().size() > 0);

    }
    catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }

}
 
public BatchWriteItemResult batchWriteItem(final BatchWriteItemRequest batchRequest) throws BackendException {
    int count = 0;
    for (Entry<String, List<WriteRequest>> entry : batchRequest.getRequestItems().entrySet()) {
        final String tableName = entry.getKey();
        final List<WriteRequest> requests = entry.getValue();
        count += requests.size();
        if (count > BATCH_WRITE_MAX_NUMBER_OF_ITEMS) {
            throw new IllegalArgumentException("Cannot have more than 25 requests in a batch write");
        }
        for (final WriteRequest request : requests) {
            if ((request.getPutRequest() != null) == (request.getDeleteRequest() != null)) {
                throw new IllegalArgumentException("Exactly one of PutRequest or DeleteRequest must be set in each WriteRequest in a batch write operation");
            }
            final int wcu;
            final String apiName;
            if (request.getPutRequest() != null) {
                apiName = PUT_ITEM;
                final int bytes = calculateItemSizeInBytes(request.getPutRequest().getItem());
                wcu = computeWcu(bytes);
            } else { //deleterequest
                apiName = DELETE_ITEM;
                wcu = estimateCapacityUnits(apiName, tableName);
            }
            timedWriteThrottle(apiName, tableName, wcu);
        }
    }

    BatchWriteItemResult result;
    setUserAgent(batchRequest);
    final Timer.Context apiTimerContext = getTimerContext(BATCH_WRITE_ITEM, null /*tableName*/);
    try {
        result = client.batchWriteItem(batchRequest);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, BATCH_WRITE_ITEM, null /*tableName*/);
    } finally {
        apiTimerContext.stop();
    }
    if (result.getConsumedCapacity() != null) {
        for (ConsumedCapacity ccu : result.getConsumedCapacity()) {
            meterConsumedCapacity(BATCH_WRITE_ITEM, ccu);
        }
    }
    return result;
}
 
Example 14  Project: geowave   File: DynamoDBWriter.java
private void writeBatch(final boolean async) {
  final List<WriteRequest> batch;

  if (batchedItems.size() <= NUM_ITEMS) {
    batch = batchedItems;
  } else {
    // subList's end index is exclusive; NUM_ITEMS (not NUM_ITEMS + 1) keeps the batch at the cap
    batch = batchedItems.subList(0, NUM_ITEMS);
  }
  final Map<String, List<WriteRequest>> writes = new HashMap<>();
  writes.put(tableName, new ArrayList<>(batch));
  if (async) {

    /**
     * To support asynchronous batch writes, an async handler is created with callbacks for
     * success and error. Because a failure may leave unprocessed items, those items are
     * retried asynchronously. Futures are tracked so they can be waited on during "flush".
     */
    final BatchWriteItemRequest batchRequest = new BatchWriteItemRequest(writes);
    final Future<BatchWriteItemResult> future =
        client.batchWriteItemAsync(
            batchRequest,
            new AsyncHandler<BatchWriteItemRequest, BatchWriteItemResult>() {

              @Override
              public void onError(final Exception exception) {
                LOGGER.warn(
                    "Unable to get response from Dynamo-Async Write " + exception.toString());
                futureMap.remove(batchRequest);
                return;
              }

              @Override
              public void onSuccess(
                  final BatchWriteItemRequest request,
                  final BatchWriteItemResult result) {
                retryAsync(result.getUnprocessedItems());
                if (futureMap.remove(request) == null) {
                  LOGGER.warn(" Unable to delete BatchWriteRequest from futuresMap ");
                }
              }
            });

    futureMap.put(batchRequest, future);
  } else {
    final BatchWriteItemResult response =
        client.batchWriteItem(new BatchWriteItemRequest(writes));
    retry(response.getUnprocessedItems());
  }

  batch.clear();
}
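
NUM_ITEMS is defined elsewhere in DynamoDBWriter. Since DynamoDB rejects BatchWriteItem calls with more than 25 requests, it is presumably:

// Presumed value: DynamoDB allows at most 25 write requests per BatchWriteItem call.
private static final int NUM_ITEMS = 25;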
 
Example 15  Project: aws-dynamodb-examples   File: LowLevelBatchWrite.java
private static void writeMultipleItemsBatchWrite() {
    try {                    
        
       // Create a map for the requests in the batch
       Map<String, List<WriteRequest>> requestItems = new HashMap<String, List<WriteRequest>>();
        
       // Create a PutRequest for a new Forum item
       Map<String, AttributeValue> forumItem = new HashMap<String, AttributeValue>();
       forumItem.put("Name", new AttributeValue().withS("Amazon RDS"));
       forumItem.put("Threads", new AttributeValue().withN("0"));
    
       List<WriteRequest> forumList = new ArrayList<WriteRequest>();
       forumList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(forumItem)));
       requestItems.put(table1Name, forumList);
       
       // Create a PutRequest for a new Thread item
       Map<String, AttributeValue> threadItem = new HashMap<String, AttributeValue>();
       threadItem.put("ForumName", new AttributeValue().withS("Amazon RDS"));
       threadItem.put("Subject", new AttributeValue().withS("Amazon RDS Thread 1"));
       threadItem.put("Message", new AttributeValue().withS("ElasticCache Thread 1 message"));
       threadItem.put("KeywordTags", new AttributeValue().withSS(Arrays.asList("cache", "in-memory")));
    
       List<WriteRequest> threadList = new ArrayList<WriteRequest>();
       threadList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(threadItem)));
       
       // Create a DeleteRequest for a Thread item
       Map<String, AttributeValue> threadDeleteKey = new HashMap<String, AttributeValue>(); 
       threadDeleteKey.put("ForumName", new AttributeValue().withS("Amazon S3"));
       threadDeleteKey.put("Subject", new AttributeValue().withS("S3 Thread 100"));
       
       threadList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(threadDeleteKey)));
       requestItems.put(table2Name, threadList);
       
       BatchWriteItemResult result;
       BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
           .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
       
       do {
           System.out.println("Making the request.");
                           
           batchWriteItemRequest.withRequestItems(requestItems);
           result = client.batchWriteItem(batchWriteItemRequest);
           
           // Print consumed capacity units
           for(ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                String tableName = consumedCapacity.getTableName();
                Double consumedCapacityUnits = consumedCapacity.getCapacityUnits();
                System.out.println("Consumed capacity units for table " + tableName + ": " + consumedCapacityUnits);
           }
           
           // Check for unprocessed keys which could happen if you exceed provisioned throughput
           System.out.println("Unprocessed Put and Delete requests: \n" + result.getUnprocessedItems());
           requestItems = result.getUnprocessedItems();
       } while (result.getUnprocessedItems().size() > 0);
                    
    }  catch (AmazonServiceException ase) {
        System.err.println("Failed to retrieve items: ");
        ase.printStackTrace(System.err);
    }  

}
 
Example 16  Project: dynamodb-geo   File: BatchWritePointResult.java
public BatchWritePointResult(BatchWriteItemResult batchWriteItemResult) {
	this.batchWriteItemResult = batchWriteItemResult;
}
 
Example 17  Project: dynamodb-geo   File: BatchWritePointResult.java
public BatchWriteItemResult getBatchWriteItemResult() {
	return batchWriteItemResult;
}
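
A hedged caller sketch tying examples 7, 16, and 17 together: batchWritePoints (example 7) issues a single BatchWriteItem call without retrying, so the caller can unwrap the raw SDK result and deal with any leftovers. dynamoDBManager and putPointRequests are assumed.

// Hedged sketch: unwrap the raw result from the geo wrapper and check for leftovers.
BatchWritePointResult pointResult = dynamoDBManager.batchWritePoints(putPointRequests);
BatchWriteItemResult raw = pointResult.getBatchWriteItemResult();
if (!raw.getUnprocessedItems().isEmpty()) {
    // Any unprocessed requests must be resubmitted by the caller.
}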
 
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest arg0)
        throws AmazonServiceException, AmazonClientException {
    throw new UnsupportedOperationException("Use the underlying client instance instead");
}
 
@Override
public BatchWriteItemResult batchWriteItem(
        Map<String, List<WriteRequest>> requestItems)
        throws AmazonServiceException, AmazonClientException {
    throw new UnsupportedOperationException("Use the underlying client instance instead");
}
 
@Override
public BatchWriteItemResult batchWriteItem(BatchWriteItemRequest request) throws AmazonServiceException, AmazonClientException {
    return getBackend().batchWriteItem(request);
}
 
@Override
public BatchWriteItemResult batchWriteItem(Map<String, List<WriteRequest>> requestItems) throws AmazonServiceException, AmazonClientException {
    return getBackend().batchWriteItem(requestItems);
}
 