类com.amazonaws.services.dynamodbv2.model.ScanRequest源码实例Demo

下面列出了怎么用com.amazonaws.services.dynamodbv2.model.ScanRequest的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: emr-dynamodb-connector   文件: DynamoDBClient.java
public RetryResult<ScanResult> scanTable(
    String tableName, DynamoDBQueryFilter dynamoDBQueryFilter, Integer segment, Integer
    totalSegments, Map<String, AttributeValue> exclusiveStartKey, long limit, Reporter reporter) {
  final ScanRequest scanRequest = new ScanRequest(tableName)
      .withExclusiveStartKey(exclusiveStartKey)
      .withLimit(Ints.checkedCast(limit))
      .withSegment(segment)
      .withTotalSegments(totalSegments)
      .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

  if (dynamoDBQueryFilter != null) {
    Map<String, Condition> scanFilter = dynamoDBQueryFilter.getScanFilter();
    if (!scanFilter.isEmpty()) {
      scanRequest.setScanFilter(scanFilter);
    }
  }

  RetryResult<ScanResult> retryResult = getRetryDriver().runWithRetry(new Callable<ScanResult>() {
    @Override
    public ScanResult call() {
      log.debug("Executing DynamoDB scan: " + scanRequest);
      return dynamoDB.scan(scanRequest);
    }
  }, reporter, PrintCounter.DynamoDBReadThrottle);
  return retryResult;
}
 
源代码2 项目: aws-doc-sdk-examples   文件: LowLevelScan.java
private static void findProductsForPriceLessThanZero() {

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
        expressionAttributeValues.put(":pr", new AttributeValue().withN("100"));

        ScanRequest scanRequest = new ScanRequest().withTableName(tableName).withFilterExpression("Price < :pr")
            .withExpressionAttributeValues(expressionAttributeValues)
            .withProjectionExpression("Id, Title, ProductCategory, Price");

        ScanResult result = client.scan(scanRequest);

        System.out.println("Scan of " + tableName + " for items with a price less than 100.");
        for (Map<String, AttributeValue> item : result.getItems()) {
            System.out.println("");
            printItem(item);
        }
    }
 
源代码3 项目: beam   文件: DynamoDBIO.java
@Override
public PCollection<T> expand(PBegin input) {
  checkArgument((getScanRequestFn() != null), "withScanRequestFn() is required");
  checkArgument((getAwsClientsProvider() != null), "withAwsClientsProvider() is required");
  ScanRequest scanRequest = getScanRequestFn().apply(null);
  checkArgument(
      (scanRequest.getTotalSegments() != null && scanRequest.getTotalSegments() > 0),
      "TotalSegments is required with withScanRequestFn() and greater zero");

  PCollection<Read<T>> splits =
      (PCollection<Read<T>>)
          input.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn()));
  splits.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}));

  PCollection<T> output =
      (PCollection<T>)
          splits
              .apply("Reshuffle", Reshuffle.viaRandomKey())
              .apply("Read", ParDo.of(new ReadFn()));
  output.setCoder(getCoder());
  return output;
}
 
