Listed below are example usages of org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter, drawn from open-source projects.
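Before the individual examples, here is the contract they all share: a FileSinkOperator.RecordWriter is obtained from a HiveOutputFormat, receives one serialized Writable per row through write(), and is finished with close(boolean abort). The sketch below only illustrates that pattern; the writeAll helper and its parameters are assumptions, not code from any of the quoted projects.
import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public final class RecordWriterSketch {
    private RecordWriterSketch() {}

    // Hypothetical helper illustrating the write/close contract of FileSinkOperator.RecordWriter.
    static void writeAll(HiveOutputFormat<?, ?> format, JobConf conf, Path target,
            Properties tableProperties, Iterable<? extends Writable> rows)
            throws IOException {
        FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(
                conf, target, Text.class, false /* isCompressed */, tableProperties, Reporter.NULL);
        try {
            for (Writable row : rows) {
                writer.write(row); // one call per serialized row
            }
            writer.close(false); // false = finish the file normally
        }
        catch (IOException e) {
            writer.close(true); // true = abort instead of committing
            throw e;
        }
    }
}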
/**
* Write a file that contains a number of rows with 1 BIGINT column, and some rows have null values.
*/
private static TempFile createSingleColumnFileWithNullValues(int rows)
throws IOException, SerDeException
{
Serializer serde = new OrcSerde();
TempFile tempFile = new TempFile();
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(tempFile.getFile(), ORC_12, CompressionKind.NONE, BIGINT);
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", BIGINT);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < rows; i++) {
if (i % 10 == 0) {
objectInspector.setStructFieldData(row, field, null);
}
else {
objectInspector.setStructFieldData(row, field, (long) i);
}
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
return tempFile;
}
/**
* Write a file that contains a number of rows with 1 VARCHAR column, and all values are not null.
*/
private static TempFile createSingleColumnVarcharFile(int count, int length)
throws Exception
{
Serializer serde = new OrcSerde();
TempFile tempFile = new TempFile();
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(tempFile.getFile(), ORC_12, CompressionKind.NONE, VARCHAR);
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", VARCHAR);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < count; i++) {
objectInspector.setStructFieldData(row, field, Strings.repeat("0", length));
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
return tempFile;
}
private static FileSinkOperator.RecordWriter createOrcRecordWriter(File outputFile, Format format, CompressionKind compression, ObjectInspector columnObjectInspector)
throws IOException
{
JobConf jobConf = new JobConf();
OrcConf.WRITE_FORMAT.setString(jobConf, format == ORC_12 ? "0.12" : "0.11");
OrcConf.COMPRESS.setString(jobConf, compression.name());
Properties tableProperties = new Properties();
tableProperties.setProperty(IOConstants.COLUMNS, "test");
tableProperties.setProperty(IOConstants.COLUMNS_TYPES, columnObjectInspector.getTypeName());
tableProperties.setProperty(OrcConf.STRIPE_SIZE.getAttribute(), "120000");
return new OrcOutputFormat().getHiveRecordWriter(
jobConf,
new Path(outputFile.toURI()),
Text.class,
compression != NONE,
tableProperties,
() -> {});
}
private static void createMultiStripeFile(File file)
throws IOException, ReflectiveOperationException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, CompressionKind.NONE, BIGINT);
Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", BIGINT);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < 300; i += 3) {
if ((i > 0) && (i % 60 == 0)) {
flushWriter(writer);
}
objectInspector.setStructFieldData(row, field, (long) i);
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
private static void createSequentialFile(File file, int count)
throws IOException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, CompressionKind.NONE, BIGINT);
Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", BIGINT);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < count; i++) {
objectInspector.setStructFieldData(row, field, (long) i);
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(
JobConf jobConf,
Path finalOutPath,
Class<? extends Writable> valueClass,
boolean isCompressed,
Properties tableProperties,
Progressable progress)
throws IOException
{
if (schema.isPresent()) {
DataWritableWriteSupport.setSchema(schema.get(), jobConf);
return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress, tableProperties);
}
return super.getHiveRecordWriter(jobConf, finalOutPath, valueClass, isCompressed, tableProperties, progress);
}
private static FileSinkOperator.RecordWriter createOrcRecordWriter(File outputFile, Format format, Compression compression, ObjectInspector columnObjectInspector)
throws IOException
{
JobConf jobConf = new JobConf();
jobConf.set("hive.exec.orc.write.format", format == ORC_12 ? "0.12" : "0.11");
jobConf.set("hive.exec.orc.default.compress", compression.name());
Properties tableProperties = new Properties();
tableProperties.setProperty("columns", "test");
tableProperties.setProperty("columns.types", columnObjectInspector.getTypeName());
tableProperties.setProperty("orc.stripe.size", "1200000");
return new OrcOutputFormat().getHiveRecordWriter(
jobConf,
new Path(outputFile.toURI()),
Text.class,
compression != NONE,
tableProperties,
() -> { });
}
private static void createMultiStripeFile(File file)
throws IOException, ReflectiveOperationException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, OrcTester.Compression.NONE, javaLongObjectInspector);
@SuppressWarnings("deprecation") Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", javaLongObjectInspector);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < 300; i += 3) {
if ((i > 0) && (i % 60 == 0)) {
flushWriter(writer);
}
objectInspector.setStructFieldData(row, field, (long) i);
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
private static void createSequentialFile(File file, int count)
throws IOException, ReflectiveOperationException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, OrcTester.Compression.NONE, javaLongObjectInspector);
@SuppressWarnings("deprecation") Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", javaLongObjectInspector);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 0; i < count; i++) {
objectInspector.setStructFieldData(row, field, (long) i);
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
/**
* Write a file that contains a given number of maps where each row has 10 entries in total
* and some entries have null keys/values.
*/
private static TempFile createSingleColumnMapFileWithNullValues(Type mapType, int rows)
throws IOException, SerDeException
{
Serializer serde = new OrcSerde();
TempFile tempFile = new TempFile();
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(tempFile.getFile(), ORC_12, CompressionKind.NONE, mapType);
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", mapType);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
for (int i = 1; i <= rows; i++) {
HashMap<Long, Long> map = new HashMap<>();
for (int j = 1; j <= 8; j++) {
Long value = (long) j;
map.put(value, value);
}
// Add null values so that the StreamReader nullVectors are not empty.
map.put(null, 0L);
map.put(0L, null);
objectInspector.setStructFieldData(row, field, map);
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
return tempFile;
}
private static void flushWriter(FileSinkOperator.RecordWriter writer)
throws IOException, ReflectiveOperationException
{
// Use reflection to reach the private "writer" field of OrcOutputFormat$OrcRecordWriter and
// write an intermediate footer, which flushes the current stripe to disk.
Field field = OrcOutputFormat.class.getClassLoader()
.loadClass(OrcOutputFormat.class.getName() + "$OrcRecordWriter")
.getDeclaredField("writer");
field.setAccessible(true);
((Writer) field.get(writer)).writeIntermediateFooter();
}
private static void createGrowingSequentialFile(File file, int count, int step, int initialLength)
throws IOException, SerDeException
{
FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, CompressionKind.NONE, VARCHAR);
Serializer serde = new OrcSerde();
SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", VARCHAR);
Object row = objectInspector.create();
StructField field = objectInspector.getAllStructFieldRefs().get(0);
StringBuilder builder = new StringBuilder();
for (int i = 0; i < initialLength; i++) {
builder.append("0");
}
String seedString = builder.toString();
// gradually grow the length of a cell
int previousLength = initialLength;
for (int i = 0; i < count; i++) {
if ((i / step + 1) * initialLength > previousLength) {
previousLength = (i / step + 1) * initialLength;
builder.append(seedString);
}
objectInspector.setStructFieldData(row, field, builder.toString());
Writable record = serde.serialize(row, objectInspector);
writer.write(record);
}
writer.close(false);
}
private static void writeEmptyFile(ConnectorSession session, Path target, JobConf conf, Properties properties, String serDe, String outputFormatName)
{
// Some serializers such as Avro set a property in the schema.
initializeSerializer(conf, properties, serDe);
// The code below is not a try with resources because RecordWriter is not Closeable.
FileSinkOperator.RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(target, conf, properties, outputFormatName, session);
try {
recordWriter.close(false);
}
catch (IOException e) {
throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error write empty file to Hive", e);
}
}
HivePartitionWriter(JobConf jobConf, OutputFormat outputFormat, FileSinkOperator.RecordWriter recordWriter,
OutputCommitter outputCommitter) {
this.jobConf = jobConf;
this.outputFormat = outputFormat;
this.recordWriter = recordWriter;
this.outputCommitter = outputCommitter;
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
try {
Class utilClass = HiveFileFormatUtils.class;
HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
Method utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, HiveOutputFormat.class,
Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
} catch (Exception e) {
throw new CatalogException("Failed to create Hive RecordWriter", e);
}
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
try {
Class utilClass = HiveFileFormatUtils.class;
OutputFormat outputFormat = (OutputFormat) outputFormatClz.newInstance();
Method utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, OutputFormat.class,
Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
} catch (Exception e) {
throw new CatalogException("Failed to create Hive RecordWriter", e);
}
}
@Override
public HadoopPathBasedBulkWriter<RowData> create(Path targetPath, Path inProgressPath) throws IOException {
FileSinkOperator.RecordWriter recordWriter = factory.createRecordWriter(inProgressPath);
Function<RowData, Writable> rowConverter = factory.createRowDataConverter();
FileSystem fs = FileSystem.get(inProgressPath.toUri(), factory.getJobConf());
return new HadoopPathBasedBulkWriter<RowData>() {
@Override
public long getSize() throws IOException {
return fs.getFileStatus(inProgressPath).getLen();
}
@Override
public void dispose() {
// close silently.
try {
recordWriter.close(true);
} catch (IOException ignored) {
}
}
@Override
public void addElement(RowData element) throws IOException {
recordWriter.write(rowConverter.apply(element));
}
@Override
public void flush() {
}
@Override
public void finish() throws IOException {
recordWriter.close(false);
}
};
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
URI uri = finalOutPath.toUri();
assertEquals(TEST_URI_SCHEME, uri.getScheme());
assertEquals(TEST_URI_AUTHORITY, uri.getAuthority());
return null;
}
/**
* Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
* contains the real output format.
*/
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(
final JobConf jobConf,
final Path finalOutPath,
final Class<? extends Writable> valueClass,
final boolean isCompressed,
final Properties tableProperties,
final Progressable progress) throws IOException {
LOG.info("creating new record writer...{}", this);
final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
List<String> columnNames;
List<TypeInfo> columnTypes;
if (columnNameProperty.length() == 0) {
columnNames = new ArrayList<String>();
} else {
columnNames = Arrays.asList(columnNameProperty.split(","));
}
if (columnTypeProperty.length() == 0) {
columnTypes = new ArrayList<TypeInfo>();
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
}
@Test
public void testEmptyFile() throws Exception {
JobConf job = new JobConf(conf);
Properties properties = new Properties();
HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
FileSinkOperator.RecordWriter writer =
outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
properties, Reporter.NULL);
writer.close(true);
properties.setProperty("columns", "x,y");
properties.setProperty("columns.types", "int:int");
SerDe serde = new OrcSerde();
serde.initialize(conf, properties);
InputFormat<?,?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, testFilePath.toString());
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(1, splits.length);
// read the whole file
conf.set("hive.io.file.readcolumn.ids", "0,1");
org.apache.hadoop.mapred.RecordReader reader =
in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
assertEquals(0.0, reader.getProgress(), 0.00001);
assertEquals(0, reader.getPos());
assertEquals(false, reader.next(key, value));
reader.close();
assertEquals(null, serde.getSerDeStats());
}
private static void flushWriter(FileSinkOperator.RecordWriter writer)
throws IOException, ReflectiveOperationException
{
// Use reflection to reach the private "writer" field of OrcOutputFormat$OrcRecordWriter and
// write an intermediate footer, which flushes the current stripe to disk.
Field field = OrcOutputFormat.class.getClassLoader()
.loadClass(OrcOutputFormat.class.getName() + "$OrcRecordWriter")
.getDeclaredField("writer");
field.setAccessible(true);
((Writer) field.get(writer)).writeIntermediateFooter();
}
private static void createParquetFile(
Path path,
StandardStructObjectInspector inspector,
Iterator<?>[] iterators,
MessageType parquetSchema,
List<String> columnNames)
{
Properties tableProperties = createTableProperties(columnNames, Collections.singletonList(inspector));
JobConf jobConf = new JobConf();
jobConf.setEnum(COMPRESSION, UNCOMPRESSED);
jobConf.setBoolean(ENABLE_DICTIONARY, false);
jobConf.setEnum(WRITER_VERSION, PARQUET_2_0);
try {
FileSinkOperator.RecordWriter recordWriter = new TestMapredParquetOutputFormat(Optional.of(parquetSchema), true)
.getHiveRecordWriter(
jobConf,
path,
Text.class,
false,
tableProperties,
() -> {});
Object row = inspector.create();
List<StructField> fields = ImmutableList.copyOf(inspector.getAllStructFieldRefs());
while (stream(iterators).allMatch(Iterator::hasNext)) {
for (int i = 0; i < fields.size(); i++) {
Object value = iterators[i].next();
inspector.setStructFieldData(row, fields.get(i), value);
}
ParquetHiveSerDe serde = new ParquetHiveSerDe();
serde.initialize(jobConf, tableProperties, null);
Writable record = serde.serialize(row, inspector);
recordWriter.write(record);
}
recordWriter.close(false);
}
catch (IOException | SerDeException e) {
throw new RuntimeException(e);
}
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path, Class<? extends Writable> aClass, boolean b, Properties properties, Progressable progressable) throws IOException {
return new TSFHiveRecordWriter(jobConf, path, null);
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Path path, Class aClass, boolean b, Properties properties, Progressable progressable) throws IOException {
// Hive invokes this writer
return new SolrHiveWriter(jobConf);
}
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf conf, Path path,
Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
Progressable reporter) throws IOException {
ReaderWriterProfiler.setProfilerOptions(conf);
String stripeSizeStr = tableProperties.getProperty(OrcFile.STRIPE_SIZE);
long stripeSize;
if (stripeSizeStr != null) {
stripeSize = Long.valueOf(stripeSizeStr);
} else {
stripeSize = OrcConf.getLongVar(conf, OrcConf.ConfVars.HIVE_ORC_STRIPE_SIZE);
}
String compression = tableProperties.getProperty(OrcFile.COMPRESSION);
if (compression == null) {
compression = OrcConf.getVar(conf, OrcConf.ConfVars.HIVE_ORC_COMPRESSION);
}
String compressionSizeStr = tableProperties.getProperty(OrcFile.COMPRESSION_BLOCK_SIZE);
int compressionSize;
if (compressionSizeStr != null) {
compressionSize = Integer.valueOf(compressionSizeStr);
} else {
compressionSize = OrcConf.getIntVar(conf,
OrcConf.ConfVars.HIVE_ORC_COMPRESSION_BLOCK_SIZE);
}
String rowIndexStrideStr = tableProperties.getProperty(OrcFile.ROW_INDEX_STRIDE);
int rowIndexStride;
if (rowIndexStrideStr != null) {
rowIndexStride = Integer.valueOf(rowIndexStrideStr);
} else {
rowIndexStride = OrcConf.getIntVar(conf, OrcConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE);
}
String enableIndexesStr = tableProperties.getProperty(OrcFile.ENABLE_INDEXES);
boolean enableIndexes;
if (enableIndexesStr != null) {
enableIndexes = Boolean.valueOf(enableIndexesStr);
} else {
enableIndexes = OrcConf.getBoolVar(conf, OrcConf.ConfVars.HIVE_ORC_CREATE_INDEX);
}
if (!enableIndexes) {
rowIndexStride = 0;
}
return new OrcRecordWriter(path.getFileSystem(conf), path, conf,
stripeSize, compression, compressionSize, rowIndexStride);
}
/**
* Get Hive's FileSinkOperator.RecordWriter.
*/
FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath);
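A hypothetical caller of this factory method might look like the sketch below; the WriterFactory stand-in, the choice of ORC output format, the Text value class, and the output path are all illustrative assumptions rather than code from the project that declares the method.
import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;

public final class RecordWriterFactoryUsage {
    // Stand-in for whatever interface declares the getHiveRecordWriter method shown above.
    interface WriterFactory {
        FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, Class outputFormatClz,
                Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath);
    }

    static void writeRows(WriterFactory factory, JobConf jobConf, Properties tableProps,
            Iterable<? extends Writable> rows) throws IOException {
        FileSinkOperator.RecordWriter writer = factory.getHiveRecordWriter(
                jobConf,
                OrcOutputFormat.class, // outputFormatClz (illustrative choice)
                Text.class,            // outValClz
                false,                 // isCompressed
                tableProps,
                new Path("/tmp/example-output/part-00000")); // outPath (illustrative)
        for (Writable row : rows) {
            writer.write(row);
        }
        writer.close(false);
    }
}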