下面列出了 org.apache.hadoop.hbase.mapreduce.TableOutputFormat#org.apache.pig.ResourceSchema 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Test invalid Resource Schema: bag without tuple field.
 *
 * Attaches a schema containing a single chararray field (not a tuple) to a
 * BAG field. No conversion is invoked, so the only calls that can raise the
 * expected {@link FrontendException} are the field-building setters --
 * presumably ResourceFieldSchema#setSchema validating a bag's inner schema
 * (confirm against the Pig version in use).
 *
 * @throws IOException propagated from the ResourceFieldSchema setters
 */
@Test(expected=FrontendException.class)
public void testToPigSchemaWithInvalidSchema2() throws IOException {
    ResourceFieldSchema[] innerFields = {
        new ResourceFieldSchema().setName("fld0").setType(DataType.CHARARRAY)
    };
    ResourceSchema charOnlySchema = new ResourceSchema().setFields(innerFields);

    // Building the BAG field over a non-tuple schema is the step under test.
    new ResourceFieldSchema()
        .setName("t2")
        .setType(DataType.BAG)
        .setSchema(charOnlySchema);
}
/**
 * Verifies that SchemaUtil.convert maps each Iceberg primitive type to the
 * expected Pig type, preserving column order and names.
 */
@Test
public void testPrimitive() throws IOException {
    Schema icebergSchema = new Schema(
        optional(1, "b", BooleanType.get()),
        optional(2, "i", IntegerType.get()),
        optional(3, "l", LongType.get()),
        optional(4, "f", FloatType.get()),
        optional(5, "d", DoubleType.get()),
        optional(6, "dec", DecimalType.of(0, 2)),
        optional(7, "s", StringType.get()),
        optional(8, "bi", BinaryType.get()));

    String converted = SchemaUtil.convert(icebergSchema).toString();

    String expected =
        "b:boolean,i:int,l:long,f:float,d:double,dec:bigdecimal,s:chararray,bi:bytearray";
    assertEquals(expected, converted);
}
/**
 * Verifies conversion of a map whose values are lists of structs. The
 * asserted Pig schema string nests a tuple "(...)" inside a bag "{...}"
 * inside a map "[...]".
 */
@Test
public void testTupleInMap() throws IOException {
    StructType row = StructType.of(
        required(5, "id", LongType.get()),
        optional(6, "data", StringType.get()));
    ListType rows = ListType.ofOptional(4, row);
    MapType rowsByKey = MapType.ofOptional(2, 3, StringType.get(), rows);
    Schema icebergSchema = new Schema(optional(1, "nested_list", rowsByKey));

    ResourceSchema pigSchema = SchemaUtil.convert(icebergSchema);

    // map -> bag -> tuple(id, data)
    assertEquals("nested_list:[{(id:long,data:chararray)}]", pigSchema.toString());
}
/**
 * Back-end setup before writing.
 *
 * Reloads the schema string stored in the UDF context under SCHEMA_SIGNATURE
 * (presumably written on the front end -- confirm) and parses it into
 * {@code schema}. If the header treatment is still DEFAULT, it is switched to
 * SKIP_OUTPUT_HEADER. Finally delegates to the superclass' prepareToWrite.
 *
 * @param writer the record writer for this task
 */
@Override
public void prepareToWrite(RecordWriter writer) {
    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
    String strSchema = p.getProperty(SCHEMA_SIGNATURE);
    if (strSchema != null) {
        // Parse the schema from the string stored in the properties object.
        try {
            schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
        } catch (ParserException pex) {
            // Best-effort: log and carry on with whatever `schema` already
            // holds, but keep the parser error so the cause is diagnosable.
            logger.warn("Could not parse schema for storing.", pex);
        }
    }
    if (headerTreatment == Headers.DEFAULT) {
        headerTreatment = Headers.SKIP_OUTPUT_HEADER;
    }
    // PigStorage's prepareToWrite()
    super.prepareToWrite(writer);
}
/**
 * Verifies that SchemaUtil.convert maps each Iceberg primitive type to the
 * expected Pig type.
 *
 * Iceberg field ids must be unique within a schema, so every column gets its
 * own id (the previous version reused ids 1 and 5).
 */