源代码4 项目: beam   文件: DynamoDBIOTest.java
@Test
public void testMissingTotalSegments() {
  thrown.expectMessage("TotalSegments is required with withScanRequestFn()");
  pipeline.apply(
      DynamoDBIO.read()
          .withScanRequestFn(
              (SerializableFunction<Void, ScanRequest>) input -> new ScanRequest(tableName))
          .withAwsClientsProvider(
              AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
  try {
    pipeline.run().waitUntilFinish();
    fail("TotalSegments is required with withScanRequestFn()");
  } catch (IllegalArgumentException ex) {
    assertEquals("TotalSegments is required with withScanRequestFn()", ex.getMessage());
  }
}
 
源代码5 项目: beam   文件: DynamoDBIOTest.java
@Test
public void testNegativeTotalSegments() {
  thrown.expectMessage("TotalSegments is required with withScanRequestFn() and greater zero");
  pipeline.apply(
      DynamoDBIO.read()
          .withScanRequestFn(
              (SerializableFunction<Void, ScanRequest>)
                  input -> new ScanRequest(tableName).withTotalSegments(-1))
          .withAwsClientsProvider(
              AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
  try {
    pipeline.run().waitUntilFinish();
    fail("withTotalSegments() is expected and greater than zero");
  } catch (IllegalArgumentException ex) {
    assertEquals(
        "TotalSegments is required with withScanRequestFn() and greater zero", ex.getMessage());
  }
}
 
/**
 * Returns the list of usernames stored in the identity table.
 * 
 * @return list of existing usernames in DynamoDB table
 */
public List<String> listUsers() {
    List<String> users = new ArrayList<String>(1000);

    ScanResult result = ddb.scan(new ScanRequest().withTableName(USER_TABLE).withLimit(1000));

    for (Map<String, AttributeValue> item : result.getItems()) {
        String s = "";

        for (Entry<String, AttributeValue> entry : item.entrySet()) {
            s += " ** " + entry.getKey() + " = " + entry.getValue().getS();
        }

        users.add(s);
    }

    return users;
}
 
/**
 * @return the list of device ID (UID) stored in the identity table.
 */
public List<String> listDevices() {
    List<String> devices = new ArrayList<String>(1000);

    ScanResult result = ddb.scan(new ScanRequest().withTableName(DEVICE_TABLE).withLimit(1000));

    for (Map<String, AttributeValue> item : result.getItems()) {
        String s = "";

        for (Entry<String, AttributeValue> entry : item.entrySet()) {
            s += " ** " + entry.getKey() + " = " + entry.getValue().getS();
        }

        devices.add(s);
    }

    return devices;
}
 
public ScanResult scan(final ScanRequest request, final int permitsToConsume) throws BackendException {
    setUserAgent(request);
    ScanResult result;
    timedReadThrottle(SCAN, request.getTableName(), permitsToConsume);

    final Timer.Context apiTimerContext = getTimerContext(SCAN, request.getTableName());
    try {
        result = client.scan(request);
    } catch (Exception e) {
        throw processDynamoDbApiException(e, SCAN, request.getTableName());
    } finally {
        apiTimerContext.stop();
    }
    meterConsumedCapacity(SCAN, result.getConsumedCapacity());
    measureItemCount(SCAN, request.getTableName(), result.getCount());
    return result;
}
 
@Override
public KeyIterator getKeys(final SliceQuery query, final StoreTransaction txh) throws BackendException {
    log.debug("Entering getKeys table:{} query:{} txh:{}", getTableName(), encodeForLog(query), txh);

    final ScanRequest scanRequest = super.createScanRequest();

    final Scanner scanner;
    if (client.isEnableParallelScan()) {
        scanner = client.getDelegate().getParallelScanCompletionService(scanRequest);
    } else {
        scanner = new SequentialScanner(client.getDelegate(), scanRequest);
    }
    // Because SINGLE records cannot be split across scan results, we can use the same interpreter for both
    // sequential and parallel scans.
    final KeyIterator result = new ScanBackedKeyIterator(scanner, new SingleRowScanInterpreter(query));

    log.debug("Exiting getKeys table:{} query:{} txh:{} returning:{}", getTableName(), encodeForLog(query), txh, result);
    return result;
}
 
@Override
public KeyIterator getKeys(final SliceQuery query, final StoreTransaction txh) throws BackendException {
    log.debug("Entering getKeys table:{} query:{} txh:{}", getTableName(), encodeForLog(query), txh);
    final Expression filterExpression = new FilterExpressionBuilder().rangeKey()
                                                                     .range(query)
                                                                     .build();

    final ScanRequest scanRequest = super.createScanRequest()
             .withFilterExpression(filterExpression.getConditionExpression())
             .withExpressionAttributeValues(filterExpression.getAttributeValues());

    final Scanner scanner;
    final ScanContextInterpreter interpreter;
    if (client.isEnableParallelScan()) {
        scanner = client.getDelegate().getParallelScanCompletionService(scanRequest);
        interpreter = new MultiRowParallelScanInterpreter(this, query);
    } else {
        scanner = new SequentialScanner(client.getDelegate(), scanRequest);
        interpreter = new MultiRowSequentialScanInterpreter(this, query);
    }

    final KeyIterator result = new ScanBackedKeyIterator(scanner, interpreter);
    log.debug("Exiting getKeys table:{} query:{} txh:{} returning:{}", getTableName(), encodeForLog(query), txh, result);
    return result;
}
 
/**
 * This method gets a segmentedScanResult and submits the next scan request for that segment, if there is one.
 * @return the next available ScanResult
 * @throws ExecutionException if one of the segment pages threw while executing
 * @throws InterruptedException if one of the segment pages was interrupted while executing.
 */
private ScanContext grab() throws ExecutionException, InterruptedException {
    final Future<ScanContext> ret = exec.take();

    final ScanRequest originalRequest = ret.get().getScanRequest();
    final int segment = originalRequest.getSegment();

    final ScanSegmentWorker sw = workers[segment];

    if (sw.hasNext()) {
        currentFutures[segment] = exec.submit(sw);
    } else {
        finishSegment(segment);
        currentFutures[segment] = null;
    }

    //FYI, This might block if nothing is available.
    return ret.get();
}
 
/**
 * Begins to pipe the log results by parallel scanning the table and the
 * consumer writing the results.
 */
public void pipe(final AbstractLogConsumer consumer)
        throws ExecutionException, InterruptedException {
    final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit,
            client);

    final ScanRequest request = new ScanRequest().withTableName(tableName)
            .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
            .withLimit(BootstrapConstants.SCAN_LIMIT)
            .withConsistentRead(consistentScan);

    final ParallelScanExecutor scanService = scanner
            .getParallelScanCompletionService(request, numSegments,
                    threadPool, section, totalSections);

    while (!scanService.finished()) {
        SegmentedScanResult result = scanService.grab();
        consumer.writeResult(result);
    }

    shutdown(true);
    consumer.shutdown(true);
}
 
/**
 * This function copies a scan request for the number of segments and then
 * adds those workers to the executor service to begin scanning.
 * 
 * @param totalSections
 * @param section
 * 
 * @return <ParallelScanExecutor> the parallel scan executor to grab results
 *         when a segment is finished.
 */
public ParallelScanExecutor getParallelScanCompletionService(
        ScanRequest initialRequest, int numSegments, Executor executor,
        int section, int totalSections) {
    final int segments = Math.max(1, numSegments);
    final ParallelScanExecutor completion = new ParallelScanExecutor(
            executor, segments);

    int sectionSize = segments / totalSections;
    int start = sectionSize * section;
    int end = start + sectionSize;
    if (section + 1 == totalSections) {
        end = segments;
    }

    for (int segment = start; segment < end; segment++) {
        ScanRequest scanSegment = copyScanRequest(initialRequest)
                .withTotalSegments(segments).withSegment(segment);
        completion.addWorker(new ScanSegmentWorker(this.client,
                this.rateLimiter, scanSegment), segment);
    }

    return completion;
}
 
private void dumpTables() {
    for (String table : client.listTables().getTableNames()) {
        ScanResult scanResult;
        Map<String, AttributeValue> lastKey = null;
        do {
            scanResult = client.scan(new ScanRequest().withTableName(table).withExclusiveStartKey(lastKey));
            lastKey = scanResult.getLastEvaluatedKey();
            for (Map<String, AttributeValue> map : scanResult.getItems()) {
                for (Map.Entry<String, AttributeValue> item : map.entrySet()) {
                    System.out.print("item.put(\"");
                    System.out.print(item.getKey());
                    System.out.print("\", b642Av(\"");
                    System.out.print(Base64.encodeAsString(AttributeValueMarshaller.marshall(item.getValue()).array()));
                    System.out.println("\"));");
                }
                System.out.print("ddb.putItem(new PutItemRequest(\"");
                System.out.print(table);
                System.out.println("\", item));");
                System.out.println("item.clear();");
                System.out.println();
            }
        } while (lastKey != null);

    }
}
 
源代码15 项目: geowave   文件: DynamoDBOperations.java
private ScanResult getResults(
    final String tableName,
    final short adapterId,
    final List<GeoWaveRow> resultList,
    final Map<String, AttributeValue> lastEvaluatedKey) {
  final ScanRequest request = new ScanRequest(tableName);
  if ((lastEvaluatedKey != null) && !lastEvaluatedKey.isEmpty()) {
    request.setExclusiveStartKey(lastEvaluatedKey);
  }
  final ScanResult result = client.scan(request);
  result.getItems().forEach(objMap -> {
    final byte[] dataId = objMap.get(DynamoDBRow.GW_PARTITION_ID_KEY).getB().array();
    final AttributeValue valueAttr = objMap.get(DynamoDBRow.GW_VALUE_KEY);
    final byte[] value = valueAttr == null ? null : valueAttr.getB().array();
    final AttributeValue visAttr = objMap.get(DynamoDBRow.GW_VISIBILITY_KEY);
    final byte[] vis = visAttr == null ? new byte[0] : visAttr.getB().array();

    resultList.add(DataIndexUtils.deserializeDataIndexRow(dataId, adapterId, value, vis));
  });
  return result;
}
 
/**
 * @return the list of device ID (UID) stored in the identity table.
 */
public List<String> listDevices() {
    List<String> devices = new ArrayList<String>(1000);

    ScanResult result = ddb.scan(new ScanRequest().withTableName(DEVICE_TABLE).withLimit(1000));

    for (Map<String, AttributeValue> item : result.getItems()) {
        String s = "";

        for (Entry<String, AttributeValue> entry : item.entrySet()) {
            s += " ** " + entry.getKey() + " = " + entry.getValue().getS();
        }

        devices.add(s);
    }

    return devices;
}
 
/**
 * Returns the list of usernames stored in the identity table.
 * 
 * @return list of existing usernames in DynamoDB table
 */
public List<String> listUsers() {
    List<String> users = new ArrayList<String>(1000);

    ScanResult result = ddb.scan(new ScanRequest().withTableName(USER_TABLE).withLimit(1000));

    for (Map<String, AttributeValue> item : result.getItems()) {
        String s = "";

        for (Entry<String, AttributeValue> entry : item.entrySet()) {
            s += " ** " + entry.getKey() + " = " + entry.getValue().getS();
        }

        users.add(s);
    }

    return users;
}
 
/**
 * @return the list of device ID (UID) stored in the identity table.
 */
public List<String> listDevices() {
    List<String> devices = new ArrayList<String>(1000);

    ScanResult result = ddb.scan(new ScanRequest().withTableName(DEVICE_TABLE).withLimit(1000));

    for (Map<String, AttributeValue> item : result.getItems()) {
        String s = "";

        for (Entry<String, AttributeValue> entry : item.entrySet()) {
            s += " ** " + entry.getKey() + " = " + entry.getValue().getS();
        }

        devices.add(s);
    }

    return devices;
}
 
源代码19 项目: aws-dynamodb-examples   文件: LowLevelScan.java
private static void findProductsForPriceLessThanZero() {
    
    Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>();
    expressionAttributeValues.put(":pr", new AttributeValue().withN("100"));
    
    ScanRequest scanRequest = new ScanRequest()
        .withTableName(tableName)
        .withFilterExpression("Price < :pr")
        .withExpressionAttributeValues(expressionAttributeValues)
        .withProjectionExpression("Id, Title, ProductCategory, Price");

    ScanResult result = client.scan(scanRequest);
    
    System.out.println("Scan of " + tableName + " for items with a price less than 100.");
    for (Map<String, AttributeValue> item : result.getItems()) {
        System.out.println("");
        printItem(item);
    }
}
 
/**
 * Derives an Arrow {@link Schema} for the given table by performing a small table scan and mapping the returned
 * attribute values' types to Arrow types. If the table is empty, only attributes found in the table's metadata
 * are added to the return schema.
 *
 * @param tableName the table to derive a schema for
 * @param invoker the ThrottlingInvoker to call DDB with
 * @param ddbClient the DDB client to use
 * @return the table's derived schema
 */
public static Schema peekTableForSchema(String tableName, ThrottlingInvoker invoker, AmazonDynamoDB ddbClient)
        throws TimeoutException
{
    ScanRequest scanRequest = new ScanRequest().withTableName(tableName).withLimit(SCHEMA_INFERENCE_NUM_RECORDS);
    ScanResult scanResult = invoker.invoke(() -> ddbClient.scan(scanRequest));
    List<Map<String, AttributeValue>> items = scanResult.getItems();
    Set<String> discoveredColumns = new HashSet<>();
    SchemaBuilder schemaBuilder = new SchemaBuilder();
    if (!items.isEmpty()) {
        for (Map<String, AttributeValue> item : items) {
            for (Map.Entry<String, AttributeValue> column : item.entrySet()) {
                if (!discoveredColumns.contains(column.getKey())) {
                    Field field = DDBTypeUtils.inferArrowField(column.getKey(), ItemUtils.toSimpleValue(column.getValue()));
                    if (field != null) {
                        schemaBuilder.addField(field);
                        discoveredColumns.add(column.getKey());
                    }
                }
            }
        }
    }
    else {
        // there's no items, so use any attributes defined in the table metadata
        DynamoDBTable table = getTable(tableName, invoker, ddbClient);
        for (AttributeDefinition attributeDefinition : table.getKnownAttributeDefinitions()) {
            schemaBuilder.addField(DDBTypeUtils.getArrowFieldFromDDBType(attributeDefinition.getAttributeName(), attributeDefinition.getAttributeType()));
        }
    }
    return schemaBuilder.build();
}
 
@Override
public final Response scan(final Request request) {

    final String projectionExpression = request.getPartitionKey() + ", " + request.getSortKey();
    ScanRequest scanRequest = new ScanRequest()
            .withTableName(request.getTableName())
            .withProjectionExpression(projectionExpression);

    StringBuilder filterExpressionBuilder;
    Map<String, AttributeValue> expressionAttributeValues;
    if (request.getFilterData() != null) {
        filterExpressionBuilder = new StringBuilder();
        expressionAttributeValues = new HashMap<>();
        processFilterData(request, filterExpressionBuilder, expressionAttributeValues);
        // Add to ScanRequest.
        scanRequest.withFilterExpression(filterExpressionBuilder.toString());
        scanRequest.withExpressionAttributeValues(expressionAttributeValues);
    }



    final ScanResult scanResult = dynamoDBClient.scan(scanRequest);

    final StringBuilder response = new StringBuilder();
    response.append("PK of items read with scan (V2): ");
    for (Map<String, AttributeValue> item : scanResult.getItems()) {
        response.append(prepareKeyStr(item, request));
    }

    return new Response(response.toString(), null);
}
 
源代码22 项目: chatbot   文件: DBUtil.java
private static int getCharactersCount() {
    ScanRequest scanRequest = new ScanRequest()
            .withTableName("starwars");
    ScanResult result = getClient().scan(scanRequest);
    System.out.println("Total number of characters found: " + result.getCount());

    // scanning only returns 1MB of data
    // need to aggressively scan and return an accurate count

    return result.getCount();
}
 
源代码23 项目: strongbox   文件: GenericDynamoDB.java
private Stream<Entry> scan(SecretEventStream.Filter<Entry> filter, Converters converters) {
    ScanRequest scanRequest = new ScanRequest();
    scanRequest.withConsistentRead(true);
    scanRequest.withTableName(tableName);

    FilterGenerator filterGenerator = new FilterGenerator();
    FilterGenerator.Filter generated = filterGenerator.process(filter.parsedAttributeCondition.get(), converters);

    if(!generated.expressionAttributeNames.isEmpty()) {
        scanRequest.withExpressionAttributeNames(generated.expressionAttributeNames);
    }

    if (!generated.expressionAttributeValues.isEmpty()) {
        scanRequest.withExpressionAttributeValues(generated.expressionAttributeValues);
    }

    scanRequest.withFilterExpression(generated.filterExpression);

    ScanResult result = client.scan(scanRequest);

    List<Map<String, AttributeValue>> results = new ArrayList<>();
    results.addAll(result.getItems());

    while (result.getLastEvaluatedKey() != null) {
        scanRequest = scanRequest.withExclusiveStartKey(result.getLastEvaluatedKey());

        result = client.scan(scanRequest);

        results.addAll(result.getItems());
    }

    Stream<Entry> typedResult = results.stream().map(this::fromMap);

    if (filter.reverse) {
        typedResult = Lists.reverse(typedResult.collect(Collectors.toCollection(LinkedList::new))).stream();
    }

    return typedResult;
}
 
源代码24 项目: strongbox   文件: GenericDynamoDBTest.java
@Test
public void testKeySet() throws Exception {
    ScanRequest request = new ScanRequest().withConsistentRead(true).withTableName(tableName);
    ScanResult result = constructScanResult();
    when(mockDynamoDBClient.scan(request)).thenReturn(result);

    // Call the KeySet method and assert the expected secret identifiers are returned.
    Set<SecretIdentifier> keys = dynamoDB.keySet();
    assertEquals(keys.size(), 2);
    assertTrue(keys.contains(new SecretIdentifier(SECRET_NAME)));
    assertTrue(keys.contains(new SecretIdentifier(SECRET2_NAME)));

    verify(mockDynamoDBClient, times(1)).scan(request);
}
 
源代码25 项目: strongbox   文件: GenericDynamoDBTest.java
@Test
public void testKeySetEmpty() throws Exception {
    ScanRequest request = new ScanRequest().withConsistentRead(true).withTableName(tableName);
    ScanResult result = new ScanResult().withCount(0).withItems(new ArrayList<>());
    when(mockDynamoDBClient.scan(request)).thenReturn(result);

    // Call the KeySet method and check that is it empty.
    Set<SecretIdentifier> keySet = dynamoDB.keySet();
    assertTrue(keySet.isEmpty());
    verify(mockDynamoDBClient, times(1)).scan(request);
}
 
@Override
public void run() {
    System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments
        + " segments " + itemLimit + " items at a time...");
    Map<String, AttributeValue> exclusiveStartKey = null;
    int totalScannedItemCount = 0;
    int totalScanRequestCount = 0;
    try {
        while (true) {
            ScanRequest scanRequest = new ScanRequest().withTableName(tableName).withLimit(itemLimit)
                .withExclusiveStartKey(exclusiveStartKey).withTotalSegments(totalSegments).withSegment(segment);

            ScanResult result = client.scan(scanRequest);

            totalScanRequestCount++;
            totalScannedItemCount += result.getScannedCount();

            // print items returned from scan request
            processScanResult(segment, result);

            exclusiveStartKey = result.getLastEvaluatedKey();
            if (exclusiveStartKey == null) {
                break;
            }
        }
    }
    catch (AmazonServiceException ase) {
        System.err.println(ase.getMessage());
    }
    finally {
        System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of "
            + totalSegments + " of " + tableName + " with " + totalScanRequestCount + " scan requests");
    }
}
 
源代码27 项目: beam   文件: DynamoDBIO.java
@ProcessElement
public void processElement(@Element Read<T> spec, OutputReceiver<Read<T>> out) {
  ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
  for (int i = 0; i < scanRequest.getTotalSegments(); i++) {
    out.output(spec.withSegmentId(i));
  }
}
 
源代码28 项目: beam   文件: DynamoDBIO.java
@ProcessElement
public void processElement(@Element Read<T> spec, OutputReceiver<T> out) {
  AmazonDynamoDB client = spec.getAwsClientsProvider().createDynamoDB();
  ScanRequest scanRequest = spec.getScanRequestFn().apply(null);
  scanRequest.setSegment(spec.getSegmentId());
  ScanResult scanResult = client.scan(scanRequest);
  out.output(spec.getScanResultMapperFn().apply(scanResult));
}
 
源代码29 项目: beam   文件: DynamoDBIOTestHelper.java
static List<Map<String, AttributeValue>> generateTestData(String tableName, int numOfItems) {
  BatchWriteItemRequest batchWriteItemRequest =
      generateBatchWriteItemRequest(tableName, numOfItems);

  dynamoDBClient.batchWriteItem(batchWriteItemRequest);
  ScanResult scanResult = dynamoDBClient.scan(new ScanRequest().withTableName(tableName));

  List<Map<String, AttributeValue>> items = scanResult.getItems();
  Assert.assertEquals(numOfItems, items.size());
  return items;
}
 
源代码30 项目: beam   文件: DynamoDBIOTest.java
@Test
public void testReadScanResult() {
  PCollection<List<Map<String, AttributeValue>>> actual =
      pipeline.apply(
          DynamoDBIO.<List<Map<String, AttributeValue>>>read()
              .withAwsClientsProvider(
                  AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))
              .withScanRequestFn(
                  (SerializableFunction<Void, ScanRequest>)
                      input -> new ScanRequest(tableName).withTotalSegments(1))
              .items());
  PAssert.that(actual).containsInAnyOrder(expected);
  pipeline.run().waitUntilFinish();
}
 
 类方法
 同包方法