Listed below are example usages of the org.apache.spark.sql.catalyst.expressions.GenericInternalRow API class; you can also follow the links to view the full source code on GitHub.
// Builds a GenericInternalRow from a record, with values ordered by namesInOrder.
static GenericInternalRow convertAll(FieldList fieldList,
                                     GenericRecord record,
                                     List<String> namesInOrder) {
Map<String, Object> fieldMap = new HashMap<>();
fieldList.stream().forEach(field ->
fieldMap.put(field.getName(), convert(field, record.get(field.getName()))));
Object[] values = new Object[namesInOrder.size()];
for (int i = 0; i < namesInOrder.size(); i++) {
values[i] = fieldMap.get(namesInOrder.get(i));
}
return new GenericInternalRow(values);
}
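As context for the snippets on this page, here is a minimal, self-contained sketch of constructing and reading a GenericInternalRow directly (the class name and values are illustrative, not taken from the example above). Note that GenericInternalRow wraps the Object[] without copying it, and string fields must be stored as UTF8String rather than java.lang.String:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

public class GenericInternalRowDemo {
    public static void main(String[] args) {
        // The row wraps this array directly; mutations to the array show through.
        Object[] values = new Object[] {1, UTF8String.fromString("alice"), null};
        GenericInternalRow row = new GenericInternalRow(values);
        int id = row.getInt(0);                        // primitive accessor, unboxes the Integer
        String name = row.getUTF8String(1).toString(); // strings live as UTF8String internally
        boolean missing = row.isNullAt(2);             // per-field null check
        System.out.println(id + " " + name + " " + missing);
    }
}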
// Precomputes, for each partition column, its Spark type plus the position and
// Java class of the matching identity-transform field in the PartitionSpec.
PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
StructField[] fields = partitionType.fields();
this.types = new DataType[fields.length];
this.positions = new int[types.length];
this.javaTypes = new Class<?>[types.length];
this.reusedRow = new GenericInternalRow(types.length);
List<PartitionField> partitionFields = spec.fields();
for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
this.types[rowIndex] = fields[rowIndex].dataType();
int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
PartitionField field = partitionFields.get(specIndex);
if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
positions[rowIndex] = specIndex;
javaTypes[rowIndex] = spec.javaClasses()[specIndex];
break;
}
}
}
}
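The constructor above only wires up positions and types; a hedged sketch of the per-record conversion that might pair with it (the method name and the partition-tuple accessor are assumptions, not the project's actual code). The point is that one preallocated GenericInternalRow is mutated in place per record instead of allocating a new row each time:

// Hypothetical companion to the constructor above, inside the same class.
InternalRow toRow(Object[] partitionTuple) {
    for (int i = 0; i < types.length; i += 1) {
        Object value = partitionTuple[positions[i]];
        if (value == null) {
            reusedRow.setNullAt(i);      // mark the slot null
        } else {
            reusedRow.update(i, value);  // overwrite the slot in place
        }
    }
    return reusedRow;                    // callers must consume before the next call
}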
@Override
protected GenericInternalRow newStructData(InternalRow reuse) {
    // Reuse the incoming row when it is already generic; otherwise allocate fresh.
    if (reuse instanceof GenericInternalRow) {
return (GenericInternalRow) reuse;
} else {
return new GenericInternalRow(numFields);
}
}
@Override
protected InternalRow reuseOrCreate(Object reuse) {
    // Reuse only a GenericInternalRow with the expected arity; anything else gets a new row.
    if (reuse instanceof GenericInternalRow && ((GenericInternalRow) reuse).numFields() == numFields) {
return (InternalRow) reuse;
}
return new GenericInternalRow(numFields);
}
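Because a GenericInternalRow's width is fixed by its backing array, the numFields guard above is what makes reuse safe. A small hypothetical driver (class name and field count are illustrative) showing the guard in action:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

public class ReuseDemo {
    static InternalRow reuseOrCreate(Object reuse, int numFields) {
        // Reuse only a generic row of matching arity; a narrower row would
        // fail on updates past its width.
        if (reuse instanceof GenericInternalRow
                && ((GenericInternalRow) reuse).numFields() == numFields) {
            return (InternalRow) reuse;
        }
        return new GenericInternalRow(numFields);
    }

    public static void main(String[] args) {
        InternalRow first = reuseOrCreate(null, 3);   // allocates
        InternalRow second = reuseOrCreate(first, 3); // reuses the same instance
        System.out.println(first == second);          // prints true
    }
}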
// Classes that must be registered for Kryo serialization.
public static Class[] getSerializableClasses() {
return new Class[]{
Instance.class, Predicate.class, RelationPredicate.class, RelationRow.class,
TypeID.class, HashMap.class, HashSet.class, LiteralType.class, Object[].class,
InternalRow[].class, GenericInternalRow.class, IndexMap.class, Quad.class
};
}
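A sketch of how a class list like this is typically fed to Kryo at SparkConf construction time (the app name and the trimmed class list are illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

public class KryoRegistrationDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kryo-registration-demo")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // Optional: fail fast on unregistered classes instead of silently
                // writing fully-qualified class names with every record.
                .set("spark.kryo.registrationRequired", "true")
                .registerKryoClasses(new Class[] {
                        Object[].class, InternalRow[].class, GenericInternalRow.class
                });
        System.out.println(conf.get("spark.kryo.classesToRegister"));
    }
}

registerKryoClasses appends the class names to spark.kryo.classesToRegister, so it composes with classes registered elsewhere in the same conf.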
@Override
public InternalRow serialize(final T obj) {
    final byte[] bytes = new TWKBWriter().write(obj);
    // Single-field row carrying the TWKB-encoded geometry bytes.
    final InternalRow returnRow = new GenericInternalRow(1);
returnRow.update(0, bytes);
return returnRow;
}
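A hedged sketch of the matching read path, assuming the one-field binary layout used above; getBinary is a standard InternalRow accessor, while parseTwkb stands in for whatever TWKB decoding the project actually uses:

// Hypothetical read path for the one-field binary layout above.
T deserializeRow(final InternalRow row) {
    final byte[] bytes = row.getBinary(0); // standard InternalRow binary accessor
    return parseTwkb(bytes);               // stand-in for the project's TWKB decoder
}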
@Override
protected int run() throws Exception {
SparkConf sparkConf = new SparkConf()
.setAppName("Data2CoNLL")
.set("spark.hadoop.validateOutputSpecs", "false")
.set("spark.yarn.executor.memoryOverhead", "3072")
.set("spark.rdd.compress", "true")
.set("spark.core.connection.ack.wait.timeout", "600")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(new Class[] {SCAS.class, LabeledPoint.class, SparseVector.class,
        int[].class, double[].class, InternalRow[].class, GenericInternalRow.class,
        Object[].class, GenericArrayData.class, VectorIndexer.class});
//.setMaster("local[4]"); // Enable only for local runs; remove on the cluster.
JavaSparkContext sc = new JavaSparkContext(sparkConf);
int totalCores = Integer.parseInt(sc.getConf().get("spark.executor.instances"))
* Integer.parseInt(sc.getConf().get("spark.executor.cores"));
FileSystem fs = FileSystem.get(new Configuration());
int partitionNumber = 3 * totalCores;
if(partitions != null) {
partitionNumber = partitions;
}
//Read training documents serialized as SCAS
JavaRDD<SCAS> documents = sc.sequenceFile(input, Text.class, SCAS.class, partitionNumber).values();
JavaRDD<String> docStrings = documents.map(s -> {
JCas jCas = s.getJCas();
NYTArticleMetaData metadata = JCasUtil.selectSingle(jCas, NYTArticleMetaData.class);
StringJoiner docBuilder = new StringJoiner("\n");
docBuilder.add("-DOCSTART- (" + metadata.getGuid() + ")");
docBuilder.add("");
Collection<Sentence> sentences = JCasUtil.select(jCas, Sentence.class);
for(Sentence sentence: sentences) {
List<Token> tokens = JCasUtil.selectCovered(jCas, Token.class, sentence);
for(Token token: tokens) {
CoreLabel taggedWord = CoreNlpUtils.tokenToWord(token);
StringJoiner lineBuilder = new StringJoiner("\t");
lineBuilder.add(taggedWord.word().toLowerCase()); // emit the lowercased surface form as the only column
docBuilder.add(lineBuilder.toString());
}
docBuilder.add("");
}
return docBuilder.toString();
});
docStrings.saveAsTextFile(output);
sc.stop();
return 0;
}
@Override
protected int run() throws Exception {
SparkConf sparkConf = new SparkConf()
.setAppName("EntitySalienceTrainingSparkRunner")
.set("spark.hadoop.validateOutputSpecs", "false")
.set("spark.yarn.executor.memoryOverhead", "3072")
.set("spark.rdd.compress", "true")
.set("spark.core.connection.ack.wait.timeout", "600")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(new Class[] {SCAS.class, LabeledPoint.class, SparseVector.class,
        int[].class, double[].class, InternalRow[].class, GenericInternalRow.class,
        Object[].class, GenericArrayData.class, VectorIndexer.class});
//.setMaster("local[4]"); // Enable only for local runs; remove on the cluster.
TrainingSettings trainingSettings = new TrainingSettings();
if(folds != null) {
trainingSettings.setNumFolds(folds);
}
if(method != null) {
trainingSettings.setClassificationMethod(TrainingSettings.ClassificationMethod.valueOf(method));
}
if(defaultConf != null) {
trainingSettings.setAidaDefaultConf(defaultConf);
}
if(scalingFactor != null) {
trainingSettings.setPositiveInstanceScalingFactor(scalingFactor);
}
JavaSparkContext sc = new JavaSparkContext(sparkConf);
int totalCores = Integer.parseInt(sc.getConf().get("spark.executor.instances"))
* Integer.parseInt(sc.getConf().get("spark.executor.cores"));
//Add the cache files to each node only if annotation is required.
//The input documents could already be annotated, and in this case no caches are needed.
if(trainingSettings.getFeatureExtractor().equals(TrainingSettings.FeatureExtractor.ANNOTATE_AND_ENTITY_SALIENCE)) {
sc.addFile(trainingSettings.getBigramCountCache());
sc.addFile(trainingSettings.getKeywordCountCache());
sc.addFile(trainingSettings.getWordContractionsCache());
sc.addFile(trainingSettings.getWordExpansionsCache());
if (trainingSettings.getAidaDefaultConf().equals("db")) {
sc.addFile(trainingSettings.getDatabaseAida());
} else {
sc.addFile(trainingSettings.getCassandraConfig());
}
}
SQLContext sqlContext = new SQLContext(sc);
FileSystem fs = FileSystem.get(new Configuration());
int partitionNumber = 3 * totalCores;
if(partitions != null) {
partitionNumber = partitions;
}
//Read training documents serialized as SCAS
JavaRDD<SCAS> documents = sc.sequenceFile(input, Text.class, SCAS.class, partitionNumber).values();
//Instantiate a training spark runner
TrainingSparkRunner trainingSparkRunner = new TrainingSparkRunner();
//Train a model
CrossValidatorModel model = trainingSparkRunner.crossValidate(sc, sqlContext, documents, trainingSettings);
//Create the model path
String modelPath = output+"/"+sc.getConf().getAppId()+"/model_"+trainingSettings.getClassificationMethod();
//Delete the old model if there is one
fs.delete(new Path(modelPath), true);
//Save the new model
List<Model> models = new ArrayList<>();
models.add(model.bestModel());
sc.parallelize(models, 1).saveAsObjectFile(modelPath);
//Save the model stats
SparkClassificationModel.saveStats(model, trainingSettings, output+"/"+sc.getConf().getAppId()+"/");
return 0;
}
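If the saved model needs to be restored in a later job, the companion pattern is a hedged one-liner over the same modelPath (the evaluation runner in the next example does exactly this with a PipelineModel):

// Hypothetical restore step: the model was written as a one-element
// object file, so first() recovers the single serialized object.
Model restored = (Model) sc.objectFile(modelPath).first();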
@Override
protected int run() throws Exception {
SparkConf sparkConf = new SparkConf()
.setAppName("EntitySalienceTrainingSparkRunner")
.set("spark.hadoop.validateOutputSpecs", "false")
//.set("spark.yarn.executor.memoryOverhead", "4096")
.set("spark.rdd.compress", "true")
.set("spark.core.connection.ack.wait.timeout", "600")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(new Class[] {SCAS.class, LabeledPoint.class, SparseVector.class,
        int[].class, double[].class, InternalRow[].class, GenericInternalRow.class,
        Object[].class, GenericArrayData.class, VectorIndexer.class});
//.setMaster("local"); // Enable only for local runs; remove on the cluster.
TrainingSettings trainingSettings = new TrainingSettings();
if(defaultConf != null) {
trainingSettings.setAidaDefaultConf(defaultConf);
}
JavaSparkContext sc = new JavaSparkContext(sparkConf);
int totalCores = Integer.parseInt(sc.getConf().get("spark.executor.instances"))
* Integer.parseInt(sc.getConf().get("spark.executor.cores"));
trainingSettings.setPositiveInstanceScalingFactor(1);
if(trainingSettings.getFeatureExtractor().equals(TrainingSettings.FeatureExtractor.ANNOTATE_AND_ENTITY_SALIENCE)) {
sc.addFile(trainingSettings.getBigramCountCache());
sc.addFile(trainingSettings.getKeywordCountCache());
sc.addFile(trainingSettings.getWordContractionsCache());
sc.addFile(trainingSettings.getWordExpansionsCache());
if (trainingSettings.getAidaDefaultConf().equals("db")) {
sc.addFile(trainingSettings.getDatabaseAida());
} else {
sc.addFile(trainingSettings.getCassandraConfig());
}
}
SQLContext sqlContext = new SQLContext(sc);
int partitionNumber = 3 * totalCores;
//Read training documents serialized as SCAS
JavaPairRDD<Text, SCAS> documents = sc.sequenceFile(input, Text.class, SCAS.class, partitionNumber);
//Instantiate a training spark runner
TrainingSparkRunner trainingSparkRunner = new TrainingSparkRunner();
PipelineModel trainingModel = (PipelineModel) sc.objectFile(model).first();
//Evaluate the model and write down the evaluation metrics.
trainingSparkRunner.evaluate(sc, sqlContext, documents, trainingModel, trainingSettings, output+"/"+sc.getConf().getAppId()+"/");
return 0;
}
@Override
protected Object getField(GenericInternalRow intermediate, int pos) {
    // Read the raw value straight from the row's backing array.
    return intermediate.genericGet(pos);
}
@Override
protected InternalRow buildStruct(GenericInternalRow struct) {
return struct;
}
@Override
protected void set(GenericInternalRow row, int pos, Object value) {
row.update(pos, value);
}
@Override
protected void setNull(GenericInternalRow row, int pos) {
row.setNullAt(pos);
}
@Override
protected void setBoolean(GenericInternalRow row, int pos, boolean value) {
row.setBoolean(pos, value);
}
@Override
protected void setInteger(GenericInternalRow row, int pos, int value) {
row.setInt(pos, value);
}
@Override
protected void setLong(GenericInternalRow row, int pos, long value) {
row.setLong(pos, value);
}
@Override
protected void setFloat(GenericInternalRow row, int pos, float value) {
row.setFloat(pos, value);
}
@Override
protected void setDouble(GenericInternalRow row, int pos, double value) {
row.setDouble(pos, value);
}
@Override
protected InternalRow create() {
return new GenericInternalRow(numFields);
}
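These hooks mirror InternalRow's own mutation API, where the primitive set* methods default to update on generic rows. A small self-contained demo of driving the same setters directly (the class name is illustrative):

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

public class PrimitiveSetterDemo {
    public static void main(String[] args) {
        GenericInternalRow row = new GenericInternalRow(4);
        row.setBoolean(0, true);  // on a generic row these delegate to update(...)
        row.setInt(1, 42);
        row.setDouble(2, 2.5);
        row.setNullAt(3);         // null out the last slot
        System.out.println(row.getBoolean(0) + " " + row.getInt(1)
                + " " + row.getDouble(2) + " " + row.isNullAt(3));
    }
}

On GenericInternalRow the values land in an Object[], so the primitive setters still box; the distinct hooks matter more for row implementations with true primitive storage.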