@Test
public void testPrimitive() throws IOException {
    Schema icebergSchema = new Schema(
        optional(1, "b", BooleanType.get()),
        optional(2, "i", IntegerType.get()),
        optional(3, "l", LongType.get()),
        optional(4, "f", FloatType.get()),
        optional(5, "d", DoubleType.get()),
        optional(6, "dec", DecimalType.of(0, 2)),
        optional(7, "s", StringType.get()),
        optional(8, "bi", BinaryType.get())
    );
    ResourceSchema pigSchema = SchemaUtil.convert(icebergSchema);
    assertEquals(
        "b:boolean,i:int,l:long,f:float,d:double,dec:bigdecimal,s:chararray,bi:bytearray",
        pigSchema.toString());
}
/**
 * Verifies that a map of lists of structs converts to a Pig map of bags of
 * tuples, as spelled out by the asserted schema string.
 */
@Test
public void testTupleInMap() throws IOException {
    StructType element = StructType.of(
        required(5, "id", LongType.get()),
        optional(6, "data", StringType.get()));
    Schema icebergSchema = new Schema(
        optional(1, "nested_list",
            MapType.ofOptional(2, 3, StringType.get(), ListType.ofOptional(4, element))));

    String expected = "nested_list:[{(id:long,data:chararray)}]";
    // [...] is the map, {...} the bag produced from the list, (...) the struct tuple.
    assertEquals(expected, SchemaUtil.convert(icebergSchema).toString());
}
/**
 * Test invalid Resource Schema: multiple fields for a bag.
 *
 * Attaches a three-field flat schema (chararray, double, int) to a BAG
 * field. No conversion is invoked, so the expected {@link FrontendException}
 * can only come from the field-building setters -- presumably
 * ResourceFieldSchema#setSchema rejecting a bag whose inner schema is not a
 * single tuple (confirm against the Pig version in use).
 *
 * @throws IOException propagated from the ResourceFieldSchema setters
 */
@Test(expected=FrontendException.class)
public void testToPigSchemaWithInvalidSchema() throws IOException {
    ResourceFieldSchema[] flatFields = {
        new ResourceFieldSchema().setName("fld0").setType(DataType.CHARARRAY),
        new ResourceFieldSchema().setName("fld1").setType(DataType.DOUBLE),
        new ResourceFieldSchema().setName("fld2").setType(DataType.INTEGER)
    };
    ResourceSchema flatSchema = new ResourceSchema().setFields(flatFields);

    // Building the BAG field over a multi-field, tuple-less schema is the step under test.
    new ResourceFieldSchema()
        .setName("t2")
        .setType(DataType.BAG)
        .setSchema(flatSchema);
}
/**
 * Back-end setup before writing.
 *
 * Remembers the record writer for later tuple writes, reloads the output
 * schema string stored in the UDF context under SCHEMA_SIGNATURE, parses it
 * into {@code schema}, and creates a fresh JsonFactory.
 *
 * @param writer the record writer tuples will be written to
 * @throws IOException if no schema string is present in the UDF context;
 *     errors from parsing the schema string also propagate
 */
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
    this.writer = writer;

    Properties udfProps = UDFContext.getUDFContext()
        .getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String serializedSchema = udfProps.getProperty(SCHEMA_SIGNATURE);
    if (serializedSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
    jsonFactory = new JsonFactory();
}
/**
 * Test one-level Pig Schema: multiple fields for a bag.
 *
 * A BAG whose inner schema has two flat fields (f1:chararray, f2:int) rather
 * than a single tuple must make the ResourceSchema round trip throw a
 * FrontendException with error code 2218.
 */
@Test
public void testResourceSchemaWithInvalidPigSchema()
        throws FrontendException {
    String[] aliases = {"f1", "f2"};
    byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
    Schema level0 = TypeCheckingTestUtil.genFlatSchema(aliases, types);
    Schema.FieldSchema fld0 =
        new Schema.FieldSchema("f0", level0, DataType.BAG);
    Schema level1 = new Schema(fld0);
    try {
        Schema.getPigSchema(new ResourceSchema(level1));
        Assert.fail("Expected FrontendException for a bag with multiple fields");
    } catch (FrontendException e) {
        // assertEquals reports the actual code on failure; assertTrue(a == b) does not.
        Assert.assertEquals(2218, e.getErrorCode());
    }
}
/**
 * Translates the Pig schema of the data being stored into an Avro schema
 * and installs it via setOutputAvroSchema.
 *
 * The Avro record is named after {@code schemaName}, or "pig_output" when
 * no name is configured.
 *
 * @param rs the Pig schema to translate; must not be null
 * @throws IOException if {@code rs} is null or the translation yields null
 */
