下面列出了怎么用org.apache.hadoop.io.serializer.SerializationFactory的API类实例代码及写法,或者点击链接到github查看源代码。
public KeyValueWriter(Configuration conf, OutputStream output,
Class<K> kyClass, Class<V> valClass
) throws IOException {
keyClass = kyClass;
valueClass = valClass;
dataBuffer = new DataOutputBuffer();
SerializationFactory serializationFactory
= new SerializationFactory(conf);
keySerializer
= (Serializer<K>)serializationFactory.getSerializer(keyClass);
keySerializer.open(dataBuffer);
valueSerializer
= (Serializer<V>)serializationFactory.getSerializer(valueClass);
valueSerializer.open(dataBuffer);
outputStream = new DataOutputStream(output);
}
private <K> K serDeser(K conf) throws Exception {
SerializationFactory factory = new SerializationFactory(CONF);
Serializer<K> serializer =
factory.getSerializer(GenericsUtil.getClass(conf));
Deserializer<K> deserializer =
factory.getDeserializer(GenericsUtil.getClass(conf));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(conf);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
return after;
}
public ValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
Progressable reporter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.reporter = reporter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
readNextKey();
key = nextKey;
nextKey = null; // force new instance creation
hasNext = more;
}
public ValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
Progressable reporter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.reporter = reporter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
readNextKey();
key = nextKey;
nextKey = null; // force new instance creation
hasNext = more;
}
/**
* Make a copy of the writable object using serialization to a buffer
* @param src the object to copy from
* @param dst the object to copy into, which is destroyed
* @return dst param (the copy)
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <T> T copy(Configuration conf,
T src, T dst) throws IOException {
CopyInCopyOutBuffer buffer = cloneBuffers.get();
buffer.outBuffer.reset();
SerializationFactory factory = getFactory(conf);
Class<T> cls = (Class<T>) src.getClass();
Serializer<T> serializer = factory.getSerializer(cls);
serializer.open(buffer.outBuffer);
serializer.serialize(src);
buffer.moveData();
Deserializer<T> deserializer = factory.getDeserializer(cls);
deserializer.open(buffer.inBuffer);
dst = deserializer.deserialize(dst);
return dst;
}
public KeyValueWriter(Configuration conf, OutputStream output,
Class<K> kyClass, Class<V> valClass
) throws IOException {
keyClass = kyClass;
valueClass = valClass;
dataBuffer = new DataOutputBuffer();
SerializationFactory serializationFactory
= new SerializationFactory(conf);
keySerializer
= (Serializer<K>)serializationFactory.getSerializer(keyClass);
keySerializer.open(dataBuffer);
valueSerializer
= (Serializer<V>)serializationFactory.getSerializer(valueClass);
valueSerializer.open(dataBuffer);
outputStream = new DataOutputStream(output);
}
private <K> K serDeser(K conf) throws Exception {
SerializationFactory factory = new SerializationFactory(CONF);
Serializer<K> serializer =
factory.getSerializer(GenericsUtil.getClass(conf));
Deserializer<K> deserializer =
factory.getDeserializer(GenericsUtil.getClass(conf));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(conf);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
return after;
}
public ValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
Progressable reporter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.reporter = reporter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
readNextKey();
key = nextKey;
nextKey = null; // force new instance creation
hasNext = more;
}
/**
* Make a copy of the writable object using serialization to a buffer
* @param src the object to copy from
* @param dst the object to copy into, which is destroyed
* @return dst param (the copy)
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <T> T copy(Configuration conf,
T src, T dst) throws IOException {
CopyInCopyOutBuffer buffer = cloneBuffers.get();
buffer.outBuffer.reset();
SerializationFactory factory = getFactory(conf);
Class<T> cls = (Class<T>) src.getClass();
Serializer<T> serializer = factory.getSerializer(cls);
serializer.open(buffer.outBuffer);
serializer.serialize(src);
buffer.moveData();
Deserializer<T> deserializer = factory.getDeserializer(cls);
deserializer.open(buffer.inBuffer);
dst = deserializer.deserialize(dst);
return dst;
}
/**
* Gets serializer for specified class.
*
* @param cls Class.
* @param jobConf Job configuration.
* @return Appropriate serializer.
*/
@SuppressWarnings("unchecked")
private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
A.notNull(cls, "cls");
SerializationFactory factory = new SerializationFactory(jobConf);
Serialization<?> serialization = factory.getSerialization(cls);
if (serialization == null)
throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
if (serialization.getClass() == WritableSerialization.class)
return new HadoopWritableSerialization((Class<? extends Writable>)cls);
return new HadoopSerializationWrapper(serialization, cls);
}
public BlockIterator(Configuration conf,
InputStream inStream,
Class<V> valueClass,
BlockSerializationType serializationType,
BlockSchema schema) throws IOException,
InstantiationException,
IllegalAccessException
{
switch (serializationType)
{
case DEFAULT:
SerializationFactory serializationFactory = new SerializationFactory(conf);
deserializer = serializationFactory.getDeserializer(valueClass);
break;
case COMPACT:
deserializer = new CompactDeserializer<V>(schema);
break;
default:
deserializer = null;
}
deserializer.open(inStream);
}
/**
* Make a copy of the writable object using serialization to a buffer
* @param dst the object to copy from
* @param src the object to copy into, which is destroyed
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <T> T copy(Configuration conf,
T src, T dst) throws IOException {
CopyInCopyOutBuffer buffer = cloneBuffers.get();
buffer.outBuffer.reset();
SerializationFactory factory = getFactory(conf);
Class<T> cls = (Class<T>) src.getClass();
Serializer<T> serializer = factory.getSerializer(cls);
serializer.open(buffer.outBuffer);
serializer.serialize(src);
buffer.moveData();
Deserializer<T> deserializer = factory.getDeserializer(cls);
deserializer.open(buffer.inBuffer);
dst = deserializer.deserialize(dst);
return dst;
}
public ValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
Progressable reporter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.reporter = reporter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
readNextKey();
key = nextKey;
nextKey = null; // force new instance creation
hasNext = more;
}
public ReduceContext(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
}
private <K> K serDeser(K conf) throws Exception {
SerializationFactory factory = new SerializationFactory(CONF);
Serializer<K> serializer =
factory.getSerializer(GenericsUtil.getClass(conf));
Deserializer<K> deserializer =
factory.getDeserializer(GenericsUtil.getClass(conf));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(conf);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
return after;
}
/**
* Make a copy of the writable object using serialization to a buffer
* @param dst the object to copy from
* @param src the object to copy into, which is destroyed
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static <T> T copy(Configuration conf,
T src, T dst) throws IOException {
CopyInCopyOutBuffer buffer = cloneBuffers.get();
buffer.outBuffer.reset();
SerializationFactory factory = getFactory(conf);
Class<T> cls = (Class<T>) src.getClass();
Serializer<T> serializer = factory.getSerializer(cls);
serializer.open(buffer.outBuffer);
serializer.serialize(src);
buffer.moveData();
Deserializer<T> deserializer = factory.getDeserializer(cls);
deserializer.open(buffer.inBuffer);
dst = deserializer.deserialize(dst);
return dst;
}
public ShuffledUnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
TezCounter inputRecordCounter)
throws IOException {
this.shuffleManager = shuffleManager;
this.codec = codec;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
this.ifileBufferSize = ifileBufferSize;
this.inputRecordCounter = inputRecordCounter;
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.keyIn = new DataInputBuffer();
this.valIn = new DataInputBuffer();
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(valIn);
}
public ValuesIterator (TezRawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
TezCounter inputKeyCounter,
TezCounter inputValueCounter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
}
public ValuesIterator (TezRawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
Progressable reporter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.reporter = reporter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
readNextKey();
key = nextKey;
nextKey = null; // force new instance creation
hasNext = more;
}
@Private
public static <T extends org.apache.hadoop.mapreduce.InputSplit> MRSplitProto createSplitProto(
T newSplit, SerializationFactory serializationFactory)
throws IOException, InterruptedException {
MRSplitProto.Builder builder = MRSplitProto
.newBuilder();
builder.setSplitClassName(newSplit.getClass().getName());
@SuppressWarnings("unchecked")
Serializer<T> serializer = serializationFactory
.getSerializer((Class<T>) newSplit.getClass());
ByteString.Output out = ByteString
.newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
serializer.open(out);
serializer.serialize(newSplit);
// TODO MR Compat: Check against max block locations per split.
ByteString splitBs = out.toByteString();
builder.setSplitBytes(splitBs);
return builder.build();
}
@SuppressWarnings("unchecked")
public static InputSplit createOldFormatSplitFromUserPayload(
MRSplitProto splitProto, SerializationFactory serializationFactory)
throws IOException {
// This may not need to use serialization factory, since OldFormat
// always uses Writable to write splits.
Preconditions.checkNotNull(splitProto, "splitProto cannot be null");
String className = splitProto.getSplitClassName();
Class<InputSplit> clazz;
try {
clazz = (Class<InputSplit>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
}
Deserializer<InputSplit> deserializer = serializationFactory
.getDeserializer(clazz);
deserializer.open(splitProto.getSplitBytes().newInput());
InputSplit inputSplit = deserializer.deserialize(null);
deserializer.close();
return inputSplit;
}
public ReduceContext(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputCounter = inputCounter;
this.comparator = comparator;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
}
public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
TezCounter inputRecordCounter, InputContext context)
throws IOException {
this.shuffleManager = shuffleManager;
this.context = context;
this.codec = codec;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
this.ifileBufferSize = ifileBufferSize;
this.inputRecordCounter = inputRecordCounter;
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.keyIn = new DataInputBuffer();
this.valIn = new DataInputBuffer();
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(valIn);
}
public Writer(Configuration conf, FSDataOutputStream outputStream,
Class keyClass, Class valueClass,
CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
boolean rle) throws IOException {
this.rawOut = outputStream;
this.writtenRecordsCounter = writesCounter;
this.serializedUncompressedBytes = serializedBytesCounter;
this.start = this.rawOut.getPos();
this.rle = rle;
setupOutputStream(codec);
writeHeader(outputStream);
if (keyClass != null) {
this.closeSerializers = true;
SerializationFactory serializationFactory =
new SerializationFactory(conf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.keySerializer.open(buffer);
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
} else {
this.closeSerializers = false;
}
}
public ValuesIterator (TezRawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf,
TezCounter inputKeyCounter,
TezCounter inputValueCounter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
}
/**
* Create an instance of {@link org.apache.hadoop.mapred.InputSplit} from the {@link
* org.apache.tez.mapreduce.input.MRInput} representation of a split.
*
* @param splitProto The {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto}
* instance representing the split
* @param serializationFactory the serialization mechanism used to write out the split
* @return an instance of the split
* @throws java.io.IOException
*/
@SuppressWarnings("unchecked")
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate({"hive, pig"})
public static InputSplit createOldFormatSplitFromUserPayload(
MRRuntimeProtos.MRSplitProto splitProto, SerializationFactory serializationFactory)
throws IOException {
// This may not need to use serialization factory, since OldFormat
// always uses Writable to write splits.
Objects.requireNonNull(splitProto, "splitProto cannot be null");
String className = splitProto.getSplitClassName();
Class<InputSplit> clazz;
try {
clazz = (Class<InputSplit>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
}
Deserializer<InputSplit> deserializer = serializationFactory
.getDeserializer(clazz);
deserializer.open(splitProto.getSplitBytes().newInput());
InputSplit inputSplit = deserializer.deserialize(null);
deserializer.close();
return inputSplit;
}
/**
* Create an instance of {@link org.apache.hadoop.mapreduce.InputSplit} from the {@link
* org.apache.tez.mapreduce.input.MRInput} representation of a split.
*
* @param splitProto The {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto}
* instance representing the split
* @param serializationFactory the serialization mechanism used to write out the split
* @return an instance of the split
* @throws IOException
*/
@InterfaceStability.Evolving
@SuppressWarnings("unchecked")
public static org.apache.hadoop.mapreduce.InputSplit createNewFormatSplitFromUserPayload(
MRRuntimeProtos.MRSplitProto splitProto, SerializationFactory serializationFactory)
throws IOException {
Objects.requireNonNull(splitProto, "splitProto must be specified");
String className = splitProto.getSplitClassName();
Class<org.apache.hadoop.mapreduce.InputSplit> clazz;
try {
clazz = (Class<org.apache.hadoop.mapreduce.InputSplit>) Class
.forName(className);
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
}
Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = serializationFactory
.getDeserializer(clazz);
deserializer.open(splitProto.getSplitBytes().newInput());
org.apache.hadoop.mapreduce.InputSplit inputSplit = deserializer
.deserialize(null);
deserializer.close();
return inputSplit;
}
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec, Counters.Counter writesCounter,
boolean ownOutputStream)
throws IOException {
this.writtenRecordsCounter = writesCounter;
this.checksumOut = new IFileOutputStream(out);
this.rawOut = out;
this.start = this.rawOut.getPos();
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
LOG.warn("Could not obtain compressor from CodecPool");
this.out = new FSDataOutputStream(checksumOut,null);
}
} else {
this.out = new FSDataOutputStream(checksumOut,null);
}
this.keyClass = keyClass;
this.valueClass = valueClass;
if (keyClass != null) {
SerializationFactory serializationFactory =
new SerializationFactory(conf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.keySerializer.open(buffer);
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
}
this.ownOutputStream = ownOutputStream;
}
@SuppressWarnings("unchecked")
public void write(DataOutput out) throws IOException {
Text.writeString(out, inputSplitClass.getName());
Text.writeString(out, inputFormatClass.getName());
Text.writeString(out, mapperClass.getName());
SerializationFactory factory = new SerializationFactory(conf);
Serializer serializer =
factory.getSerializer(inputSplitClass);
serializer.open((DataOutputStream)out);
serializer.serialize(inputSplit);
}
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}