The examples below show how to use the org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema API class, with instance code and common usage patterns; follow the links to view the full source code on GitHub.
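As a starting point, here is a minimal, self-contained sketch of constructing a GenericRowWithSchema and reading fields back by name. The schema and values are illustrative only, not taken from any example below.

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class GenericRowWithSchemaExample {
    public static void main(String[] args) {
        // Define a two-column schema.
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("count", DataTypes.IntegerType, true)
        });
        // A GenericRowWithSchema pairs an Object[] of values with the schema,
        // which is what enables name-based access via Row.getAs(...).
        Row row = new GenericRowWithSchema(new Object[]{"coverage", 42}, schema);
        String name = row.getAs("name");    // "coverage"
        Integer count = row.getAs("count"); // 42
        System.out.println(name + " = " + count);
    }
}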
public QueryResult getSparkQueryResult(Entry<List<Attribute>, List<GenericRowWithSchema>> sparkData)
throws Exception {
if (sparkData == null) {
return new QueryResult(new ArrayList<>(), new ArrayList<>());
}
if (CollectionUtils.isEmpty(sparkData.getKey())) {
throw new SparkException("collect data error");
}
List<Attribute> attributes = sparkData.getKey();
List<GenericRowWithSchema> value = sparkData.getValue();
List<Object> data = new ArrayList<>();
List<ColumnMetaData> meta = new ArrayList<>();
value.forEach(row -> data.add(row.values()));
for (int index = 0; index < attributes.size(); index++) {
Attribute attribute = attributes.get(index);
ScalarType columnType = getColumnType(attribute.dataType());
meta.add(new ColumnMetaData(index, false, true, false, false,
attribute.nullable() ? 1 : 0, true, -1, attribute.name(), attribute.name(), null, -1, -1, null, null,
columnType, true, false, false, columnType.columnClassName()));
}
return new QueryResult(meta, data);
}
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row row) {
Date timeWindowDt = row.getAs(STR_TIMEWINDOW);
List<String> performanceMetricList = Arrays.asList(STR_AVG_RESPONSE_TIME, STR_CALLSPER_MINUTE, STR_ERROR_RATE);
GenericRowWithSchema perfMetrics = row.getAs("metrics");
for (String perfMetric : performanceMetricList) {
double value;
try {
Long metricValue = perfMetrics.getAs(perfMetric);
value = metricValue.doubleValue();
} catch (IllegalArgumentException exception) {
value = 0.0;
}
MetricCount mc = getMetricCount("", value, perfMetric);
if (!mc.getLabel().isEmpty()) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(timeWindowDt, mc);
collectorItemMetricDetail.setLastScanDate(timeWindowDt);
}
}
}
private void addEnvironmentsToProduct(Product product, Collection<Object> environmentNames) {
if (CollectionUtils.isEmpty(environmentNames)) {
return;
}
environmentNames.forEach(obj -> {
Row optErow =
Optional.ofNullable(environmentRowsList)
.orElseGet(Collections::emptyList).stream()
.filter(c -> Objects.equals(c.getAs("configurationItem"), obj))
.findFirst().orElse(null);
if (optErow != null) {
Environment environment = new Environment();
environment.setName((String) obj);
environment.setCommonName((String) optErow.getAs("componentName"));
environment.setLob(product.getLob());
environment.setMetricLevel(MetricLevel.ENVIRONMENT);
String componentId = (String) ((GenericRowWithSchema) optErow.getAs("componentId")).values()[0];
environment.setId(new ObjectId(componentId));
LOGGER.debug("Environment Name = " + obj);
product.addEnvironment(environment);
}
});
}
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row itemRow) {
Date timeWindowDt = itemRow.getAs("timeWindow");
List<String> scaMetricList = Arrays.asList("coverage");
Collection<Object> javaCollection = JavaConversions.asJavaCollection(((WrappedArray) itemRow.getAs("metrics")).toList());
Optional.ofNullable(javaCollection)
.orElseGet(Collections::emptyList)
.forEach(m -> {
GenericRowWithSchema genericRowWithSchema = (GenericRowWithSchema) m;
String existingLabelName = genericRowWithSchema.getAs("name");
if (scaMetricList.contains(existingLabelName)) {
String valueStr = genericRowWithSchema.getAs("value");
try {
double value = Double.parseDouble(valueStr);
MetricCount mc = getMetricCount("", value, "unit-test-coverage");
if (mc != null) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(timeWindowDt, mc);
collectorItemMetricDetail.setLastScanDate(timeWindowDt);
}
} catch (NumberFormatException e) {
LOGGER.info("Exception: Not a number, 'value' = " + valueStr, e);
}
}
});
}
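Several updateCollectorItemMetricDetail variants in this listing repeat one access pattern: an array-of-struct column is read as a Scala WrappedArray, converted to a Java collection, and each element is cast to GenericRowWithSchema so its fields can be read by name. Below is a condensed sketch of just that pattern; the column and field names ("metrics", "name", "value") follow the snippets here but are assumptions in this standalone form.

import java.util.Collection;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

class MetricsReader {
    // Reads an array-of-struct column named "metrics"; each element is assumed
    // to be a struct with "name" and "value" fields.
    void printMetrics(Row itemRow) {
        WrappedArray<Object> metrics = itemRow.getAs("metrics");
        Collection<Object> javaMetrics = JavaConversions.asJavaCollection(metrics.toList());
        for (Object m : javaMetrics) {
            GenericRowWithSchema metric = (GenericRowWithSchema) m;
            String name = metric.getAs("name");
            String value = metric.getAs("value");
            System.out.println(name + " = " + value);
        }
    }
}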
/**
 * Update collectorItemMetric details with the latest time-series data and summary.
 *
 * @param collectorItemMetricDetail the metric detail to update
 * @param itemRow the row holding the traceability metrics
 */
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row itemRow) {
Date timeWindowDt = itemRow.getAs(STR_TIME_WINDOW);
List<String> testTypes = Arrays.asList(STR_AUTOMATED, STR_MANUAL);
GenericRowWithSchema javaCollection = itemRow.getAs(STR_TRACEABILITY);
testTypes.forEach(testType -> {
double value = javaCollection.getAs(testType);
MetricCount mc = getMetricCount("", value, testType);
if (mc != null) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(timeWindowDt, mc);
collectorItemMetricDetail.setLastScanDate(timeWindowDt);
}
});
}
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row itemRow) {
Date timeWindowDt = itemRow.getAs("timeWindow");
List<String> scaMetricList = Arrays.asList("blocker_violations", "critical_violations", "major_violations", "sqale_index");
Collection<Object> javaCollection = JavaConversions.asJavaCollection(((WrappedArray) itemRow.getAs("metrics")).toList());
Optional.ofNullable(javaCollection)
.orElseGet(Collections::emptyList)
.forEach(m -> {
GenericRowWithSchema genericRowWithSchema = (GenericRowWithSchema) m;
String existingLabelName = genericRowWithSchema.getAs("name");
if (scaMetricList.contains(existingLabelName)) {
String valueStr = genericRowWithSchema.getAs("value");
try {
double value = Double.parseDouble(valueStr);
MetricCount mc = getMetricCount("", value, existingLabelName);
if (mc != null) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(timeWindowDt, mc);
collectorItemMetricDetail.setLastScanDate(timeWindowDt);
}
} catch (NumberFormatException e) {
LOGGER.info("Exception: Not a number, 'value' = "+valueStr,e);
}
}
});
}
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row itemRow) {
Date timeWindowDt = itemRow.getAs("timeWindow");
List<String> metricList = Arrays.asList("High", "Medium", "Low");
Collection<Object> javaCollection = JavaConversions.asJavaCollection(((WrappedArray) itemRow.getAs("metrics")).toList());
Optional.ofNullable(javaCollection)
.orElseGet(Collections::emptyList)
.forEach(m -> {
GenericRowWithSchema genericRowWithSchema = (GenericRowWithSchema) m;
String existingLabelName = genericRowWithSchema.getAs("name");
if (metricList.contains(existingLabelName)) {
String valueStr = genericRowWithSchema.getAs("value");
try {
double value = Double.parseDouble(valueStr);
MetricCount mc = getMetricCount("", value, existingLabelName);
if (mc != null) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(timeWindowDt, mc);
collectorItemMetricDetail.setLastScanDate(timeWindowDt);
}
} catch (NumberFormatException e) {
LOGGER.info("Exception: Not a number, 'value' = "+valueStr,e);
}
}
});
}
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row itemRow, String type) {
Date timeWindowDt = itemRow.getAs("timeWindow");
Collection<Object> javaCollection = JavaConversions.asJavaCollection(((WrappedArray) itemRow.getAs(type)).toList());
Optional.ofNullable(javaCollection)
.orElseGet(Collections::emptyList)
.forEach(m -> {
GenericRowWithSchema genericRowWithSchema = (GenericRowWithSchema) m;
String level = genericRowWithSchema.getAs("level");
int value = genericRowWithSchema.getAs("count");
MetricCount mc = getMetricCount(level, value, type);
if (mc != null) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(timeWindowDt, mc);
collectorItemMetricDetail.setLastScanDate(timeWindowDt);
}
});
}
@Override
protected Object getChild(Object composite, int index) {
// The row being converted may have come from a different schema or profile
// than what is being requested by the caller, so we must look up fields
// by name.
String fieldName = ((StructType) getDataType()).apply(index).name();
scala.Option fieldIndex = ((GenericRowWithSchema) composite)
.schema()
.getFieldIndex(fieldName);
if (fieldIndex.isDefined()) {
return ((Row) composite).get((Integer) fieldIndex.get());
} else {
return null;
}
}
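The getChild example above works because StructType.getFieldIndex returns a scala.Option instead of throwing when a field is absent; from Java the index arrives as a boxed Integer. A minimal helper sketch of that lookup (getFieldOrNull is a hypothetical name, not from the source):

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

class RowFieldLookup {
    // Returns the value of fieldName, or null if the row's schema lacks that field.
    static Object getFieldOrNull(GenericRowWithSchema row, String fieldName) {
        scala.Option<Object> index = row.schema().getFieldIndex(fieldName);
        return index.isDefined() ? row.get((Integer) index.get()) : null;
    }
}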
@Override
public Row call(final SimpleFeature feature) throws Exception {
final Object[] fields = new Serializable[schema.size()];
for (int i = 0; i < schema.size(); i++) {
final Object fieldObj = feature.getAttribute(i);
if (fieldObj != null) {
final StructField structField = schema.apply(i);
if (structField.name().equals("geom")) {
fields[i] = fieldObj;
} else if (structField.dataType() == DataTypes.TimestampType) {
fields[i] = new Timestamp(((Date) fieldObj).getTime());
} else if (structField.dataType() != null) {
fields[i] = fieldObj;
} else {
LOGGER.error("Unexpected attribute in field(" + structField.name() + "): " + fieldObj);
}
}
}
return new GenericRowWithSchema(fields, schema);
}
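Rows produced by converters like the one above can feed straight into a DataFrame, since SparkSession.createDataFrame accepts a List<Row> together with the matching StructType. A brief sketch under that assumption; the schema and values are illustrative:

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

class FeatureRowsToDataset {
    static Dataset<Row> toDataset(SparkSession spark) {
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("geom", DataTypes.StringType, true),
                DataTypes.createStructField("when", DataTypes.TimestampType, true)
        });
        // GenericRowWithSchema is a Row, so a list of them satisfies createDataFrame.
        List<Row> rows = Arrays.asList(
                new GenericRowWithSchema(new Object[]{"POINT(0 0)", new Timestamp(0L)}, schema));
        return spark.createDataFrame(rows, schema);
    }
}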
private QuicksqlServerResultSet getResultSet(StatementHandle h, SqlRunner runner, String sql, int maxResNum,
Object collect) {
if (collect instanceof ResultSet) {
return getJDBCResultSet(h, collect, maxResNum);
} else if (collect instanceof Map.Entry) {
try {
Entry<List<Attribute>, List<GenericRowWithSchema>> sparkData = (Entry<List<Attribute>, List<GenericRowWithSchema>>) collect;
return getResultSet(h, sql, maxResNum, getSparkQueryResult(sparkData));
} catch (Exception e) {
throw new RuntimeException("result type error " + e.getMessage());
}
} else {
throw new RuntimeException("not matching result type");
}
}
public Map<String, List<Row>> collectAll() {
Map<String, List<Row>> rowMap = new HashMap<>();
DataFrameLoader.loadDataFrame(collectionName, javaSparkContext);
Dataset<Row> dataRows;
if (collectionName.contains("pipelines")) {
dataRows = getDataRowsForPipelines();
} else {
dataRows = sparkSession.sql(query);
}
List<Row> rowList = dataRows.collectAsList();
rowList.forEach(row -> {
String item = (String) ((GenericRowWithSchema) row.getAs("collectorItemId")).get(0);
boolean matchingCollectorItem =
Optional.ofNullable(collectorItemIds).orElseGet(Collections::emptyList).stream()
.anyMatch(Predicate.isEqual(item));
Date timeWindowDt = row.getAs("timeWindow");
long daysAgo = HygieiaExecutiveUtil.getDaysAgo(timeWindowDt);
if (matchingCollectorItem && (daysAgo < 90)) {
rowMap.computeIfAbsent(item, k -> new ArrayList<>()).add(row);
}
});
return rowMap;
}
public static final List<DashboardCollectorItem> getPipelineDashboardCollectorItems(Dataset<Row> ds) {
List<DashboardCollectorItem> arr = new ArrayList<>();
List<Row> dd = ds.collectAsList();
dd.forEach(row -> {
WrappedArray dashboardIds = row.getAs("dashboardIds");
Iterator iterdashboardIds = dashboardIds.iterator();
WrappedArray itemArray = row.getAs("collectorItems");
Iterator iter = itemArray.iterator();
for (int i = 0; i < dashboardIds.length(); i++) {
String productName = row.getAs("productName");
String componentName = row.getAs("componentName");
String dashboardId = (String) ((GenericRowWithSchema) row.getAs("dashboardId")).values()[0];
List<String> itemIds = new ArrayList<>();
DashboardCollectorItem dashboardCollectorItem = new DashboardCollectorItem();
String grs = (String) iterdashboardIds.next();
dashboardCollectorItem.setDashboardId(grs);
GenericRowWithSchema collId = (GenericRowWithSchema) iter.next();
itemIds.add((String) collId.get(0));
dashboardCollectorItem.setItems(itemIds);
String dashboardTitle = row.getAs("title");
String key = productName + DELIMITER + componentName + DELIMITER + dashboardTitle;
dashboardCollectorItem.setName(key);
dashboardCollectorItem.setProductDashboardIds(dashboardId);
arr.add(dashboardCollectorItem);
}
});
return arr;
}
private void addProductToPortfolio(Portfolio portfolio, Row productRow) {
String productName = productRow.getAs("productName");
String productDept = productRow.getAs("ownerDept");
String commonName = productRow.getAs("commonName");
String productId = (String) ((GenericRowWithSchema) productRow.getAs("productId")).values()[0];
LOGGER.debug(" Product Name = " + productName + " ; Owner Dept = " + productDept);
// For a given portfolio, check if the current product already exists in the product list for the portfolio
// If not, add it to the product list
Product product =
Optional.ofNullable(portfolio.getProducts())
.orElseGet(Collections::emptyList).stream()
.filter(p -> p.getName().equalsIgnoreCase(productName)
&& p.getLob().equalsIgnoreCase(productDept))
.findFirst().orElse(null);
if (product == null) {
product = new Product();
product.setId(new ObjectId(productId));
product.setLob(productDept);
product.setName(productName);
product.setCommonName(commonName);
product.setMetricLevel(MetricLevel.PRODUCT);
}
if (productRow.getAs("environments") != null) {
Collection<Object> environmentNames = JavaConversions.asJavaCollection(((WrappedArray) productRow.getAs("environments")).toStream().toList());
addEnvironmentsToProduct(product, environmentNames);
}
if (productRow.getAs("components") != null) {
Collection<Object> componentNames = JavaConversions.asJavaCollection(((WrappedArray) productRow.getAs("components")).toStream().toList());
addComponentsToProduct(product, componentNames);
}
product.addOwner(new PeopleRoleRelation(getPeople(productRow.getAs("appServiceOwner"), "appServiceOwner"), RoleRelationShipType.AppServiceOwner));
product.addOwner(new PeopleRoleRelation(getPeople(productRow.getAs("supportOwner"), "supportOwner"), RoleRelationShipType.SupportOwner));
product.addOwner(new PeopleRoleRelation(getPeople(productRow.getAs("developmentOwner"), "developmentOwner"), RoleRelationShipType.DevelopmentOwner));
portfolio.addProduct(product);
}
private void addProductToLob(Lob lob, Row productRow) {
String productName = productRow.getAs("productName");
String productDept = productRow.getAs("ownerDept");
String commonName = productRow.getAs("commonName");
String productId = (String) ((GenericRowWithSchema) productRow.getAs("productId")).values()[0];
LOGGER.debug(" Product Name = " + productName + " ; Owner Dept = " + productDept);
// For a given portfolio, check if the current product already exists in the product list for the portfolio
// If not, add it to the product list
Product product =
Optional.ofNullable(lob.getProducts())
.orElseGet(Collections::emptyList).stream()
.filter(p -> p.getName().equalsIgnoreCase(productName)
&& p.getLob().equalsIgnoreCase(productDept))
.findFirst().orElse(null);
if (product == null) {
product = new Product();
product.setId(new ObjectId(productId));
product.setLob(productDept);
product.setName(productName);
product.setCommonName(commonName);
product.setMetricLevel(MetricLevel.PRODUCT);
}
if (productRow.getAs("environments") != null) {
Collection<Object> environmentNames = JavaConversions.asJavaCollection(((WrappedArray) productRow.getAs("environments")).toStream().toList());
addEnvironmentsToProduct(product, environmentNames);
}
if (productRow.getAs("components") != null) {
Collection<Object> componentNames = JavaConversions.asJavaCollection(((WrappedArray) productRow.getAs("components")).toStream().toList());
addComponentsToProduct(product, componentNames);
}
product.addOwner(new PeopleRoleRelation(getPeople(productRow.getAs("appServiceOwner"), "appServiceOwner"), RoleRelationShipType.AppServiceOwner));
product.addOwner(new PeopleRoleRelation(getPeople(productRow.getAs("supportOwner"), "supportOwner"), RoleRelationShipType.SupportOwner));
product.addOwner(new PeopleRoleRelation(getPeople(productRow.getAs("developmentOwner"), "developmentOwner"), RoleRelationShipType.DevelopmentOwner));
lob.addProduct(product);
}
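The portfolio examples above all read a MongoDB ObjectId that Spark has loaded as a one-field struct, by casting the column to GenericRowWithSchema and taking values()[0]. A tiny helper sketch of that idiom (readObjectId is a hypothetical name; the column name is passed in):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.bson.types.ObjectId;

class ObjectIdReader {
    // Extracts an ObjectId stored as a single-field struct column, e.g. "productId".
    static ObjectId readObjectId(Row row, String columnName) {
        GenericRowWithSchema idStruct = row.getAs(columnName);
        return new ObjectId((String) idStruct.values()[0]);
    }
}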
private void updateCollectorItemMetricDetail(CollectorItemMetricDetail collectorItemMetricDetail, Row itemRow) {
Date timeWindowDt = itemRow.getAs("timeWindow");
LOGGER.info("TimeWindow:" +timeWindowDt );
LOGGER.info("itemRow :" + itemRow);
Collection<Object> javaCollection = JavaConversions.asJavaCollection(((WrappedArray) itemRow.getAs("prodStageList")).toList());
Optional.ofNullable(javaCollection)
.orElseGet(Collections::emptyList).stream().map(m -> (GenericRowWithSchema) m).forEach(genericRowWithSchema -> {
Long pipelineTimeL = genericRowWithSchema.getAs("timestamp");
Date dateObj = new Timestamp(new Date(pipelineTimeL).getTime());
LOGGER.info("Date Object :" + dateObj);
Long scmTimeL = genericRowWithSchema.getAs("scmCommitTimestamp");
Long pipelineTimeAfterIgnore = pipelineTimeL / 1000;
Long scmTimeAfterIgnore = scmTimeL / 1000;
try {
Long diffTimestamp = Math.abs(pipelineTimeAfterIgnore - scmTimeAfterIgnore);
String strTimestampInsec = Long.toString(diffTimestamp);
double value = Double.parseDouble(strTimestampInsec);
MetricCount mc = getMetricCount("", value, "pipeline-lead-time");
if (mc != null) {
collectorItemMetricDetail.setStrategy(getCollectionStrategy());
collectorItemMetricDetail.addCollectorItemMetricCount(dateObj, mc);
collectorItemMetricDetail.setLastScanDate(dateObj);
}
} catch (NumberFormatException e) {
LOGGER.info("Exception: Not a number, 'value' = " + scmTimeAfterIgnore, e);
}
});
}
@Override
public Iterable<Row> call(List<List<Writable>> sequence) throws Exception {
if (sequence.size() == 0)
return Collections.emptyList();
String sequenceUUID = UUID.randomUUID().toString();
List<Row> out = new ArrayList<>(sequence.size());
int stepCount = 0;
for (List<Writable> step : sequence) {
Object[] values = new Object[step.size() + 2];
values[0] = sequenceUUID;
values[1] = stepCount++;
for (int i = 0; i < step.size(); i++) {
switch (schema.getColumnTypes().get(i)) {
case Double:
values[i + 2] = step.get(i).toDouble();
break;
case Integer:
values[i + 2] = step.get(i).toInt();
break;
case Long:
values[i + 2] = step.get(i).toLong();
break;
case Float:
values[i + 2] = step.get(i).toFloat();
break;
default:
throw new IllegalStateException(
"This API should not be used with strings, binary data, or NDArrays; it is only for columnar data");
}
}
Row row = new GenericRowWithSchema(values, structType);
out.add(row);
}
return out;
}
@Override
public Iterator<Row> call(List<List<Writable>> sequence) throws Exception {
if (sequence.size() == 0)
return Collections.emptyIterator();
String sequenceUUID = UUID.randomUUID().toString();
List<Row> out = new ArrayList<>(sequence.size());
int stepCount = 0;
for (List<Writable> step : sequence) {
Object[] values = new Object[step.size() + 2];
values[0] = sequenceUUID;
values[1] = stepCount++;
for (int i = 0; i < step.size(); i++) {
switch (schema.getColumnTypes().get(i)) {
case Double:
values[i + 2] = step.get(i).toDouble();
break;
case Integer:
values[i + 2] = step.get(i).toInt();
break;
case Long:
values[i + 2] = step.get(i).toLong();
break;
case Float:
values[i + 2] = step.get(i).toFloat();
break;
default:
throw new IllegalStateException(
"This API should not be used with strings, binary data, or NDArrays; it is only for columnar data");
}
}
Row row = new GenericRowWithSchema(values, structType);
out.add(row);
}
return out.iterator();
}
private ExecuteResult getExecuteResultSet(StatementHandle h, QuicksqlConnectionImpl connection, String sql) {
MetaResultSet resultSet = null;
String responseUrl = "";
System.out.println("sql:" + sql);
try {
if (sql.toLowerCase().startsWith("explain")) {
String logicalPlanView = new SqlLogicalPlanView().getLogicalPlanView(sql.replaceAll("explain ",""));
resultSet = getResultSet(h, sql, 1, getExplainResult(logicalPlanView));
return new ExecuteResult(Collections.singletonList(resultSet));
}
int maxResNum = Integer
.parseInt(StringUtils.defaultIfBlank(connection.getInfoByName("acceptedResultsNum"), "100000"));
responseUrl = connection.getInfoByName("responseUrl");
SqlRunner runner = SqlRunner.builder()
.setTransformRunner(RunnerType.value(connection.getInfoByName("runner")))
.setSchemaPath(StringUtils.isNotBlank(connection.getInfoByName("schemaPath")) ? "inline:" + connection
.getInfoByName("schemaPath") : SqlUtil.getSchemaPath(SqlUtil.parseTableName(sql).tableNames))
.setAppName(StringUtils.defaultIfBlank(connection.getInfoByName("appName"), ""))
.setAcceptedResultsNum(maxResNum)
.ok();
if (sql.contains("HDFS")) {
insertResult(sql, runner, connection);
resultSet = getResultSet(h, sql, 0, new QueryResult(new ArrayList<>(), new ArrayList<>()));
} else {
RunnerType runnerType = RunnerType.value(connection.getInfoByName("runner"));
Object collect = runner.sql(sql).collect();
switch (runnerType) {
case JDBC:
resultSet = getJDBCResultSet(h, collect, maxResNum);
break;
case SPARK:
Entry<List<Attribute>, List<GenericRowWithSchema>> sparkData = (Entry<List<Attribute>, List<GenericRowWithSchema>>) collect;
resultSet = getResultSet(h, sql, maxResNum, getSparkQueryResult(sparkData));
break;
case FLINK:
Entry<TableSchema, List<Row>> flinkData = (Entry<TableSchema, List<Row>>) collect;
resultSet = getResultSet(h, sql, maxResNum, getFlinkQueryResult(flinkData));
break;
case DEFAULT:
resultSet = getResultSet(h, runner, sql, maxResNum, collect);
break;
}
}
} catch (Exception e) {
e.printStackTrace();
setHttpResponse(responseUrl, 0, "Quicksql error :" + e.getCause());
throw new RuntimeException(e);
}
return new ExecuteResult(Collections.singletonList(resultSet));
}
private void addComponentsToProduct(Product product, Collection<Object> componentNames) {
if (CollectionUtils.isEmpty(componentNames)) {
return;
}
componentNames.forEach(obj -> {
Row optCrow =
Optional.ofNullable(componentRowsList)
.orElseGet(Collections::emptyList).stream()
.filter(c -> Objects.equals(c.getAs("configurationItem"), obj))
.findFirst().orElse(null);
if (optCrow != null) {
ProductComponent productComponent = new ProductComponent();
productComponent.setName((String) obj);
productComponent.setLob(product.getLob());
productComponent.setMetricLevel(MetricLevel.COMPONENT);
productComponent.setCommonName((String) optCrow.getAs("componentName"));
// Check if the productComponent (BAP: Business Application) is associated with a dashboard, i.e. whether it is being reported.
String componentId = (String) ((GenericRowWithSchema) optCrow.getAs("componentId")).values()[0];
String configItem = optCrow.getAs("configurationItem");
productComponent.setId(new ObjectId(componentId));
Row matchingDashboard =
Optional.ofNullable(dashboardRowsList)
.orElseGet(Collections::emptyList).stream()
.filter(c -> (
((String) c.getAs("componentName")).equalsIgnoreCase(configItem))
&& ((String) c.getAs("productName")).equalsIgnoreCase(product.getName())
).findFirst().orElse(null);
if (matchingDashboard != null) {
productComponent.setReporting(true);
productComponent.setProductComponentDashboardId(new ObjectId((String) ((GenericRowWithSchema) matchingDashboard.getAs("dashboardId")).values()[0]));
} else {
productComponent.setReporting(false);
}
product.addProductComponent(productComponent);
//Debug Statement
LOGGER.debug("Component Name = " + productComponent.getName());
}
});
}
public JavaDStream<Row> createSource(JavaStreamingContext ssc, KafkaSourceConfig08 config, SourceContext context)
{
String topics = requireNonNull(config.getTopics(), "topics is not set");
String brokers = requireNonNull(config.getBrokers(), "brokers is not set"); //the cluster hosts must be resolvable from the machine running this job
String groupId = requireNonNull(config.getGroupid(), "group.id is not set"); //consumer group name
String offsetMode = requireNonNull(config.getOffsetMode(), "offsetMode is not set");
Map<String, String> otherConfig = config.getOtherConfig().entrySet()
.stream()
.filter(x -> x.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().toString()));
Map<String, String> kafkaParams = new HashMap<>(otherConfig);
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
//kafkaParams.put("auto.commit.enable", true); //不自动提交偏移量
// "fetch.message.max.bytes" ->
// "session.timeout.ms" -> "30000", //session默认是30秒
// "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetMode); //largest smallest
//----get fromOffsets
@SuppressWarnings("unchecked")
scala.collection.immutable.Map<String, String> map = (scala.collection.immutable.Map<String, String>) Map$.MODULE$.apply(JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toSeq());
final KafkaCluster kafkaCluster = new KafkaCluster(map);
Map<TopicAndPartition, Long> fromOffsets = getFromOffset(kafkaCluster, topics, groupId);
//--- createDirectStream DirectKafkaInputDStream.class
org.apache.spark.api.java.function.Function<MessageAndMetadata<byte[], byte[]>, ConsumerRecord<byte[], byte[]>> messageHandler =
mmd -> new ConsumerRecord<>(mmd.topic(), mmd.partition(), mmd.key(), mmd.message(), mmd.offset());
@SuppressWarnings("unchecked")
Class<ConsumerRecord<byte[], byte[]>> recordClass = (Class<ConsumerRecord<byte[], byte[]>>) ClassTag$.MODULE$.<ConsumerRecord<byte[], byte[]>>apply(ConsumerRecord.class).runtimeClass();
JavaInputDStream<ConsumerRecord<byte[], byte[]>> inputStream = KafkaUtils.createDirectStream(ssc,
byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, recordClass,
kafkaParams, fromOffsets,
messageHandler
);
JavaDStream<ConsumerRecord<byte[], byte[]>> dStream = settingCommit(inputStream, kafkaParams, kafkaCluster, groupId);
if ("json".equalsIgnoreCase(config.getValueType())) {
JsonSchema jsonParser = new JsonSchema(context.getSchema());
return dStream
.map(record -> {
return jsonParser.deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset());
});
}
else {
StructType structType = schemaToSparkType(context.getSchema());
return dStream
.map(record -> {
String[] names = structType.names();
Object[] values = new Object[names.length];
for (int i = 0; i < names.length; i++) {
switch (names[i]) {
case "_topic":
values[i] = record.topic();
continue;
case "_message":
values[i] = new String(record.value(), UTF_8);
continue;
case "_key":
values[i] = new String(record.key(), UTF_8);
continue;
case "_partition":
values[i] = record.partition();
continue;
case "_offset":
values[i] = record.offset();
continue; // without this continue, execution falls through to default and overwrites the offset with null
default:
values[i] = null;
}
}
return (Row) new GenericRowWithSchema(values, structType);
}); //.window(Duration(10 * 1000))
}
}
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> map = MAPPER.readValue(message, Map.class);
String[] names = rowTypeInfo.names();
Object[] values = new Object[names.length];
for (int i = 0; i < names.length; i++) {
String key = names[i];
switch (key) {
case "_topic":
values[i] = topic;
continue;
case "_message":
values[i] = new String(message, UTF_8);
continue;
case "_key":
values[i] = new String(messageKey, UTF_8);
continue;
case "_partition":
values[i] = partition;
continue;
case "_offset":
values[i] = offset;
continue;
}
Object value = map.get(key);
if (value == null) {
continue;
}
DataType type = rowTypeInfo.apply(i).dataType();
if (type instanceof MapType && ((MapType) type).valueType() == DataTypes.StringType) {
scala.collection.mutable.Map convertValue = new scala.collection.mutable.HashMap(); //must be a Scala map
for (Map.Entry entry : ((Map<?, ?>) value).entrySet()) {
convertValue.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
values[i] = convertValue;
}
else if (type instanceof ArrayType) { //check the column's DataType; the deserialized value itself is never a DataType
//Class<?> aClass = type.getTypeClass();
//values[i] = MAPPER.convertValue(value, aClass);
//todo: Spark List to Array
values[i] = value;
}
else if (type == DataTypes.LongType) {
values[i] = ((Number) value).longValue();
}
else {
values[i] = value;
}
}
return new GenericRowWithSchema(values, rowTypeInfo);
}
private static Dataset<Row> createSource(SparkSession spark, KafkaSourceConfig config, SourceContext context)
{
String topics = config.getTopics();
String brokers = config.getBrokers(); //the cluster hosts must be resolvable from the machine running this job
String groupId = config.getGroupid(); //consumer group name
String offsetMode = config.getOffsetMode();
checkState(!"largest".equals(offsetMode), "kafka 0.10+, use latest");
checkState(!"smallest".equals(offsetMode), "kafka 0.10+, use earliest");
Map<String, Object> kafkaParams = new HashMap<>(config.getOtherConfig());
kafkaParams.put("subscribe", topics);
kafkaParams.put("kafka.bootstrap.servers", brokers);
kafkaParams.put("startingOffsets", offsetMode); //latest earliest
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class.getName()); //StringDeserializer
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class.getName()); //StringDeserializer
// "fetch.message.max.bytes" ->
// "session.timeout.ms" -> "30000", //session默认是30秒
// "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
Dataset<Row> inputStream = KafkaSourceUtil.getSource(spark, kafkaParams);
if ("json".equalsIgnoreCase(config.getValueType())) {
JsonSchema jsonParser = new JsonSchema(context.getSchema());
return inputStream
.map((MapFunction<Row, Row>) record -> {
return jsonParser.deserialize(record.getAs("key"),
record.getAs("value"),
record.<String>getAs("topic"),
record.<Integer>getAs("partition"),
record.<Long>getAs("offset"));
}, RowEncoder.apply(jsonParser.getProducedType()));
}
else {
StructType structType = schemaToSparkType(context.getSchema());
return inputStream
.map((MapFunction<Row, Row>) record -> {
String[] names = structType.names();
Object[] values = new Object[names.length];
for (int i = 0; i < names.length; i++) {
switch (names[i]) {
case "_topic":
values[i] = record.<String>getAs("topic");
continue;
case "_message":
values[i] = new String(record.getAs("value"), UTF_8);
continue;
case "_key":
byte[] key = record.getAs("key");
values[i] = key == null ? null : new String(key, UTF_8);
continue;
case "_partition":
values[i] = record.<Integer>getAs("partition");
continue;
case "_offset":
values[i] = record.<Long>getAs("offset");
default:
values[i] = null;
}
}
return (Row) new GenericRowWithSchema(values, structType);
}, RowEncoder.apply(structType));
}
}
@Override
protected Object createComposite(Object[] children) {
return new GenericRowWithSchema(children, (StructType) getDataType());
}
@Override
protected Object createContained(Object[] contained) {
GenericRowWithSchema[] containerArray = new GenericRowWithSchema[contained.length];
for (int i = 0; i < contained.length; i++) {
GenericRowWithSchema[] containerFields =
new GenericRowWithSchema[structTypeHashToIndex.size()];
GenericRowWithSchema containedRow = (GenericRowWithSchema) contained[i];
int containedEntryStructTypeHash = containedRow.schema().hashCode();
containerFields[structTypeHashToIndex.get(containedEntryStructTypeHash)] = containedRow;
containerArray[i] = new GenericRowWithSchema(containerFields, containerType);
}
return containerArray;
}
@Override
protected List<ContainerEntry> getContained(Object container) {
WrappedArray containedArray = (WrappedArray) container;
List<ContainerEntry> containedEntries = new ArrayList<>();
for (int i = 0; i < containedArray.length(); i++) {
GenericRowWithSchema resourceContainer = (GenericRowWithSchema) containedArray.apply(i);
// The number of contained fields will be low, so this nested loop has low cost
for (int j = 0; j < resourceContainer.schema().fields().length; j++) {
if (resourceContainer.get(j) != null) {
GenericRowWithSchema row = (GenericRowWithSchema) resourceContainer.get(j);
String columnName = resourceContainer.schema().fields()[j].name();
containedEntries.add(new ContainerEntry(columnName, row));
break;
}
}
}
return containedEntries;
}