@Override
public final void checkSchema(final ResourceSchema rs) throws IOException {
    if (rs == null) {
        throw new IOException("checkSchema: called with null ResourceSchema");
    }

    final boolean hasConfiguredName = schemaName != null && schemaName.length() != 0;
    final String recordName = hasConfiguredName ? schemaName : "pig_output";

    final Schema avroSchema = AvroStorageSchemaConversionUtilities.resourceSchemaToAvroSchema(
        rs,
        recordName,
        schemaNameSpace,
        Maps.<String, List<Schema>> newHashMap(),
        doubleColonsToDoubleUnderscores);
    if (avroSchema == null) {
        throw new IOException("checkSchema: could not translate ResourceSchema to Avro Schema");
    }
    setOutputAvroSchema(avroSchema);
}
/**
 * Back-end setup before reading.
 *
 * Remembers the record reader, reloads the schema string stored in the UDF
 * context under SCHEMA_SIGNATURE, parses it into {@code schema}, and creates
 * a fresh JsonFactory.
 *
 * @param reader the record reader for this split
 * @param split  the split being read (not used here)
 * @throws IOException if no schema string is present in the UDF context;
 *     errors from parsing the schema string also propagate
 */
@SuppressWarnings("unchecked")
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
    this.reader = reader;

    final String[] contextKey = new String[]{udfcSignature};
    final Properties udfProps =
        UDFContext.getUDFContext().getUDFProperties(this.getClass(), contextKey);
    final String schemaText = udfProps.getProperty(SCHEMA_SIGNATURE);
    if (schemaText == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    schema = new ResourceSchema(Utils.getSchemaFromString(schemaText));
    jsonFactory = new JsonFactory();
}
/**
 * Test that ResourceSchema is correctly created given a
 * pig.Schema and vice versa.
 *
 * Builds a flat Pig schema (f1:chararray, f2:int), converts it to a
 * ResourceSchema, checks field names and types, then converts back and
 * checks the result equals the original.
 */
@Test
public void testResourceFlatSchemaCreation()
        throws ExecException, SchemaMergeException, FrontendException {
    String[] aliases = {"f1", "f2"};
    byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
    Schema origSchema = TypeCheckingTestUtil.genFlatSchema(aliases, types);
    ResourceSchema rsSchema = new ResourceSchema(origSchema);

    ResourceSchema.ResourceFieldSchema[] fields = rsSchema.getFields();
    assertEquals("num fields", aliases.length, fields.length);
    for (int i = 0; i < fields.length; i++) {
        // JUnit's assertEquals takes (expected, actual); the previous order
        // swapped them, producing misleading failure messages.
        assertEquals(aliases[i], fields[i].getName());
        assertEquals(types[i], fields[i].getType());
    }

    Schema genSchema = Schema.getPigSchema(rsSchema);
    assertTrue("generated schema equals original",
        Schema.equals(genSchema, origSchema, true, false));
}
/**
 * Returns the names of the top-level fields of the schema at
 * {@code location} whose Pig type is boolean, int, long, float, double,
 * datetime, chararray, biginteger or bigdecimal. All other types are
 * skipped.
 *
 * @param location the input location
 * @param job      the current job
 * @return the eligible field names, in schema order
 * @throws IOException propagated from getSchema
 */
@Override
public List<String> getPredicateFields(String location, Job job) throws IOException {
    ResourceSchema resourceSchema = getSchema(location, job);
    List<String> eligible = new ArrayList<String>();
    for (ResourceFieldSchema candidate : resourceSchema.getFields()) {
        boolean supported;
        switch (candidate.getType()) {
            case DataType.BOOLEAN:
            case DataType.INTEGER:
            case DataType.LONG:
            case DataType.FLOAT:
            case DataType.DOUBLE:
            case DataType.DATETIME:
            case DataType.CHARARRAY:
            case DataType.BIGINTEGER:
            case DataType.BIGDECIMAL:
                supported = true;
                break;
            default:
                // BYTEARRAY, TUPLE, MAP, BAG and anything else are not offered.
                supported = false;
                break;
        }
        if (supported) {
            eligible.add(candidate.getName());
        }
    }
    return eligible;
}
/**
 * Builds a one-field schema whose only field, "column", is a TUPLE of
 * (name:bytearray, value:bytearray, ts:long, status:chararray, ttl:long).
 *
 * @return the column schema
 * @throws IOException propagated from the field helpers / schema setters
 */
