The following examples show how instances of the org.apache.spark.sql.catalyst.util.GenericArrayData API class are used in practice; each snippet is taken from open-source projects on GitHub.
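Before the project snippets, here is a minimal sketch (not taken from the projects below; the class and variable names are illustrative) of constructing a GenericArrayData directly from a Java array and reading it back through the ArrayData interface:

import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.unsafe.types.UTF8String;

public class GenericArrayDataSketch {
  public static void main(String[] args) {
    // Strings must be stored as UTF8String, Spark's internal string type;
    // null elements are allowed.
    Object[] values = new Object[] {
        UTF8String.fromString("a"), null, UTF8String.fromString("c")
    };
    ArrayData arrayData = new GenericArrayData(values);

    int n = arrayData.numElements();                 // 3
    boolean secondIsNull = arrayData.isNullAt(1);    // true
    UTF8String first = arrayData.getUTF8String(0);   // "a"
    ArrayData copied = arrayData.copy();             // backed by a new GenericArrayData

    System.out.println(n + " " + secondIsNull + " " + first + " " + copied.numElements());
  }
}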
@Override
public MapData nonNullRead(ColumnVector vector, int row) {
  // Read one row of an ORC map column: offsets/lengths locate the row's
  // entries inside the flat keys/values child vectors.
  MapColumnVector mapVector = (MapColumnVector) vector;
  int offset = (int) mapVector.offsets[row];
  long length = mapVector.lengths[row];
  List<Object> keys = Lists.newArrayListWithExpectedSize((int) length);
  List<Object> values = Lists.newArrayListWithExpectedSize((int) length);
  for (int c = 0; c < length; c++) {
    keys.add(keyReader.read(mapVector.keys, offset + c));
    values.add(valueReader.read(mapVector.values, offset + c));
  }
  return new ArrayBasedMapData(
      new GenericArrayData(keys.toArray()),
      new GenericArrayData(values.toArray()));
}
@Override
public GenericArrayData read(Decoder decoder, Object reuse) throws IOException {
  // Avro arrays are decoded in blocks: readArrayStart() returns the length of
  // the first block and arrayNext() the length of each following block.
  reusedList.clear();
  long chunkLength = decoder.readArrayStart();
  while (chunkLength > 0) {
    for (int i = 0; i < chunkLength; i += 1) {
      reusedList.add(elementReader.read(decoder, null));
    }
    chunkLength = decoder.arrayNext();
  }
  // this will convert the list to an array so it is okay to reuse the list
  return new GenericArrayData(reusedList.toArray());
}
@Override
public ArrayBasedMapData read(Decoder decoder, Object reuse) throws IOException {
  // Reads a map that is encoded as an Avro array of key/value pairs.
  reusedKeyList.clear();
  reusedValueList.clear();
  long chunkLength = decoder.readArrayStart();
  while (chunkLength > 0) {
    for (int i = 0; i < chunkLength; i += 1) {
      reusedKeyList.add(keyReader.read(decoder, null));
      reusedValueList.add(valueReader.read(decoder, null));
    }
    chunkLength = decoder.arrayNext();
  }
  return new ArrayBasedMapData(
      new GenericArrayData(reusedKeyList.toArray()),
      new GenericArrayData(reusedValueList.toArray()));
}
@Override
public ArrayBasedMapData read(Decoder decoder, Object reuse) throws IOException {
  // Reads a native Avro map: readMapStart()/mapNext() drive the block loop.
  reusedKeyList.clear();
  reusedValueList.clear();
  long chunkLength = decoder.readMapStart();
  while (chunkLength > 0) {
    for (int i = 0; i < chunkLength; i += 1) {
      reusedKeyList.add(keyReader.read(decoder, null));
      reusedValueList.add(valueReader.read(decoder, null));
    }
    chunkLength = decoder.mapNext();
  }
  return new ArrayBasedMapData(
      new GenericArrayData(reusedKeyList.toArray()),
      new GenericArrayData(reusedValueList.toArray()));
}
@SuppressWarnings("unchecked")
private <T> GenericArrayData fillArray(Collection<?> values, Function<Object[], BiConsumer<Integer, T>> makeSetter) {
Object[] array = new Object[values.size()];
BiConsumer<Integer, T> setter = makeSetter.apply(array);
int index = 0;
for (Object value : values) {
if (value == null) {
array[index] = null;
} else {
setter.accept(index, (T) value);
}
index += 1;
}
return new GenericArrayData(array);
}
@Override
public ArrayData nonNullRead(ColumnVector vector, int row) {
  // Read one row of an ORC list column: offsets/lengths locate the row's
  // elements inside the flat child vector.
  ListColumnVector listVector = (ListColumnVector) vector;
  int offset = (int) listVector.offsets[row];
  int length = (int) listVector.lengths[row];
  List<Object> elements = Lists.newArrayListWithExpectedSize(length);
  for (int c = 0; c < length; ++c) {
    elements.add(elementReader.read(listVector.child, offset + c));
  }
  return new GenericArrayData(elements.toArray());
}
static Object convert(Field field, Object value) {
  if (value == null) {
    return null;
  }
  if (field.getMode() == Field.Mode.REPEATED) {
    // rather than recursing down we strip off the repeated mode
    // Due to serialization issues, reconstruct the type using reflection:
    // See: https://github.com/googleapis/google-cloud-java/issues/3942
    LegacySQLTypeName fType = LegacySQLTypeName.valueOfStrict(field.getType().name());
    Field nestedField = Field.newBuilder(field.getName(), fType, field.getSubFields())
        // As long as this is not repeated it works, but technically arrays cannot contain
        // nulls, so select required instead of nullable.
        .setMode(Field.Mode.REQUIRED)
        .build();
    List<Object> valueList = (List<Object>) value;
    return new GenericArrayData(valueList.stream().map(v -> convert(nestedField, v)).collect(Collectors.toList()));
  }
  if (LegacySQLTypeName.INTEGER.equals(field.getType()) ||
      LegacySQLTypeName.FLOAT.equals(field.getType()) ||
      LegacySQLTypeName.BOOLEAN.equals(field.getType()) ||
      LegacySQLTypeName.DATE.equals(field.getType()) ||
      LegacySQLTypeName.TIME.equals(field.getType()) ||
      LegacySQLTypeName.TIMESTAMP.equals(field.getType())) {
    return value;
  }
  if (LegacySQLTypeName.STRING.equals(field.getType()) ||
      LegacySQLTypeName.DATETIME.equals(field.getType()) ||
      LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) {
    return UTF8String.fromBytes(((Utf8) value).getBytes());
  }
  if (LegacySQLTypeName.BYTES.equals(field.getType())) {
    return getBytes((ByteBuffer) value);
  }
  if (LegacySQLTypeName.NUMERIC.equals(field.getType())) {
    byte[] bytes = getBytes((ByteBuffer) value);
    BigDecimal b = new BigDecimal(new BigInteger(bytes), BQ_NUMERIC_SCALE);
    Decimal d = Decimal.apply(b, BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE);
    return d;
  }
  if (LegacySQLTypeName.RECORD.equals(field.getType())) {
    return convertAll(field.getSubFields(),
        (GenericRecord) value,
        field.getSubFields().stream().map(f -> f.getName()).collect(Collectors.toList()));
  }
  throw new IllegalStateException("Unexpected type: " + field.getType());
}
@Override
protected int run() throws Exception {
  SparkConf sparkConf = new SparkConf()
      .setAppName("Data2CoNLL")
      .set("spark.hadoop.validateOutputSpecs", "false")
      .set("spark.yarn.executor.memoryOverhead", "3072")
      .set("spark.rdd.compress", "true")
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(new Class[] {SCAS.class, LabeledPoint.class, SparseVector.class, int[].class, double[].class,
          InternalRow[].class, GenericInternalRow.class, Object[].class, GenericArrayData.class,
          VectorIndexer.class})
      ; //.setMaster("local[4]"); //Remove this if you run it on the server.

  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  int totalCores = Integer.parseInt(sc.getConf().get("spark.executor.instances"))
      * Integer.parseInt(sc.getConf().get("spark.executor.cores"));

  FileSystem fs = FileSystem.get(new Configuration());

  int partitionNumber = 3 * totalCores;
  if (partitions != null) {
    partitionNumber = partitions;
  }

  // Read training documents serialized as SCAS
  JavaRDD<SCAS> documents = sc.sequenceFile(input, Text.class, SCAS.class, partitionNumber).values();

  JavaRDD<String> docStrings = documents.map(s -> {
    JCas jCas = s.getJCas();
    NYTArticleMetaData metadata = JCasUtil.selectSingle(jCas, NYTArticleMetaData.class);

    StringJoiner docBuilder = new StringJoiner("\n");
    docBuilder.add("-DOCSTART- (" + metadata.getGuid() + ")");
    docBuilder.add("");

    Collection<Sentence> sentences = JCasUtil.select(jCas, Sentence.class);
    for (Sentence sentence : sentences) {
      List<Token> tokens = JCasUtil.selectCovered(jCas, Token.class, sentence);
      for (Token token : tokens) {
        CoreLabel taggedWord = CoreNlpUtils.tokenToWord(token);
        StringJoiner lineBuilder = new StringJoiner("\t");
        lineBuilder.add(taggedWord.word().toLowerCase());
        docBuilder.add(lineBuilder.toString());
      }
      docBuilder.add("");
    }
    return docBuilder.toString();
  });

  docStrings.saveAsTextFile(output);
  sc.stop();
  return 0;
}
@Override
protected int run() throws Exception {
  SparkConf sparkConf = new SparkConf()
      .setAppName("EntitySalienceTrainingSparkRunner")
      .set("spark.hadoop.validateOutputSpecs", "false")
      .set("spark.yarn.executor.memoryOverhead", "3072")
      .set("spark.rdd.compress", "true")
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(new Class[] {SCAS.class, LabeledPoint.class, SparseVector.class, int[].class, double[].class,
          InternalRow[].class, GenericInternalRow.class, Object[].class, GenericArrayData.class,
          VectorIndexer.class})
      ; //.setMaster("local[4]"); //Remove this if you run it on the server.

  TrainingSettings trainingSettings = new TrainingSettings();

  if (folds != null) {
    trainingSettings.setNumFolds(folds);
  }
  if (method != null) {
    trainingSettings.setClassificationMethod(TrainingSettings.ClassificationMethod.valueOf(method));
  }
  if (defaultConf != null) {
    trainingSettings.setAidaDefaultConf(defaultConf);
  }
  if (scalingFactor != null) {
    trainingSettings.setPositiveInstanceScalingFactor(scalingFactor);
  }

  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  int totalCores = Integer.parseInt(sc.getConf().get("spark.executor.instances"))
      * Integer.parseInt(sc.getConf().get("spark.executor.cores"));

  // int totalCores = 4;
  //// trainingSettings.setFeatureExtractor(TrainingSettings.FeatureExtractor.ANNOTATE_AND_ENTITY_SALIENCE);
  //// trainingSettings.setAidaDefaultConf("db");
  // //trainingSettings.setClassificationMethod(TrainingSettings.ClassificationMethod.LOG_REG);
  // trainingSettings.setPositiveInstanceScalingFactor(1);

  // Add the cache files to each node only if annotation is required.
  // The input documents could already be annotated, and in this case no caches are needed.
  if (trainingSettings.getFeatureExtractor().equals(TrainingSettings.FeatureExtractor.ANNOTATE_AND_ENTITY_SALIENCE)) {
    sc.addFile(trainingSettings.getBigramCountCache());
    sc.addFile(trainingSettings.getKeywordCountCache());
    sc.addFile(trainingSettings.getWordContractionsCache());
    sc.addFile(trainingSettings.getWordExpansionsCache());
    if (trainingSettings.getAidaDefaultConf().equals("db")) {
      sc.addFile(trainingSettings.getDatabaseAida());
    } else {
      sc.addFile(trainingSettings.getCassandraConfig());
    }
  }

  SQLContext sqlContext = new SQLContext(sc);
  FileSystem fs = FileSystem.get(new Configuration());

  int partitionNumber = 3 * totalCores;
  if (partitions != null) {
    partitionNumber = partitions;
  }

  // Read training documents serialized as SCAS
  JavaRDD<SCAS> documents = sc.sequenceFile(input, Text.class, SCAS.class, partitionNumber).values();

  // Instantiate a training spark runner
  TrainingSparkRunner trainingSparkRunner = new TrainingSparkRunner();

  // Train a model
  CrossValidatorModel model = trainingSparkRunner.crossValidate(sc, sqlContext, documents, trainingSettings);

  // Create the model path
  String modelPath = output + "/" + sc.getConf().getAppId() + "/model_" + trainingSettings.getClassificationMethod();

  // Delete the old model if there is one
  fs.delete(new Path(modelPath), true);

  // Save the new model
  List<Model> models = new ArrayList<>();
  models.add(model.bestModel());
  sc.parallelize(models, 1).saveAsObjectFile(modelPath);

  // Save the model stats
  SparkClassificationModel.saveStats(model, trainingSettings, output + "/" + sc.getConf().getAppId() + "/");

  return 0;
}
@Override
protected int run() throws Exception {
  SparkConf sparkConf = new SparkConf()
      .setAppName("EntitySalienceTrainingSparkRunner")
      .set("spark.hadoop.validateOutputSpecs", "false")
      //.set("spark.yarn.executor.memoryOverhead", "4096")
      .set("spark.rdd.compress", "true")
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(new Class[] {SCAS.class, LabeledPoint.class, SparseVector.class, int[].class, double[].class,
          InternalRow[].class, GenericInternalRow.class, Object[].class, GenericArrayData.class,
          VectorIndexer.class})
      ; //setMaster("local"); //Remove this if you run it on the server.

  TrainingSettings trainingSettings = new TrainingSettings();
  if (defaultConf != null) {
    trainingSettings.setAidaDefaultConf(defaultConf);
  }

  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  int totalCores = Integer.parseInt(sc.getConf().get("spark.executor.instances"))
      * Integer.parseInt(sc.getConf().get("spark.executor.cores"));
  // int totalCores = 2;

  //trainingSettings.setClassificationMethod(TrainingSettings.ClassificationMethod.LOG_REG);
  trainingSettings.setPositiveInstanceScalingFactor(1);

  if (trainingSettings.getFeatureExtractor().equals(TrainingSettings.FeatureExtractor.ANNOTATE_AND_ENTITY_SALIENCE)) {
    sc.addFile(trainingSettings.getBigramCountCache());
    sc.addFile(trainingSettings.getKeywordCountCache());
    sc.addFile(trainingSettings.getWordContractionsCache());
    sc.addFile(trainingSettings.getWordExpansionsCache());
    if (trainingSettings.getAidaDefaultConf().equals("db")) {
      sc.addFile(trainingSettings.getDatabaseAida());
    } else {
      sc.addFile(trainingSettings.getCassandraConfig());
    }
  }

  SQLContext sqlContext = new SQLContext(sc);
  int partitionNumber = 3 * totalCores;

  // Read training documents serialized as SCAS
  JavaPairRDD<Text, SCAS> documents = sc.sequenceFile(input, Text.class, SCAS.class, partitionNumber);

  // Instantiate a training spark runner
  TrainingSparkRunner trainingSparkRunner = new TrainingSparkRunner();
  PipelineModel trainingModel = (PipelineModel) sc.objectFile(model).first();

  // Evaluate the model and write down the evaluation metrics.
  trainingSparkRunner.evaluate(sc, sqlContext, documents, trainingModel, trainingSettings, output + "/" + sc.getConf().getAppId() + "/");

  return 0;
}
@Override
public ArrayData copy() {
  return new GenericArrayData(array());
}