下面列出了 java.util.Map#forEach() 的实例代码。您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates a new {@code T} and fills it from one row, using {@code fieldMap} to pair
 * each column index with the field that receives that column's value.
 *
 * <p>Fields typed as {@link InputStream} are delegated to {@code convertPicture}.
 * For every other field the cell is formatted to text, passed through {@code trim},
 * and handed to {@code ReadConverterContext.convert}. Missing or blank cells and
 * {@code null} formatted values leave the field untouched.
 *
 * @param fieldMap  column index to target field
 * @param formatter formatter used to render a cell as text
 * @param row       the row being read
 * @return the populated instance
 */
private T instanceObj(Map<Integer, Field> fieldMap, DataFormatter formatter, Row row) {
    T instance = ReflectUtil.newInstance(dataType);
    for (Map.Entry<Integer, Field> mapping : fieldMap.entrySet()) {
        Integer columnIndex = mapping.getKey();
        Field target = mapping.getValue();

        // Picture columns take a dedicated path and never reach the text converter.
        if (target.getType() == InputStream.class) {
            convertPicture(row, instance, columnIndex, target);
            continue;
        }

        Cell cell = row.getCell(columnIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
        String formatted = (cell == null) ? null : formatter.formatCellValue(cell);
        if (formatted == null) {
            continue;
        }

        String content = trim.apply(formatted);
        context.reset(instance, target, content, row.getRowNum(), columnIndex);
        ReadConverterContext.convert(instance, context, convertContext, exceptionFunction);
    }
    return instance;
}
/**
 * Recursively replaces {@code Object[]} values with {@link java.util.List}s.
 * JOLT expects arrays to be of type List, whereas the Record code uses Object[].
 *
 * <p>Maps are normalized in place (each value is replaced by its normalized form)
 * and the same map instance is returned. Arrays and collections are replaced by a
 * newly collected List of their normalized elements. Any other value, including
 * {@code null}, is returned unchanged.
 *
 * @param o The object to normalize with respect to JOLT
 * @return the normalized object
 */
@SuppressWarnings("unchecked")
protected static Object normalizeJoltObjects(final Object o) {
    if (o instanceof Map) {
        // Unchecked: keys are assumed to be Strings, as in the original code.
        final Map<String, Object> m = (Map<String, Object>) o;
        // replaceAll rewrites values in place without calling put() on the map
        // while it is being iterated.
        m.replaceAll((key, value) -> normalizeJoltObjects(value));
        return m;
    } else if (o instanceof Object[]) {
        return Arrays.stream((Object[]) o)
                .map(element -> normalizeJoltObjects(element))
                .collect(Collectors.toList());
    } else if (o instanceof Collection) {
        final Collection<?> c = (Collection<?>) o;
        return c.stream()
                .map(element -> normalizeJoltObjects(element))
                .collect(Collectors.toList());
    } else {
        return o;
    }
}
/**
 * Renders the values returned by {@code obtainFinalValuesAndOrigin()} as text,
 * one section per origin, each introduced by a {@code # Derived from: <origin>} line.
 *
 * <p>An {@link IOException} raised while writing a single section is logged and
 * the remaining sections are still written.
 *
 * @param asProperties {@code true} to print each section with {@code printPropertyValues},
 *                     {@code false} to print it with {@code printYamlValues}
 * @return the rendered text
 * @throws IOException if flushing or closing the writers fails
 */
private String returnPropertiesAsString(boolean asProperties) throws IOException {
    Map<String, List<String[]>> valuesByOrigin = obtainFinalValuesAndOrigin();
    try (StringWriter sink = new StringWriter(); BufferedWriter out = new BufferedWriter(sink)) {
        for (Map.Entry<String, List<String[]>> section : valuesByOrigin.entrySet()) {
            String origin = section.getKey();
            List<String[]> values = section.getValue();
            try {
                final boolean encrypted = ENCRIPTED_MAPS.contains(origin);
                out.write("# Derived from: ");
                out.write(origin);
                out.newLine();
                if (asProperties) {
                    printPropertyValues(out, values, encrypted);
                } else {
                    printYamlValues(out, values, encrypted);
                }
                out.newLine();
            } catch (IOException e) {
                // Best effort: report the failed section and carry on with the next one.
                log.error("Error printing values", e);
            }
        }
        out.flush();
        return sink.toString();
    }
}
/**
 * Appends a "responses" table to {@code serverSection}, one row per entry of
 * {@code params.apiResponses}. Each row holds the HTTP code, the response
 * description document and the links document.
 *
 * @param serverSection the node that receives the table
 * @param params        carries the API responses to render
 * @return {@code serverSection}, unchanged when there are no responses
 */
@Override
public StructuralNode apply(StructuralNode serverSection, Parameters params) {
    Map<String, ApiResponse> responses = params.apiResponses;
    if (responses == null || responses.isEmpty()) {
        return serverSection;
    }

    TableImpl table = new TableImpl(serverSection, new HashMap<>(), new ArrayList<>());
    table.setOption("header");
    table.setAttribute("caption", "", true);
    table.setAttribute("cols", ".^2a,.^14a,.^4a", true);
    table.setTitle(labels.getLabel(TABLE_TITLE_RESPONSES));
    table.setHeaderRow(
            labels.getLabel(TABLE_HEADER_HTTP_CODE),
            labels.getLabel(TABLE_HEADER_DESCRIPTION),
            labels.getLabel(TABLE_HEADER_LINKS));

    for (Map.Entry<String, ApiResponse> entry : responses.entrySet()) {
        String httpCode = entry.getKey();
        ApiResponse response = entry.getValue();
        table.addRow(
                generateInnerDoc(table, httpCode),
                getResponseDescriptionColumnDocument(table, response),
                linkComponent.apply(table, response.getLinks()));
    }

    serverSection.append(table);
    return serverSection;
}
/**
 * Checks that the descriptor of {@code BasicTypeModel} exposes exactly the expected
 * property names and that each property resolves to the same {@code BasicModelType}
 * instance obtained directly for its Java type.
 */
@Test
public void primitiveProperties() {
    ModelDescriptor<?> descriptor = ModelDescriptor.get(BasicTypeModel.class);

    Map<String, ModelType> expected = new HashMap<>();
    expected.put("boolean", BasicModelType.get(Boolean.class).get());
    expected.put("booleanPrimitive", BasicModelType.get(boolean.class).get());
    expected.put("string", BasicModelType.get(String.class).get());
    expected.put("double", BasicModelType.get(Double.class).get());
    expected.put("doublePrimitive", BasicModelType.get(double.class).get());
    expected.put("int", BasicModelType.get(int.class).get());
    expected.put("integer", BasicModelType.get(Integer.class).get());

    Set<String> actualNames = descriptor.getPropertyNames().collect(Collectors.toSet());
    Assert.assertEquals(expected.keySet(), actualNames);

    for (Map.Entry<String, ModelType> property : expected.entrySet()) {
        Assert.assertSame(property.getValue(), descriptor.getPropertyType(property.getKey()));
    }
}
/**
 * Refreshes the per-state gauges of every capacity group in {@code capacityGroupsHistograms}.
 *
 * <p>For each capacity group a base metric id tagged with the group's tier and name is
 * created, then every state in {@code TRACKED_STATES} is updated through
 * {@code updateStateCounters}. A state whose updated gauge list is empty is removed from
 * the group's state map; otherwise the new gauge list replaces the previous one.
 *
 * @param capacityGroupsHistograms capacity group name to (state to histogram builder)
 * @param tierMap                  capacity group name to tier
 */
private void updateCapacityGroupCounters(Map<String, Map<String, Histogram.Builder>> capacityGroupsHistograms, Map<String, Tier> tierMap) {
    capacityGroupsHistograms.forEach((capacityGroup, histograms) -> {
        // NOTE(review): throws NullPointerException if tierMap has no entry for
        // capacityGroup - confirm callers always supply one.
        Id baseId = registry.createId(
                TASK_IN_STATE_METRIC_NAME,
                "tier", tierMap.get(capacityGroup).name(),
                "capacityGroup", capacityGroup
        );
        Map<String, List<Gauge>> capacityMetricsByState = capacityGroupsMetrics.computeIfAbsent(capacityGroup, k -> new HashMap<>());
        for (String state : TRACKED_STATES) {
            List<Gauge> updatedGauges = updateStateCounters(baseId, state, histograms.get(state), capacityMetricsByState.get(state));
            if (updatedGauges.isEmpty()) {
                // The map is keyed by state, so the stale entry must be removed by state.
                // Removing by capacityGroup (as before) never matched a key.
                capacityMetricsByState.remove(state);
            } else {
                capacityMetricsByState.put(state, updatedGauges);
            }
        }
    });
}
/**
 * Merges the options part of {@code CREATE TABLE} statement.
 *
 * <p>Source options are copied first unless the strategy is {@code EXCLUDING}. Derived
 * options are then applied on top. A derived key that already exists is only accepted
 * when the strategy is {@code OVERWRITING}.
 *
 * @param mergingStrategy how source options are treated
 * @param sourceOptions   options of the base table
 * @param derivedOptions  options of the derived table
 * @return a new map holding the merged options
 * @throws ValidationException if a derived option collides with an existing one and
 *                             the strategy is not {@code OVERWRITING}
 */
public Map<String, String> mergeOptions(
        MergingStrategy mergingStrategy,
        Map<String, String> sourceOptions,
        Map<String, String> derivedOptions) {
    final Map<String, String> merged = new HashMap<>();
    final boolean includeSource = mergingStrategy != MergingStrategy.EXCLUDING;
    if (includeSource) {
        merged.putAll(sourceOptions);
    }

    final boolean mayOverwrite = mergingStrategy == MergingStrategy.OVERWRITING;
    for (Map.Entry<String, String> derived : derivedOptions.entrySet()) {
        final String key = derived.getKey();
        if (!mayOverwrite && merged.containsKey(key)) {
            throw new ValidationException(String.format(
                    "There already exists an option ['%s' -> '%s'] in the " +
                            "base table. You might want to specify EXCLUDING OPTIONS or OVERWRITING OPTIONS.",
                    key,
                    merged.get(key)));
        }
        merged.put(key, derived.getValue());
    }
    return merged;
}
/**
 * Replaces the contents of {@code dataTypes} with the keys of {@code map}, then checks
 * in the check model every key whose value is {@code true}.
 *
 * @param map data type name to checked flag
 */
public void setValue(Map<String, Boolean> map) {
    dataTypes.clear();
    for (String dataType : map.keySet()) {
        dataTypes.add(dataType);
    }
    // Second pass on purpose: all items are added before any of them is checked.
    for (Map.Entry<String, Boolean> entry : map.entrySet()) {
        if (entry.getValue()) {
            checkList.getCheckModel().check(entry.getKey());
        }
    }
}
/**
 * Copies all mappings from {@code map}. When a removal listener is present or the
 * writer is not the disabled writer, each entry goes through {@code put} individually.
 * Otherwise the entries are bulk-copied straight into {@code data}.
 *
 * @param map the mappings to store
 */
@Override
public void putAll(Map<? extends K, ? extends V> map) {
    boolean needsPerEntryPut = hasRemovalListener() || (writer != CacheWriter.disabledWriter());
    if (needsPerEntryPut) {
        map.forEach(this::put);
    } else {
        data.putAll(map);
    }
}
/**
 * Stop only persistent stores. In case of certain stores and store mode (such as RocksDB), this
 * can invoke compaction.
 *
 * <p>Every store whose properties report {@code isPersistedToDisk()} is stopped and
 * removed from {@code taskStores}.
 */
public void stopPersistentStores() {
    // Snapshot the matching stores first so taskStores can be modified while stopping them.
    Map<String, StorageEngine> persistentStores = this.taskStores.entrySet().stream()
            .filter(entry -> entry.getValue().getStoreProperties().isPersistedToDisk())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    for (Map.Entry<String, StorageEngine> store : persistentStores.entrySet()) {
        store.getValue().stop();
        this.taskStores.remove(store.getKey());
    }
    LOG.info("Stopped persistent stores {}", persistentStores);
}
/**
 * Saves a batch of options by calling {@code saveOption} for each entry.
 * A {@code null} or empty map is ignored.
 *
 * @param options option name to option value
 */
@Override
@CacheEvict(value = POSTS_CACHE_NAME, allEntries = true, beforeInvocation = true)
public void saveOptions(Map<String, String> options) {
    if (options == null || options.isEmpty()) {
        return;
    }
    for (Map.Entry<String, String> option : options.entrySet()) {
        saveOption(option.getKey(), option.getValue());
    }
}
/**
 * Validates the categories of every rule catalog in the deserialized rule file and
 * fails with the collected error list if any catalog reports an error.
 */
@Test
public void testRuleCategory() throws Exception {
    Map<String, RuleCatalog> catalogs = deserializeRuleFile();
    List<String> collectedErrors = new ArrayList<>();
    for (RuleCatalog catalog : catalogs.values()) {
        collectedErrors.addAll(validateCategories(catalog));
    }
    if (collectedErrors.isEmpty()) {
        return;
    }
    fail("Here is a list of category related errors:\n" + collectedErrors);
}
/**
 * Copy all attributes in the supplied {@code Map} into this {@code Map},
 * with existing objects of the same name taking precedence (i.e. not getting
 * replaced).
 *
 * @param attributes the attributes to merge in; {@code null} is a no-op
 * @return this map, for chaining
 */
public ModelMap mergeAttributes(@Nullable Map<String, ?> attributes) {
    if (attributes == null) {
        return this;
    }
    for (Map.Entry<String, ?> attribute : attributes.entrySet()) {
        String name = attribute.getKey();
        // containsKey (not a null check) so a key mapped to null still counts as present.
        if (containsKey(name)) {
            continue;
        }
        put(name, attribute.getValue());
    }
    return this;
}
/**
 * Fetches the broker configuration of {@code node} and records it in {@code clusterSummary}.
 * If the node does not support the describeConfig API, a warning is logged and nothing
 * is recorded.
 *
 * @param controllerNodeId id of the controller node, used to flag the recorded node info
 * @param node             the node to describe
 * @throws InterruptedException if waiting for the describe result is interrupted
 * @throws ExecutionException   if the describe request fails
 */
private void describeNodeConfig(int controllerNodeId, Node node) throws InterruptedException, ExecutionException {
    if (!doesNodeSupportDescribeConfigApi(node)) {
        Logger.warn(String.format("Node '%s' does not support describeConfig api. Cannot show cluster properties", node));
        return;
    }

    ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(node.id()));
    DescribeConfigsResult describeResult = kafkaClientsAdminClient.describeConfigs(singleton(brokerResource));
    final Map<ConfigResource, Config> configsByResource = describeResult.all().get();

    for (Config config : configsByResource.values()) {
        ClusterNodeInfo nodeInfo = new ClusterNodeInfo(
                node.id() == controllerNodeId,
                node.idString(),
                new HashSet<>(config.entries()));
        clusterSummary.addNodeInfo(nodeInfo);
    }
}
/**
 * Expands a multiplicity map into a flat list, where each key appears as many times as
 * its mapped count, and delegates to the list-based {@code checkResults}.
 *
 * @param expectedResults expected element to the number of times it should occur
 * @param traversal       the traversal whose results are checked
 */