protected ResourceSchema columnSchema() throws IOException {
    ResourceSchema result = new ResourceSchema();

    ResourceFieldSchema[] tupleFields = {
        field("name", DataType.BYTEARRAY),
        field("value", DataType.BYTEARRAY),
        field("ts", DataType.LONG),
        field("status", DataType.CHARARRAY),
        field("ttl", DataType.LONG)
    };
    ResourceSchema tupleSchema = new ResourceSchema();
    tupleSchema.setFields(tupleFields);

    // Type is set before the inner schema, matching the original order.
    ResourceFieldSchema column = new ResourceFieldSchema();
    column.setName("column");
    column.setType(DataType.TUPLE);
    column.setSchema(tupleSchema);

    result.setFields(new ResourceFieldSchema[] { column });
    return result;
}
/**
 * Back-end setup before reading a split.
 *
 * Saves the reader (used by getNext()) and the split index, reloads the
 * schema string stored in the UDF context under SCHEMA_SIGNATURE, and
 * restores the required-field mask stored under REQUIRED_FIELDS_SIGNATURE.
 * When a mask is restored, numRequiredFields is recomputed as the number of
 * true entries; when it deserializes to null, numRequiredFields is left
 * untouched.
 *
 * @param reader the record reader for this split
 * @param split  the split being read
 * @throws IOException if no schema string is present in the UDF context;
 *     errors from parsing or deserialization also propagate
 */
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    this.reader = reader;
    splitIndex = split.getSplitIndex();

    Properties props = UDFContext.getUDFContext()
        .getUDFProperties(this.getClass(), new String[] { udfContextSignature });
    String schemaString = props.getProperty(SCHEMA_SIGNATURE);
    if (schemaString == null) {
        throw new IOException("Could not find schema in UDF context");
    }
    schema = new ResourceSchema(Utils.getSchemaFromString(schemaString));

    String serializedMask = props.getProperty(REQUIRED_FIELDS_SIGNATURE);
    requiredFields = (boolean[]) ObjectSerializer.deserialize(serializedMask);
    if (requiredFields != null) {
        numRequiredFields = 0;
        for (boolean required : requiredFields) {
            if (required) {
                numRequiredFields++;
            }
        }
    }
}
/**
 * Builds a BAG field schema whose inner schema contains exactly the field
 * returned by getSmallTupleFieldSchema().
 *
 * @return the bag field schema
 * @throws IOException propagated from the schema helpers / setters
 */
public static ResourceFieldSchema getSmallTupDataBagFieldSchema() throws IOException {
    ResourceFieldSchema element = getSmallTupleFieldSchema();

    ResourceSchema contents = new ResourceSchema();
    contents.setFields(new ResourceFieldSchema[] { element });

    // Keep the original order: the inner schema is attached before the BAG type is set.
    ResourceFieldSchema bag = new ResourceFieldSchema();
    bag.setSchema(contents);
    bag.setType(DataType.BAG);
    return bag;
}
/**
 * Loads the table at {@code location}, stores its Iceberg schema via
 * storeInUDFContext under ICEBERG_SCHEMA, and returns the Pig equivalent
 * produced by SchemaUtil.convert.
 *
 * @param location the table location
 * @param job      the current job
 * @return the converted Pig schema
 * @throws IOException propagated from loading or conversion
 */
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    LOG.info(format("[%s]: getSchema() -> %s", signature, location));
    Schema tableSchema = load(location, job).schema();
    storeInUDFContext(ICEBERG_SCHEMA, tableSchema);
    ResourceSchema pigSchema = SchemaUtil.convert(tableSchema);
    return pigSchema;
}
/**
 * Append newly specified schema.
 *
 * The UDF property AVRO_OUTPUT_SCHEMA_PROPERTY holds all key/schema pairs
 * registered so far. If this store's key is already present, the new schema
 * is ignored with a warning. Otherwise the output Avro schema is chosen:
 * <ul>
 *   <li>no user-supplied outputAvroSchema: convert {@code s} directly;</li>
 *   <li>user-supplied and {@code checkSchema} set: validate {@code s}
 *       against it and use the result;</li>
 *   <li>user-supplied and {@code checkSchema} unset: use it as-is.</li>
 * </ul>
 * The chosen schema is then appended to the property.
 *
 * @param s the Pig schema of the data being stored
 * @throws IOException propagated from schema validation/conversion
 */
