The following are Java examples showing how open source projects use org.apache.hadoop.conf.Configurable (including, among others, code built around com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider).
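Before the project examples, a minimal sketch of the interface itself may help: Configurable declares setConf(Configuration) and getConf(), and frameworks that instantiate classes reflectively call setConf() right after construction. The class below is illustrative, not taken from any of the projects that follow.

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

// Minimal Configurable implementation: a framework that creates this class
// reflectively can inject a Configuration by calling setConf() after construction.
public class ExampleConfigurable implements Configurable {
    private Configuration conf;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }
}

Most of the snippets below show the same protocol from the caller's side: construct an object reflectively, test it with instanceof Configurable, and call setConf() before first use.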
private CompressionCodec createCompressionCodec(String codecName)
{
    try {
        Class<? extends CompressionCodec> codecClass = classLoader.loadClass(codecName).asSubclass(CompressionCodec.class);
        Constructor<? extends CompressionCodec> constructor = codecClass.getDeclaredConstructor();
        constructor.setAccessible(true);
        CompressionCodec codec = constructor.newInstance();
        if (codec instanceof Configurable) {
            // Hadoop is crazy... you have to give codecs an empty configuration or they throw NPEs
            // but you need to make sure the configuration doesn't "load" defaults or it spends
            // forever loading XML with no useful information
            ((Configurable) codec).setConf(new Configuration(false));
        }
        return codec;
    }
    catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("Unknown codec: " + codecName, e);
    }
}
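A plausible call site for the factory above, assuming the standard Hadoop codec class names (the method is private in its project, so this is illustrative only; the file name is invented):

// GzipCodec is Configurable, so the factory hands it an empty Configuration.
CompressionCodec gzip = createCompressionCodec("org.apache.hadoop.io.compress.GzipCodec");
try (OutputStream out = gzip.createOutputStream(new FileOutputStream("data.gz"))) {
    out.write("hello".getBytes(StandardCharsets.UTF_8));
}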
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    // read the parent fields and the final fields
    in.defaultReadObject();
    // the job conf knows how to deserialize itself
    jobConf = new JobConf();
    jobConf.readFields(in);
    try {
        hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
    }
    catch (Exception e) {
        throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
    }
    if (hadoopInputSplit instanceof Configurable) {
        ((Configurable) hadoopInputSplit).setConf(this.jobConf);
    }
    else if (hadoopInputSplit instanceof JobConfigurable) {
        ((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
    }
    hadoopInputSplit.readFields(in);
}
protected void startDaemonProcesses(Configuration configuration) {
    String daemonClassNames = configuration.get(DAEMON_PROCESSES_PROPERTY);
    if (daemonClassNames == null) {
        return;
    }
    for (String className : StringUtils.split(daemonClassNames, ',')) {
        try {
            @SuppressWarnings("unchecked")
            Class<? extends Runnable> daemonClass = (Class<? extends Runnable>) Class.forName(className.trim());
            Runnable daemon = daemonClass.newInstance();
            if (daemon instanceof Configurable) {
                Configurable configurable = (Configurable) daemon;
                configurable.setConf(configuration);
            }
            Thread daemonThread = new Thread(daemon);
            daemonThread.setDaemon(true);
            daemonThread.start();
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
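The snippet reads a comma-separated list of Runnable class names from the configuration; a daemon that also implements Configurable receives the same Configuration before its thread starts. A hypothetical daemon might look like this (the class name and property key are invented for illustration):

public class HeartbeatDaemon implements Runnable, Configurable {
    private Configuration conf;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void run() {
        // The interval comes from the injected configuration; the key name is illustrative.
        long intervalMs = conf.getLong("example.heartbeat.interval.ms", 10_000L);
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("heartbeat");
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}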
public void testBarWritable() throws Exception {
    System.out.println("Testing Writable, Configurable wrapped in GenericWritable");
    FooGenericWritable generic = new FooGenericWritable();
    generic.setConf(conf);
    Bar bar = new Bar();
    bar.setConf(conf);
    generic.set(bar);
    // test writing generic writable
    FooGenericWritable after =
        (FooGenericWritable) TestWritable.testWritable(generic, conf);
    // test configuration
    System.out.println("Testing if Configuration is passed to wrapped classes");
    assertTrue(after.get() instanceof Configurable);
    assertNotNull(((Configurable) after.get()).getConf());
}
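For context, GenericWritable is an abstract Hadoop class that itself implements Configurable and passes the Configuration on to wrapped instances that accept one, which is exactly what the assertions above verify. A wrapper like the test's FooGenericWritable only has to enumerate the wrapped types; the body below is a guess at the fixture's shape, not the actual test code (Foo is assumed, Bar appears in the test):

public class FooGenericWritable extends GenericWritable {
    @SuppressWarnings("unchecked")
    @Override
    protected Class<? extends Writable>[] getTypes() {
        // Every type that may be wrapped must be registered here; GenericWritable
        // serializes the index of the type followed by the instance itself.
        return new Class[] { Foo.class, Bar.class };
    }
}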
/**
 * Get the 'value' corresponding to the last read 'key'.
 *
 * @param val : The 'value' to be read.
 */
public synchronized void getCurrentValue(Writable val) throws IOException {
    if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
    }
    // Position stream to 'current' value
    seekToCurrentValue();
    val.readFields(valIn);
    if (valIn.read() > 0) {
        log.info("available bytes: " + valIn.available());
        throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
            + " bytes, should read " +
            (valBuffer.getLength() - keyLength));
    }
}
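The instanceof check above runs before readFields() for a reason: a Configurable value type may need its Configuration while deserializing. A contrived value class, not from the reader's project, makes the ordering visible:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Writable;

public class ConfiguredValue extends Configured implements Writable {
    private String payload;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(payload);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        payload = in.readUTF();
        // getConf() is non-null here only because the reader called setConf() first.
        // The property key is illustrative.
        int limit = getConf().getInt("example.payload.limit", Integer.MAX_VALUE);
        if (payload.length() > limit) {
            throw new IOException("payload exceeds configured limit of " + limit);
        }
    }
}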
/**
 * Get the 'value' corresponding to the last read 'key'.
 *
 * @param val : The 'value' to be read.
 */
public synchronized WALEntry getCurrentValue(WALEntry val) throws IOException {
    if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
    }
    // Position stream to 'current' value
    seekToCurrentValue();
    val = deserializeValue(val);
    if (valIn.read() > 0) {
        log.info("available bytes: " + valIn.available());
        throw new IOException(val + " read " + (valBuffer.getPosition() - keyLength)
            + " bytes, should read " +
            (valBuffer.getLength() - keyLength));
    }
    return val;
}
/**
 * Create a new record reader from the original InputFormat and initialize it.
 *
 * @throws IOException
 * @throws InterruptedException
 */
private void createNewRecordReader() throws IOException, InterruptedException
{
    FileSplit split =
        new FileSplit(combineFileSplit.getPath(currentFileIndex),
            combineFileSplit.getOffset(currentFileIndex),
            combineFileSplit.getLength(currentFileIndex),
            null);
    if (split instanceof Configurable)
    {
        ((Configurable) split).setConf(context.getConfiguration());
    }
    current = inputFormat.createRecordReader(split, context);
    current.initialize(split, context);
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    // read the parent fields and the final fields
    in.defaultReadObject();
    try {
        hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
    }
    if (needsJobConf(hadoopInputSplit)) {
        // the job conf knows how to deserialize itself
        jobConf = new JobConf();
        jobConf.readFields(in);
        if (hadoopInputSplit instanceof Configurable) {
            ((Configurable) hadoopInputSplit).setConf(this.jobConf);
        } else if (hadoopInputSplit instanceof JobConfigurable) {
            ((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
        }
    }
    hadoopInputSplit.readFields(in);
}
private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
        CompressionCodec compressor) throws IOException {
    Compressor poolCompressor = null;
    try {
        if (compressor != null) {
            if (compressor instanceof Configurable) {
                ((Configurable) compressor).setConf(this.conf);
            }
            poolCompressor = CodecPool.getCompressor(compressor);
            os = compressor.createOutputStream(os, poolCompressor);
        }
        Codec.Encoder encoder = codec.getEncoder(os);
        while (cellScanner.advance()) {
            encoder.write(cellScanner.current());
        }
        encoder.flush();
    } catch (BufferOverflowException | IndexOutOfBoundsException e) {
        throw new DoNotRetryIOException(e);
    } finally {
        os.close();
        if (poolCompressor != null) {
            CodecPool.returnCompressor(poolCompressor);
        }
    }
}
private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
        int osInitialSize) throws IOException {
    // GZIPCodec fails w/ NPE if no configuration.
    if (compressor instanceof Configurable) {
        ((Configurable) compressor).setConf(this.conf);
    }
    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
    CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
    ByteBufferOutputStream bbos;
    try {
        // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
        // TODO: Reuse buffers.
        bbos = new ByteBufferOutputStream(osInitialSize);
        IOUtils.copy(cis, bbos);
        bbos.close();
        return bbos.getByteBuffer();
    } finally {
        CodecPool.returnDecompressor(poolDecompressor);
    }
}
private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
    Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
    if (clazz == null) { return null; }
    try {
        UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
        if (unboundRecordFilter instanceof Configurable) {
            ((Configurable) unboundRecordFilter).setConf(configuration);
        }
        return unboundRecordFilter;
    } catch (InstantiationException | IllegalAccessException e) {
        throw new BadConfigurationException(
            "could not instantiate unbound record filter class", e);
    }
}
private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
        IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
        throws IOException {
    String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
    if (bufferSizeProp != null) {
        Configurable configurableCodec = (Configurable) codec;
        Configuration conf = configurableCodec.getConf();
        int bufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
        LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
            DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize);
        conf.setInt(bufferSizeProp, bufSize);
    }
    return codec.createInputStream(checksumIn, decompressor);
}
@Test
public void testInMemoryBufferSize() throws IOException {
    // for smaller amounts of data, the codec buffer should be sized according to the compressed data length
    List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
    Writer writer = writeTestFile(false, false, data, codec);
    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
    Configurable configurableCodec = (Configurable) codec;
    Assert.assertEquals(writer.getCompressedLength(),
        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
    // the buffer size cannot grow infinitely with the compressed data size
    data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
    writer = writeTestFile(false, false, data, codec);
    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
    Assert.assertEquals(128 * 1024,
        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
}
/**
 * Starts HddsDatanode services.
 *
 * @param service The service instance invoking this method
 */
@Override
public void start(Object service) {
    if (service instanceof Configurable) {
        start(new OzoneConfiguration(((Configurable) service).getConf()));
    } else {
        start(new OzoneConfiguration());
    }
}
private static Optional<EncryptionMaterialsProvider> createEncryptionMaterialsProvider(Configuration hadoopConfig)
{
    String kmsKeyId = hadoopConfig.get(S3_KMS_KEY_ID);
    if (kmsKeyId != null) {
        return Optional.of(new KMSEncryptionMaterialsProvider(kmsKeyId));
    }
    String empClassName = hadoopConfig.get(S3_ENCRYPTION_MATERIALS_PROVIDER);
    if (empClassName == null) {
        return Optional.empty();
    }
    try {
        Object instance = Class.forName(empClassName).getConstructor().newInstance();
        if (!(instance instanceof EncryptionMaterialsProvider)) {
            throw new RuntimeException("Invalid encryption materials provider class: " + instance.getClass().getName());
        }
        EncryptionMaterialsProvider emp = (EncryptionMaterialsProvider) instance;
        if (emp instanceof Configurable) {
            ((Configurable) emp).setConf(hadoopConfig);
        }
        return Optional.of(emp);
    }
    catch (ReflectiveOperationException e) {
        throw new RuntimeException("Unable to load or create S3 encryption materials provider: " + empClassName, e);
    }
}
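A custom provider named in the configuration must have a public no-arg constructor and implement EncryptionMaterialsProvider; implementing Configurable as well gets it the Hadoop configuration, as the snippet shows. A hypothetical wiring from a call site in the same class (the literal keys behind S3_KMS_KEY_ID and S3_ENCRYPTION_MATERIALS_PROVIDER are constants not shown above, and the provider class name is invented):

Configuration hadoopConfig = new Configuration(false);
// Either name a KMS key directly...
hadoopConfig.set(S3_KMS_KEY_ID, "alias/my-s3-key");
// ...or point at a custom provider class instead.
hadoopConfig.set(S3_ENCRYPTION_MATERIALS_PROVIDER, "com.example.MyMaterialsProvider");
Optional<EncryptionMaterialsProvider> provider = createEncryptionMaterialsProvider(hadoopConfig);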
@Override
public void configure(Configuration parameters) {
    // enforce sequential configuration() calls
    synchronized (CONFIGURE_MUTEX) {
        // configure MR InputFormat if necessary
        if (this.mapredInputFormat instanceof Configurable) {
            ((Configurable) this.mapredInputFormat).setConf(this.jobConf);
        } else if (this.mapredInputFormat instanceof JobConfigurable) {
            ((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
        }
    }
}
@Override
public void open(HadoopInputSplit split) throws IOException {
    // enforce sequential open() calls
    synchronized (OPEN_MUTEX) {
        this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
        if (this.recordReader instanceof Configurable) {
            ((Configurable) this.recordReader).setConf(jobConf);
        }
        key = this.recordReader.createKey();
        value = this.recordReader.createValue();
        this.fetched = false;
    }
}
@Override
public void configure(Configuration parameters) {
    // enforce sequential configure() calls
    synchronized (CONFIGURE_MUTEX) {
        // configure MR OutputFormat if necessary
        if (this.mapredOutputFormat instanceof Configurable) {
            ((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
        } else if (this.mapredOutputFormat instanceof JobConfigurable) {
            ((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
        }
    }
}
@Override
public void configure(Configuration parameters) {
    // enforce sequential configuration() calls
    synchronized (CONFIGURE_MUTEX) {
        if (mapreduceInputFormat instanceof Configurable) {
            ((Configurable) mapreduceInputFormat).setConf(configuration);
        }
    }
}
@Override
public void configure(Configuration parameters) {
    // enforce sequential configure() calls
    synchronized (CONFIGURE_MUTEX) {
        if (this.mapreduceOutputFormat instanceof Configurable) {
            ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
        }
    }
}
private Partitioner<BulkIngestKey,Value> createConfiguredPartitioner(Class<? extends Partitioner<BulkIngestKey,Value>> clazz, String prefix) {
    try {
        Partitioner<BulkIngestKey,Value> partitioner = clazz.newInstance();
        if (partitioner instanceof Configurable) {
            ((Configurable) partitioner).setConf(conf);
        }
        // If this supports by-table configurations, attempt to use it
        if (prefix != null && partitioner instanceof DelegatePartitioner) {
            ((DelegatePartitioner) partitioner).configureWithPrefix(prefix);
        }
        return partitioner;
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate delegate partitioner class: " + e.getMessage(), e);
    }
}
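The reflective pattern above works for any partitioner class; one that actually uses the injected configuration might look like the following sketch. The class name, key/value types, and property key are illustrative, and it targets the plain Hadoop mapreduce Partitioner contract rather than the project's DelegatePartitioner:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class BoundedHashPartitioner extends Partitioner<Text, Text> implements Configurable {
    private Configuration conf;
    private int maxPartitions = Integer.MAX_VALUE;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        // Cap the fan-out based on the injected configuration; the key is illustrative.
        this.maxPartitions = conf.getInt("example.partitioner.max.partitions", Integer.MAX_VALUE);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        int buckets = Math.min(numPartitions, maxPartitions);
        return (key.hashCode() & Integer.MAX_VALUE) % buckets;
    }
}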