下面列出了怎么用org.apache.hadoop.util.ReflectionUtils的API类实例代码及写法,或者点击链接到github查看源代码。
/** Get an array of FilterConfiguration specified in the conf */
private static FilterInitializer[] getFilterInitializers(Configuration conf) {
if (conf == null) {
return null;
}
Class<?>[] classes = conf.getClasses(FILTER_INITIALIZER_PROPERTY);
if (classes == null) {
return null;
}
FilterInitializer[] initializers = new FilterInitializer[classes.length];
for(int i = 0; i < classes.length; i++) {
initializers[i] = (FilterInitializer)ReflectionUtils.newInstance(
classes[i], conf);
}
return initializers;
}
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
@SuppressWarnings("unchecked")
private KeyACLs getAcls(String clsStr) throws IOException {
KeyACLs keyAcl = null;
try {
Class<? extends KeyACLs> cls = null;
if (clsStr == null || clsStr.trim().equals("")) {
cls = KMSACLs.class;
} else {
Class<?> configClass = Class.forName(clsStr);
if(!KeyACLs.class.isAssignableFrom(configClass) ){
throw new RuntimeException(clsStr+" should implement KeyACLs");
}
cls = (Class<? extends KeyACLs>)configClass;
}
if (cls != null) {
keyAcl = ReflectionUtils.newInstance(cls, kmsConf);
}
} catch (Exception e) {
LOG.error("Unable to getAcls with an exception", e);
throw new IOException(e.getMessage());
}
return keyAcl;
}
/**
* Create an instance of the AppChecker service via reflection based on the
* {@link YarnConfiguration#SCM_APP_CHECKER_CLASS} parameter.
*
* @param conf
* @return an instance of the AppChecker class
*/
@Private
@SuppressWarnings("unchecked")
public static AppChecker createAppCheckerService(Configuration conf) {
Class<? extends AppChecker> defaultCheckerClass;
try {
defaultCheckerClass =
(Class<? extends AppChecker>) Class
.forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default scm app checker class"
+ YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
}
AppChecker checker =
ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
AppChecker.class), conf);
return checker;
}
@SuppressWarnings("unchecked")
public void combine(RawKeyValueIterator kvIter,
OutputCollector<K,V> combineCollector
) throws IOException {
Reducer<K,V,K,V> combiner =
ReflectionUtils.newInstance(combinerClass, job);
try {
CombineValuesIterator<K,V> values =
new CombineValuesIterator<K,V>(kvIter, comparator, keyClass,
valueClass, job, reporter,
inputCounter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector,
reporter);
values.nextKey();
}
} finally {
combiner.close();
}
}
/**
* Called after a new FileSystem instance is constructed.
*
* @param name a uri whose authority section names the host, port, etc. for this FileSystem
* @param conf the configuration
*/
@Override
public void initialize(URI name, Configuration conf) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
//the real use is the one that has the Kerberos credentials needed for
//SPNEGO to work
realUser = ugi.getRealUser();
if (realUser == null) {
realUser = UserGroupInformation.getLoginUser();
}
super.initialize(name, conf);
try {
uri = new URI(name.getScheme() + "://" + name.getAuthority());
} catch (URISyntaxException ex) {
throw new IOException(ex);
}
Class<? extends DelegationTokenAuthenticator> klass =
getConf().getClass("httpfs.authenticator.class",
KerberosDelegationTokenAuthenticator.class,
DelegationTokenAuthenticator.class);
DelegationTokenAuthenticator authenticator =
ReflectionUtils.newInstance(klass, getConf());
authURL = new DelegationTokenAuthenticatedURL(authenticator);
}
private <E> E makeCopyForPassByValue(Serialization<E> serialization,
E obj) throws IOException {
Serializer<E> ser =
serialization.getSerializer(GenericsUtil.getClass(obj));
Deserializer<E> deser =
serialization.getDeserializer(GenericsUtil.getClass(obj));
DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
dof.reset();
ser.open(dof);
ser.serialize(obj);
ser.close();
obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
getChainJobConf());
ByteArrayInputStream bais =
new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
deser.open(bais);
deser.deserialize(obj);
deser.close();
return obj;
}
private void combineAndSpill(
RawKeyValueIterator kvIter,
Counters.Counter inCounter) throws IOException {
JobConf job = jobConf;
Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
RawComparator<K> comparator =
(RawComparator<K>)job.getCombinerKeyGroupingComparator();
try {
CombineValuesIterator values = new CombineValuesIterator(
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
inCounter);
while (values.more()) {
combiner.reduce(values.getKey(), values, combineCollector,
Reporter.NULL);
values.nextKey();
}
} finally {
combiner.close();
}
}
@SuppressWarnings("unchecked")
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!outer.nextKeyValue()) {
return false;
}
if (outer.getCurrentKey() == null) {
return true;
}
key = (K1) ReflectionUtils.newInstance(outer.getCurrentKey()
.getClass(), outer.getConfiguration());
key = ReflectionUtils.copy(outer.getConfiguration(),
outer.getCurrentKey(), key);
V1 outerVal = outer.getCurrentValue();
if (outerVal != null) {
value = (V1) ReflectionUtils.newInstance(outerVal.getClass(),
outer.getConfiguration());
value = ReflectionUtils.copy(outer.getConfiguration(),
outer.getCurrentValue(), value);
}
return true;
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
helper = new SqoopHCatImportHelper(conf);
String recordClassName = conf.get(ConfigurationHelper
.getDbInputClassProperty());
if (null == recordClassName) {
throw new IOException("DB Input class name is not set!");
}
try {
Class<?> cls = Class.forName(recordClassName, true,
Thread.currentThread().getContextClassLoader());
sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
if (null == sqoopRecord) {
throw new IOException("Could not instantiate object of type "
+ recordClassName);
}
}
@Override
public RecordWriter<LongWritable, Document> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
job, GzipCodec.class);
codec = ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path path = getDefaultWorkFile(job, extension);
FileSystem fs = path.getFileSystem(conf);
FSDataOutputStream out = fs.create(path, false);
if (!isCompressed) {
return new JSONFileOutputRecordWriter(out);
} else {
return new JSONFileOutputRecordWriter(new DataOutputStream(
codec.createOutputStream(out)));
}
}
@SuppressWarnings("unchecked")
private synchronized RecordWriter getNewRecordWriter(
TaskAttemptContext taskContext, String baseFileName)
throws IOException, InterruptedException {
// look for record-writer in the cache
RecordWriter writer = newRecordWriters.get(baseFileName);
// If not in cache, create a new one
if (writer == null) {
// get the record writer from context output format
taskContext.getConfiguration().set(
MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME, baseFileName);
try {
writer = ((OutputFormat) ReflectionUtils.newInstance(
taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
.getRecordWriter(taskContext);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
// add the record-writer to the cache
newRecordWriters.put(baseFileName, writer);
}
return writer;
}
@Override
public void run() {
LOG.info("Starting HeartbeatMonitor");
boolean forceExit = false;
long gap = 0;
while (running && !shuttingDown) {
long now = System.currentTimeMillis();
gap = now - lastHeartbeat;
if (gap > maxHeartbeatGap) {
forceExit = true;
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
if (forceExit) {
LOG.fatal("No heartbeat for " + gap + " msec, TaskTracker has to die");
ReflectionUtils.logThreadInfo(LOG, "No heartbeat", 1);
System.exit(-1);
} else {
LOG.info("Stopping HeartbeatMonitor, running=" + running +
", shuttingDown=" + shuttingDown);
}
}
@Override // Mapper
public void configure(JobConf conf) {
super.configure(conf);
// grab compression
String compression = getConf().get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
// try to initialize codec
try {
codec = (compression == null) ? null :
Class.forName(compression).asSubclass(CompressionCodec.class);
} catch(Exception e) {
throw new RuntimeException("Compression codec not found: ", e);
}
if(codec != null) {
compressionCodec = (CompressionCodec)
ReflectionUtils.newInstance(codec, getConf());
}
}
/**
* Test using the gzip codec and an empty input file
*/
public static void testGzipEmpty() throws IOException {
JobConf job = new JobConf();
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
writeFile(localFs, new Path(workDir, "empty.gz"), gzip, "");
FileInputFormat.setInputPaths(job, workDir);
TextInputFormat format = new TextInputFormat();
format.configure(job);
InputSplit[] splits = format.getSplits(job, 100);
assertEquals("Compressed files of length 0 are not returned from FileInputFormat.getSplits().",
1, splits.length);
List<Text> results = readSplit(format, splits[0], job);
assertEquals("Compressed empty file length == 0", 0, results.size());
}
/**
* Helper method to create FailoverProxyProvider.
*/
private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
Configuration conf, Class<T> protocol) {
Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
try {
defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
Class.forName(
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default failover provider class" +
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
}
RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
defaultProviderClass, RMFailoverProxyProvider.class), conf);
provider.init(conf, (RMProxy<T>) this, protocol);
return provider;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked") // Explicit check for value class agreement
public V createValue() {
if (null == valueclass) {
final Class<?> cls = kids[0].createValue().getClass();
for (RecordReader<K,? extends V> rr : kids) {
if (!cls.equals(rr.createValue().getClass())) {
throw new ClassCastException("Child value classes fail to agree");
}
}
valueclass = cls.asSubclass(Writable.class);
ivalue = createInternalValue();
}
return (V) ReflectionUtils.newInstance(valueclass, null);
}
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (!HttpServer2.isInstrumentationAccessAllowed(getServletContext(),
request, response)) {
return;
}
response.setContentType("text/plain; charset=UTF-8");
try (PrintStream out = new PrintStream(
response.getOutputStream(), false, "UTF-8")) {
ReflectionUtils.printThreadInfo(out, "");
}
ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
}
@Test(timeout=60000)
public void testRR() throws Exception {
@SuppressWarnings("unchecked")
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
initPolicy(policy, 1.0f);
TestRoundRobinVolumeChoosingPolicy.testRR(policy);
}
private AWSCredentialsProvider getAWSCredentialsProvider(HiveConf conf) {
Class<? extends AWSCredentialsProviderFactory> providerFactoryClass = conf
.getClass(AWS_CATALOG_CREDENTIALS_PROVIDER_FACTORY_CLASS,
DefaultAWSCredentialsProviderFactory.class).asSubclass(
AWSCredentialsProviderFactory.class);
AWSCredentialsProviderFactory provider = ReflectionUtils.newInstance(
providerFactoryClass, conf);
return provider.buildAWSCredentialsProvider(conf);
}
public static void main(String[] args) throws Exception {
String usage = "Usage: MapFile inFile outFile";
if (args.length != 2) {
System.err.println(usage);
System.exit(-1);
}
String in = args[0];
String out = args[1];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
MapFile.Reader reader = null;
MapFile.Writer writer = null;
try {
reader = new MapFile.Reader(fs, in, conf);
writer =
new MapFile.Writer(conf, fs, out,
reader.getKeyClass().asSubclass(WritableComparable.class),
reader.getValueClass());
WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
.asSubclass(WritableComparable.class), conf);
Writable value = ReflectionUtils.newInstance(reader.getValueClass()
.asSubclass(Writable.class), conf);
while (reader.next(key, value)) // copy all entries
writer.append(key, value);
} finally {
IOUtils.cleanup(LOG, writer, reader);
}
}
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
++kept;
}
}
reader.close();
}
return (K[])samples.toArray();
}
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
Reporter reporter) throws IOException {
if (mapper == null) {
// Find the Mapper from the TaggedInputSplit.
TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
.getMapperClass(), conf);
}
mapper.map(key, value, outputCollector, reporter);
}
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
Reporter reporter) throws IOException {
if (mapper == null) {
// Find the Mapper from the TaggedInputSplit.
TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
.getMapperClass(), conf);
}
mapper.map(key, value, outputCollector, reporter);
}
/**
* Get the {@link InputFormat} implementation for the map-reduce job,
* defaults to {@link TextInputFormat} if not specified explicity.
*
* @return the {@link InputFormat} implementation for the map-reduce job.
*/
public InputFormat getInputFormat() {
return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
TextInputFormat.class,
InputFormat.class),
this);
}
/**
* Deserialize a {@link Writable} object from a string.
*
* @param writableClass the {@link Writable} implementation class
* @param serializedWritableStr the string containing a serialized {@link Writable} object
* @param configuration a {@link Configuration} object containing Hadoop configuration properties
* @return a {@link Writable} deserialized from the string
* @throws IOException if there's something wrong with the deserialization
*/
public static Writable deserializeFromString(Class<? extends Writable> writableClass, String serializedWritableStr,
Configuration configuration) throws IOException {
byte[] writableBytes = BaseEncoding.base64().decode(serializedWritableStr);
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(writableBytes);
DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
Writable writable = ReflectionUtils.newInstance(writableClass, configuration);
writable.readFields(dataInputStream);
return writable;
}
}
public void configure(JobConf job) {
super.configure(job);
String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()) ||
job.getBoolean("stream.map.ignoreKey", false);
this.skipNewline = job.getBoolean("stream.map.skipNewline", false);
Class<?> c = job.getClass("stream.map.posthook", null, Mapper.class);
if(c != null) {
postMapper = (Mapper)ReflectionUtils.newInstance(c, job);
LOG.info("PostHook="+c.getName());
}
}
/** Create a new instance of a class with a defined factory. */
public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {
WritableFactory factory = WritableFactories.getFactory(c);
if (factory != null) {
Writable result = factory.newInstance();
if (result instanceof Configurable) {
((Configurable) result).setConf(conf);
}
return result;
} else {
return ReflectionUtils.newInstance(c, conf);
}
}
/**
* Create the RegionSplitPolicy configured for the given table.
* @param region
* @param conf
* @return a RegionSplitPolicy
* @throws IOException
*/
public static RegionSplitPolicy create(HRegion region,
Configuration conf) throws IOException {
Preconditions.checkNotNull(region, "Region should not be null.");
Class<? extends RegionSplitPolicy> clazz = getSplitPolicyClass(
region.getTableDescriptor(), conf);
RegionSplitPolicy policy = ReflectionUtils.newInstance(clazz, conf);
policy.configureForRegion(region);
return policy;
}
@SuppressWarnings("unchecked")
public DelegatingRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
// Find the InputFormat and then the RecordReader from the TaggedInputSplit.
TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
taggedInputSplit.getInputFormatClass(), context.getConfiguration());
originalRR = inputFormat.createRecordReader(taggedInputSplit.getInputSplit(), context);
}