@Override
public void checkSchema(ResourceSchema s) throws IOException {
    AvroStorageLog.funcCall("Check schema");
    Properties udfProps = getUDFProperties();
    String existing = udfProps.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
    AvroStorageLog.details("Previously defined schemas=" + existing);

    String key = getSchemaKey();
    Map<String, String> knownSchemas = null;
    if (existing != null) {
        knownSchemas = parseSchemaMap(existing);
    }
    if (knownSchemas != null && knownSchemas.containsKey(key)) {
        AvroStorageLog.warn("Duplicate value for key-" + key + ". Will ignore the new schema.");
        return;
    }

    /* validate and convert output schema */
    Schema outputSchema;
    if (outputAvroSchema == null) {
        outputSchema = PigSchema2Avro.convert(s, nullable);
    } else if (checkSchema) {
        outputSchema = PigSchema2Avro.validateAndConvert(outputAvroSchema, s);
    } else {
        outputSchema = outputAvroSchema;
    }
    AvroStorageLog.info("key=" + key + " outputSchema=" + outputSchema);

    String entry = key + SCHEMA_KEYVALUE_DELIM + outputSchema.toString();
    String updated;
    if (knownSchemas == null) {
        updated = entry;
    } else {
        updated = existing + SCHEMA_DELIM + entry;
    }
    udfProps.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, updated);
    AvroStorageLog.details("New schemas=" + updated);
}
/**
 * Performs no validation. It only records the schema's string form in the
 * UDF context under SCHEMA_SIGNATURE so the back end can use it later.
 *
 * @param s the Pig schema of the data being stored
 * @throws IOException declared for the StoreFunc contract
 */
public void checkSchema(ResourceSchema s) throws IOException {
    Properties udfProps = UDFContext.getUDFContext()
        .getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
    udfProps.setProperty(SCHEMA_SIGNATURE, s.toString());
}
/**
 * Parses a Pig schema string (e.g. "a:int,b:chararray") into a ResourceSchema.
 *
 * @param schema the Pig schema text
 * @return the parsed schema
 * @throws IllegalArgumentException if the text cannot be parsed; the
 *     original checked exception is kept as the cause
 */
private ResourceSchema createSchema(String schema) {
    try {
        return new ResourceSchema(Utils.getSchemaFromString(schema));
    } catch (RuntimeException ex) {
        // Already unchecked: propagate as-is instead of double-wrapping.
        throw ex;
    } catch (Exception ex) {
        throw new IllegalArgumentException("Could not parse Pig schema: " + schema, ex);
    }
}
/**
 * Back-end setup before reading.
 *
 * Keeps the record reader and restores {@code schema} by deserializing the
 * string stored in the UDF context under RESOURCE_SCHEMA_SIGNATURE for this
 * loader's context signature.
 *
 * @param reader the record reader for this split
 * @param split  the split being read (not used here)
 * @throws IOException if no serialized schema is found; exceptions from
 *     ObjectSerializer.deserialize presumably also propagate
 */
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    this.reader = reader;
    final String serializedSchema =
        getValueFromUDFContext(this.contextSignature, RESOURCE_SCHEMA_SIGNATURE);
    if (serializedSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }
    final Object restored = ObjectSerializer.deserialize(serializedSchema);
    schema = (ResourceSchema) restored;
}
/**
 * Verifies that bytesToMap honours a declared map[long] value type: the
 * literal "[key1#1l]" must produce exactly one entry, key1 -> Long 1.
 */