public static <T> void checkResults(final Map<T, Long> expectedResults, final Traversal<?, T> traversal) {
    final List<T> expanded = new ArrayList<>();
    for (final Map.Entry<T, Long> expectation : expectedResults.entrySet()) {
        final T element = expectation.getKey();
        final long occurrences = expectation.getValue();
        for (int i = 0; i < occurrences; i++) {
            expanded.add(element);
        }
    }
    checkResults(expanded, traversal);
}
/**
 * Copies a header map into a new {@link TreeMap}, giving every header its own
 * {@link ArrayList} copy of the value list.
 *
 * @param toConvert header name to header values
 * @return a new map, sorted by header name, holding copied value lists
 */
private Map<String, List<String>> adaptHeaders(Map<String, List<String>> toConvert) {
    Map<String, List<String>> copy = new TreeMap<>();
    for (Map.Entry<String, List<String>> header : toConvert.entrySet()) {
        List<String> values = new ArrayList<>(header.getValue());
        copy.put(header.getKey(), values);
    }
    return copy;
}
/**
 * Computes a COUNT aggregate for each group in {@code resultMap} and writes it onto
 * every non-null record of that group.
 *
 * <p>Only a column-reference operand is supported. The count of a group is the size of
 * its record list, converted with the field type of the referenced column. The value is
 * stored on each record under the key {@code <aggregateName>(<columnName>)}.
 *
 * @param ds            datastore name used to look up the schema
 * @param collection    collection name used to look up the schema
 * @param aggregateNode the aggregate being computed
 * @param resultMap     group key to the records of that group; records are modified in place
 * @return group key to converted count
 * @throws OperationException if the column is unknown, the conversion fails for any
 *                            group, or the operand is not a column reference
 */