@Test
public void testMapLongValueType() throws Exception {
    String myMap = "[key1#1l]";
    Schema schema = Utils.getSchemaFromString("m:map[long]");
    ResourceFieldSchema rfs = new ResourceSchema(schema).getFields()[0];
    // Encode explicitly instead of relying on the platform default charset.
    Map<String, Object> map = ps.getLoadCaster().bytesToMap(
        myMap.getBytes(java.nio.charset.StandardCharsets.UTF_8), rfs);
    // The key check below reads only the first key, so pin the size too.
    assertEquals(1, map.size());
    String key = map.keySet().iterator().next();
    Object v = map.get("key1");
    assertEquals("key1", key);
    assertTrue(v instanceof Long);
    String value = String.valueOf(v);
    assertEquals("1", value);
}
/**
 * Builds a Pig ResourceSchema with one field per Phoenix column.
 *
 * Columns come from the configured SELECT statement when the schema type is
 * QUERY, otherwise from the configured select-column metadata list. Each
 * field is named by the column's display name, and its Pig type is mapped
 * from the column's SQL type.
 *
 * @param configuration the job configuration holding Phoenix settings
 * @return the resulting schema
 * @throws IOException wrapping any SQLException raised while resolving columns
 */
public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException {
    final ResourceSchema schema = new ResourceSchema();
    try {
        final List<ColumnInfo> columns;
        final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
        if (SchemaType.QUERY.equals(schemaType)) {
            final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
            Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
            columns = new SqlQueryToColumnInfoFunction(configuration).apply(sqlQuery);
        } else {
            columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration);
        }

        final ResourceFieldSchema[] fields = new ResourceFieldSchema[columns.size()];
        int position = 0;
        for (final ColumnInfo column : columns) {
            final PDataType phoenixType = PDataType.fromTypeId(column.getSqlType());
            final byte pigType = TypeUtil.getPigDataTypeForPhoenixType(phoenixType);
            final ResourceFieldSchema field = new ResourceFieldSchema();
            field.setType(pigType).setName(column.getDisplayName());
            fields[position] = field;
            position++;
        }
        schema.setFields(fields);
    } catch (SQLException sqle) {
        LOG.error(String.format("Error: SQLException [%s] ",sqle.getMessage()));
        throw new IOException(sqle);
    }
    return schema;
}
/**
 * Convert an Avro schema to a Pig schema.
 *
 * Schemas containing generic unions are rejected. If the converted top-level
 * field is a TUPLE, its inner schema is returned directly. Otherwise the
 * field is wrapped in a tuple, and a one-field schema holding that wrapper
 * is returned.
 *
 * @param schema the Avro schema to convert
 * @return the equivalent Pig schema
 * @throws IOException if the schema contains a generic union, or propagated
 *     from the conversion helpers
 */
public static ResourceSchema convert(Schema schema) throws IOException
{
    if (AvroStorageUtils.containsGenericUnion(schema)) {
        throw new IOException("We don't accept schema containing generic unions.");
    }
    ResourceFieldSchema converted = inconvert(schema, FIELD, new HashSet<Schema>());
    if (converted.getType() == DataType.TUPLE) {
        return converted.getSchema();
    }
    // Any other type: wrap it so the top level is still a tuple-shaped schema.
    ResourceFieldSchema tupleWrapper = AvroStorageUtils.wrapAsTuple(converted);
    ResourceSchema wrapped = new ResourceSchema();
    wrapped.setFields(new ResourceFieldSchema[] { tupleWrapper });
    return wrapped;
}
/**
 * Add a field schema to a bag schema.
 *
 * Sets {@code fieldSchema}'s inner schema (replacing any existing one) to a
 * single field. That field is {@code subFieldSchema} itself if it is already
 * a TUPLE; otherwise it is {@code subFieldSchema} wrapped in a tuple.
 *
 * @param fieldSchema    the bag field to populate
 * @param subFieldSchema the element field
 * @throws IOException propagated from wrapping or setting the schema
 */
static protected void add2BagSchema(ResourceFieldSchema fieldSchema,
                                    ResourceFieldSchema subFieldSchema) throws IOException
{
    ResourceFieldSchema element = subFieldSchema;
    if (element.getType() != DataType.TUPLE) {
        element = AvroStorageUtils.wrapAsTuple(subFieldSchema);
    }
    ResourceSchema bagContents = new ResourceSchema();
    bagContents.setFields(new ResourceFieldSchema[] { element });
    fieldSchema.setSchema(bagContents);
}
/**
 * Convert a pig ResourceSchema to avro schema.
 *
 * A schema with exactly one field is treated as a Pig tuple wrapper, and
 * only that field is converted. Schemas with any other number of fields are
 * passed to convertRecord.
 *
 * @param pigSchema the Pig schema to convert
 * @param nullable  forwarded to the field/record converters
 * @return the resulting Avro schema
 * @throws IOException propagated from the converters
 */