public Map<String, Object> computeCountOnResult(final String ds, final String collection, final AggregateNode aggregateNode, final Map<String, List<JSONObject>> resultMap) throws OperationException {
    final Map<String, Object> aggregateMap = new ConcurrentHashMap<>();
    ValueNode operand = aggregateNode.getOperand();
    if (operand instanceof ColumnReference) {
        final String columnName = ((ColumnReference) operand).getColumnName();
        Column column = schemaManager.readSchema(ds, collection).getColumn(columnName);
        if (column == null) {
            throw new OperationException(ErrorCode.UNKNOWN_COLUMN, "No column found with name " + columnName);
        }
        FieldType fieldType = column.getFieldType();
        validateNumeric(fieldType);

        resultMap.forEach((key, records) -> {
            final Aggregate count = new Aggregate();
            count.addCount(records.size());
            try {
                aggregateMap.put(key, fieldType.convert(count.getCount()));
            } catch (OperationException e) {
                // The lambda cannot rethrow a checked exception. The size check below
                // turns any group missing from aggregateMap into an OperationException.
                e.printStackTrace();
            }
        });
        if (aggregateMap.size() != resultMap.size()) {
            throw new OperationException(ErrorCode.INTERNAL_OPERATION_ERROR, "Could not produce aggregate result for column "
                    + columnName + " for at least one aggregate group");
        }

        final String aggregateName = aggregateNode.getAggregateName() + "(" + ((ColumnReference) aggregateNode.getOperand()).getColumnName() + ")";
        resultMap.forEach((key, records) -> {
            final Object value = aggregateMap.get(key);
            records.parallelStream().forEach(record -> {
                if (record != null) {
                    record.put(aggregateName, value);
                }
            });
        });

        // Previously control fell through to the unconditional throw below, so a
        // successfully computed result was never returned.
        return aggregateMap;
    } else if (operand instanceof ConstantNode) {
        throw new OperationException(ErrorCode.OPERATION_NOT_SUPPORTED, "COUNT(" + ((ConstantNode) operand).getValue() + ") not supported");
    }
    throw new OperationException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
/**
 * Performs insert and update operations on one table as a single batch. Further
 * operations can be added if necessary.
 *
 * <p>Each entry of {@code inputData} whose key equals {@code JsonKey.INSERT} or
 * {@code JsonKey.UPDATE} contributes one statement built from its value, which is cast
 * to a record map. Entries with any other key are ignored.
 *
 * @param keySpaceName keyspace used to obtain the session and build the statements
 * @param tableName    table the statements target
 * @param inputData    operation key to the record for that operation
 * @return a response carrying {@code Constants.SUCCESS} under {@code Constants.RESPONSE}
 * @throws ProjectCommonException with the server-error code if executing the batch fails
 */
@Override
public Response performBatchAction(
        String keySpaceName, String tableName, Map<String, Object> inputData) {
    long startTime = System.currentTimeMillis();
    ProjectLogger.log(
            "Cassandra Service performBatchAction method started at ==" + startTime,
            LoggerEnum.INFO.name());
    Session session = connectionManager.getSession(keySpaceName);
    Response response = new Response();
    BatchStatement batch = new BatchStatement();
    try {
        for (Map.Entry<String, Object> action : inputData.entrySet()) {
            String operation = action.getKey();
            Map<String, Object> record = (Map<String, Object>) action.getValue();
            if (operation.equals(JsonKey.INSERT)) {
                batch.add(createInsertStatement(keySpaceName, tableName, record));
            } else if (operation.equals(JsonKey.UPDATE)) {
                batch.add(createUpdateStatement(keySpaceName, tableName, record));
            }
        }
        session.execute(batch);
        response.put(Constants.RESPONSE, Constants.SUCCESS);
    } catch (QueryExecutionException
            | QueryValidationException
            | NoHostAvailableException
            | IllegalStateException e) {
        ProjectLogger.log(
                "Cassandra performBatchAction Failed." + e.getMessage(), LoggerEnum.ERROR.name());
        throw new ProjectCommonException(
                ResponseCode.SERVER_ERROR.getErrorCode(),
                ResponseCode.SERVER_ERROR.getErrorMessage(),
                ResponseCode.SERVER_ERROR.getResponseCode());
    }
    logQueryElapseTime("performBatchAction", startTime);
    return response;
}
/**
 * Publishes committed-offset records for a group to the offsets topic, schedules a load
 * of that partition, and verifies the loaded group: it is the cached instance, has the
 * expected id, is in the {@code Empty} state, and exposes exactly the committed offsets.
 */
@Test
public void testLoadOffsetsWithoutGroup() throws Exception {
    Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
    expectedOffsets.put(new TopicPartition("foo", 0), 23L);
    expectedOffsets.put(new TopicPartition("foo", 1), 455L);
    expectedOffsets.put(new TopicPartition("bar", 0), 8992L);

    List<SimpleRecord> commitRecords = createCommittedOffsetRecords(expectedOffsets, groupId);
    ByteBuffer payload = newMemoryRecordsBuffer(commitRecords);
    byte[] recordKey = groupMetadataKey(groupId);

    Producer<ByteBuffer> offsetsProducer =
            groupMetadataManager.getOffsetsTopicProducer(groupPartitionId).get();
    offsetsProducer.newMessage()
            .keyBytes(recordKey)
            .value(payload)
            .eventTime(Time.SYSTEM.milliseconds())
            .send();

    CompletableFuture<GroupMetadata> loaded = new CompletableFuture<>();
    groupMetadataManager.scheduleLoadGroupAndOffsets(
            groupPartitionId,
            groupMetadata -> loaded.complete(groupMetadata)
    ).get();

    GroupMetadata group = loaded.get();
    GroupMetadata cached = groupMetadataManager.getGroup(groupId).orElseGet(() -> {
        fail("Group was not loaded into the cache");
        return null;
    });

    assertSame(group, cached);
    assertEquals(groupId, group.groupId());
    assertEquals(Empty, group.currentState());
    assertEquals(expectedOffsets.size(), group.allOffsets().size());
    for (Map.Entry<TopicPartition, Long> committed : expectedOffsets.entrySet()) {
        assertEquals(
                Optional.of(committed.getValue()),
                group.offset(committed.getKey()).map(OffsetAndMetadata::offset));
    }
}
/**
 * Set session attributes, registering each entry through {@code sessionAttr}.
 *
 * @param sessionAttributes the session attributes; must not be empty
 * @return this builder
 */
public MockHttpServletRequestBuilder sessionAttrs(Map<String, Object> sessionAttributes) {
    Assert.notEmpty(sessionAttributes, "'sessionAttributes' must not be empty");
    for (Map.Entry<String, Object> attribute : sessionAttributes.entrySet()) {
        sessionAttr(attribute.getKey(), attribute.getValue());
    }
    return this;
}