public static Schema convert(ResourceSchema pigSchema, boolean nullable) throws IOException {
    ResourceFieldSchema[] pigFields = pigSchema.getFields();
    if (pigFields.length != 1) {
        return convertRecord(pigFields, nullable);
    }
    /* remove the pig tuple wrapper */
    AvroStorageLog.details("Ignore the pig tuple wrapper.");
    return convert(pigFields[0], nullable);
}
/**
 * Returns a fixed schema that does not depend on {@code location} or
 * {@code job}: key:bytearray, deletedat:long, map_columns (MAP) and
 * bag_columns (BAG), the last two both built over columnSchema().
 *
 * @param location the input location (unused)
 * @param job      the current job (unused)
 * @return the row schema
 * @throws IOException propagated from the schema helpers
 */
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    ResourceFieldSchema[] rowFields = {
        field("key", DataType.BYTEARRAY),
        field("deletedat", DataType.LONG),
        subfield("map_columns", DataType.MAP, columnSchema()),
        subfield("bag_columns", DataType.BAG, columnSchema())
    };
    ResourceSchema resourceSchema = new ResourceSchema();
    resourceSchema.setFields(rowFields);
    return resourceSchema;
}
/**
 * Wrap a pig field schema as a tuple.
 *
 * @param subFieldSchema the field to wrap
 * @return a TUPLE field named PIG_TUPLE_WRAPPER whose inner schema contains
 *     only {@code subFieldSchema}
 * @throws IOException propagated from the schema setters
 */
public static ResourceFieldSchema wrapAsTuple(ResourceFieldSchema subFieldSchema) throws IOException {
    ResourceSchema inner = new ResourceSchema();
    inner.setFields(new ResourceFieldSchema[] { subFieldSchema });
    // Type is set before the schema is attached, as in the original.
    return new ResourceFieldSchema()
        .setType(DataType.TUPLE)
        .setName(PIG_TUPLE_WRAPPER)
        .setSchema(inner);
}
/**
 * Builds a flat schema with one field per entry in the column family's
 * column_metadata.
 *
 * Each field is named by the column name, decoded as UTF-8. Its Pig type is
 * derived from the column's validator, falling back to the default validator
 * marshaller when the column has none.
 *
 * NOTE(review): the previous header said "(value, value, value) where keys
 * are in the front", but this method emits no key fields -- confirm the
 * intended layout.
 *
 * @param location the input location
 * @param job      the current job
 * @return the column schema
 * @throws IOException propagated from setLocation / metadata lookup
 */
public ResourceSchema getSchema(String location, Job job) throws IOException
{
    setLocation(location, job);
    CfInfo cfInfo = getCfInfo(loadSignature);
    CfDef cfDef = cfInfo.cfDef;
    // top-level schema, no type
    ResourceSchema schema = new ResourceSchema();
    // get default marshallers and validators
    Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
    Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
    // will contain all fields for this schema
    List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
    for (ColumnDef cdef : cfDef.column_metadata)
    {
        ResourceFieldSchema valSchema = new ResourceFieldSchema();
        AbstractType validator = validators.get(cdef.name);
        if (validator == null)
        {
            validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
        }
        // Decode explicitly: new String(byte[]) would use the JVM's platform
        // charset and could mangle non-ASCII column names.
        valSchema.setName(new String(cdef.getName(), java.nio.charset.StandardCharsets.UTF_8));
        valSchema.setType(getPigType(validator));
        allSchemaFields.add(valSchema);
    }
    // top level schema contains everything
    schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
    return schema;
}
/**
 * Configures the job input for {@code location} via setInput, then returns
 * a new ResourceSchema built from the {@code schema} field (presumably
 * populated by setInput -- confirm).
 *
 * @param location the input location
 * @param job      the current job
 * @return the Pig schema of the input
 * @throws IOException propagated from setInput or schema construction
 */
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
    if (LOG.isDebugEnabled()) {
        String jobDescription =
            String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
        LOG.debug("LoadMetadata.getSchema({}, {})", location, jobDescription);
    }
    setInput(location, job);
    ResourceSchema result = new ResourceSchema(schema);
    return result;
}