下面列出了怎么用org.apache.hadoop.fs.s3a.Constants的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void setup(Configuration conf) throws IOException, RuntimeException {
try {
s3 = clientCache.get(S3ClientKey.create(conf));
useWhitelistedBuckets = !conf.get(S3StoragePlugin.WHITELISTED_BUCKETS,"").isEmpty();
if (!NONE_PROVIDER.equals(conf.get(Constants.AWS_CREDENTIALS_PROVIDER))
&& !conf.getBoolean(COMPATIBILITY_MODE, false)) {
verifyCredentials(conf);
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause == null) {
throw new RuntimeException(e);
}
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfUnchecked(cause);
throw new RuntimeException(cause);
}
}
@VisibleForTesting
protected AwsCredentialsProvider getAsync2Provider(Configuration config) {
switch(config.get(Constants.AWS_CREDENTIALS_PROVIDER)) {
case ACCESS_KEY_PROVIDER:
return StaticCredentialsProvider.create(AwsBasicCredentials.create(
config.get(Constants.ACCESS_KEY), config.get(Constants.SECRET_KEY)));
case EC2_METADATA_PROVIDER:
return InstanceProfileCredentialsProvider.create();
case NONE_PROVIDER:
return AnonymousCredentialsProvider.create();
case ASSUME_ROLE_PROVIDER:
return new STSCredentialProviderV2(config);
default:
throw new IllegalStateException(config.get(Constants.AWS_CREDENTIALS_PROVIDER));
}
}
public STSCredentialProviderV1(URI uri, Configuration conf) throws IOException {
AWSCredentialsProvider awsCredentialsProvider = null;
//TODO: Leverage S3AUtils createAwsCredentialProvider
if (S3StoragePlugin.ACCESS_KEY_PROVIDER.equals(conf.get(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER))) {
awsCredentialsProvider = new SimpleAWSCredentialsProvider(uri, conf);
} else if (S3StoragePlugin.EC2_METADATA_PROVIDER.equals(conf.get(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER))) {
awsCredentialsProvider = InstanceProfileCredentialsProvider.getInstance();
}
final String region = S3FileSystem.getAWSRegionFromConfigurationOrDefault(conf).toString();
final AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withClientConfiguration(S3AUtils.createAwsConf(conf, ""))
.withRegion(region);
S3FileSystem.getStsEndpoint(conf).ifPresent(e -> {
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(e, region));
});
this.stsAssumeRoleSessionCredentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(
conf.get(Constants.ASSUMED_ROLE_ARN), UUID.randomUUID().toString())
.withStsClient(builder.build())
.build();
}
@Override
protected Stream<ContainerCreator> getContainerCreators() throws IOException {
Stream<String> buckets = getBucketNamesFromConfigurationProperty(S3StoragePlugin.EXTERNAL_BUCKETS);
if (!NONE_PROVIDER.equals(getConf().get(Constants.AWS_CREDENTIALS_PROVIDER))) {
if (!useWhitelistedBuckets) {
// if we have authentication to access S3, add in owner buckets.
buckets = Stream.concat(buckets, s3.listBuckets().stream().map(Bucket::getName));
} else {
// Only add the buckets provided in the configuration.
buckets = Stream.concat(buckets, getBucketNamesFromConfigurationProperty(S3StoragePlugin.WHITELISTED_BUCKETS));
}
}
return buckets.distinct() // Remove duplicate bucket names.
.map(input -> new BucketCreator(getConf(), input));
}
static software.amazon.awssdk.regions.Region getAWSRegionFromConfigurationOrDefault(Configuration conf) {
final String regionOverride = conf.getTrimmed(REGION_OVERRIDE);
if (!Strings.isNullOrEmpty(regionOverride)) {
// set the region to what the user provided unless they provided an empty string.
return software.amazon.awssdk.regions.Region.of(regionOverride);
}
return getAwsRegionFromEndpoint(conf.get(Constants.ENDPOINT));
}
public STSCredentialProviderV2(Configuration conf) {
AwsCredentialsProvider awsCredentialsProvider = null;
if (S3StoragePlugin.ACCESS_KEY_PROVIDER.equals(conf.get(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER))) {
awsCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(
conf.get(Constants.ACCESS_KEY), conf.get(Constants.SECRET_KEY)));
} else if (S3StoragePlugin.EC2_METADATA_PROVIDER.equals(conf.get(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER))) {
awsCredentialsProvider = InstanceProfileCredentialsProvider.create();
}
final StsClientBuilder builder = StsClient.builder()
.credentialsProvider(awsCredentialsProvider)
.region(S3FileSystem.getAWSRegionFromConfigurationOrDefault(conf))
.httpClientBuilder(initConnectionSettings(conf));
S3FileSystem.getStsEndpoint(conf).ifPresent(e -> {
try {
builder.endpointOverride(new URI(e));
} catch (URISyntaxException use) {
throw UserException.sourceInBadState(use).buildSilently();
}
});
initUserAgent(builder, conf);
final AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
.roleArn(conf.get(Constants.ASSUMED_ROLE_ARN))
.roleSessionName(UUID.randomUUID().toString())
.build();
this.stsAssumeRoleCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.refreshRequest(assumeRoleRequest)
.stsClient(builder.build())
.build();
}
public static SdkHttpClient.Builder initConnectionSettings(Configuration conf) {
final ApacheHttpClient.Builder httpBuilder = ApacheHttpClient.builder();
httpBuilder.maxConnections(intOption(conf, Constants.MAXIMUM_CONNECTIONS, Constants.DEFAULT_MAXIMUM_CONNECTIONS, 1));
httpBuilder.connectionTimeout(
Duration.ofSeconds(intOption(conf, Constants.ESTABLISH_TIMEOUT, Constants.DEFAULT_ESTABLISH_TIMEOUT, 0)));
httpBuilder.socketTimeout(
Duration.ofSeconds(intOption(conf, Constants.SOCKET_TIMEOUT, Constants.DEFAULT_SOCKET_TIMEOUT, 0)));
httpBuilder.proxyConfiguration(initProxySupport(conf));
return httpBuilder;
}
public static ProxyConfiguration initProxySupport(Configuration conf) throws IllegalArgumentException {
final ProxyConfiguration.Builder builder = ProxyConfiguration.builder();
final String proxyHost = conf.getTrimmed(Constants.PROXY_HOST, "");
int proxyPort = conf.getInt(Constants.PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
if (proxyPort < 0) {
if (conf.getBoolean(Constants.SECURE_CONNECTIONS, Constants.DEFAULT_SECURE_CONNECTIONS)) {
proxyPort = 443;
} else {
proxyPort = 80;
}
}
builder.endpoint(URI.create(proxyHost + ":" + proxyPort));
try {
final String proxyUsername = lookupPassword(conf, Constants.PROXY_USERNAME);
final String proxyPassword = lookupPassword(conf, Constants.PROXY_PASSWORD);
if ((proxyUsername == null) != (proxyPassword == null)) {
throw new IllegalArgumentException(String.format("Proxy error: %s or %s set without the other.",
Constants.PROXY_USERNAME, Constants.PROXY_PASSWORD));
}
builder.username(proxyUsername);
builder.password(proxyPassword);
builder.ntlmDomain(conf.getTrimmed(Constants.PROXY_DOMAIN));
builder.ntlmWorkstation(conf.getTrimmed(Constants.PROXY_WORKSTATION));
} catch (IOException e) {
throw UserException.sourceInBadState(e).buildSilently();
}
} else if (proxyPort >= 0) {
throw new IllegalArgumentException(String.format("Proxy error: %s set without %s",
Constants.PROXY_HOST, Constants.PROXY_PORT));
}
return builder.build();
}
private static void initUserAgent(StsClientBuilder builder, Configuration conf) {
String userAgent = "Hadoop " + VersionInfo.getVersion();
final String userAgentPrefix = conf.getTrimmed(Constants.USER_AGENT_PREFIX, "");
if (!userAgentPrefix.isEmpty()) {
userAgent = userAgentPrefix + ", " + userAgent;
}
builder.overrideConfiguration(ClientOverrideConfiguration.builder()
.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, userAgent)
.build());
}
public static void configure(SecorConfig config) {
if (config != null) {
if (config.getCloudService().equals("Swift")) {
mConf.set("fs.swift.impl", "org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem");
mConf.set("fs.swift.service.GENERICPROJECT.auth.url", config.getSwiftAuthUrl());
mConf.set("fs.swift.service.GENERICPROJECT.username", config.getSwiftUsername());
mConf.set("fs.swift.service.GENERICPROJECT.tenant", config.getSwiftTenant());
mConf.set("fs.swift.service.GENERICPROJECT.region", config.getSwiftRegion());
mConf.set("fs.swift.service.GENERICPROJECT.http.port", config.getSwiftPort());
mConf.set("fs.swift.service.GENERICPROJECT.use.get.auth", config.getSwiftGetAuth());
mConf.set("fs.swift.service.GENERICPROJECT.public", config.getSwiftPublic());
if (config.getSwiftGetAuth().equals("true")) {
mConf.set("fs.swift.service.GENERICPROJECT.apikey", config.getSwiftApiKey());
} else {
mConf.set("fs.swift.service.GENERICPROJECT.password", config.getSwiftPassword());
}
} else if (config.getCloudService().equals("S3")) {
if (config.getAwsAccessKey().isEmpty() != config.getAwsSecretKey().isEmpty()) {
throw new IllegalArgumentException(
"Must specify both aws.access.key and aws.secret.key or neither.");
}
if (!config.getAwsAccessKey().isEmpty()) {
mConf.set(Constants.ACCESS_KEY, config.getAwsAccessKey());
mConf.set(Constants.SECRET_KEY, config.getAwsSecretKey());
mConf.set("fs.s3n.awsAccessKeyId", config.getAwsAccessKey());
mConf.set("fs.s3n.awsSecretAccessKey", config.getAwsSecretKey());
}
}
}
}
@Override
public String toString() {
return "[ Access Key=" + s3Config.get(Constants.ACCESS_KEY) + ", Secret Key =*****, isSecure=" +
s3Config.get(SECURE_CONNECTIONS) + " ]";
}
static Optional<String> getEndpoint(Configuration conf) {
return Optional.ofNullable(conf.getTrimmed(Constants.ENDPOINT))
.map(s -> getHttpScheme(conf) + s);
}
static Optional<String> getStsEndpoint(Configuration conf) {
return Optional.ofNullable(conf.getTrimmed(Constants.ASSUMED_ROLE_STS_ENDPOINT))
.map(s -> "https://" + s);
}
@Override
protected List<Property> getProperties() {
final S3PluginConfig config = getConfig();
final List<Property> finalProperties = new ArrayList<>();
finalProperties.add(new Property(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, "dremioS3:///"));
finalProperties.add(new Property("fs.dremioS3.impl", S3FileSystem.class.getName()));
finalProperties.add(new Property(MAXIMUM_CONNECTIONS, String.valueOf(DEFAULT_MAX_CONNECTIONS)));
finalProperties.add(new Property(FAST_UPLOAD, "true"));
finalProperties.add(new Property(Constants.FAST_UPLOAD_BUFFER, "disk"));
finalProperties.add(new Property(Constants.FAST_UPLOAD_ACTIVE_BLOCKS, "4")); // 256mb (so a single parquet file should be able to flush at once).
finalProperties.add(new Property(MAX_THREADS, "24"));
finalProperties.add(new Property(MULTIPART_SIZE, "67108864")); // 64mb
finalProperties.add(new Property(MAX_TOTAL_TASKS, "30"));
if(config.compatibilityMode) {
finalProperties.add(new Property(S3FileSystem.COMPATIBILITY_MODE, "true"));
}
String mainAWSCredProvider;
switch (config.credentialType) {
case ACCESS_KEY:
if (("".equals(config.accessKey)) || ("".equals(config.accessSecret))) {
throw UserException.validationError()
.message("Failure creating S3 connection. You must provide AWS Access Key and AWS Access Secret.")
.build(logger);
}
mainAWSCredProvider = ACCESS_KEY_PROVIDER;
finalProperties.add(new Property(ACCESS_KEY, config.accessKey));
finalProperties.add(new Property(SECRET_KEY, config.accessSecret));
break;
case EC2_METADATA:
mainAWSCredProvider = EC2_METADATA_PROVIDER;
break;
case NONE:
mainAWSCredProvider = NONE_PROVIDER;
break;
default:
throw new RuntimeException("Failure creating S3 connection. Invalid credentials type.");
}
if (!Strings.isNullOrEmpty(config.assumedRoleARN) && !NONE_PROVIDER.equals(mainAWSCredProvider)) {
finalProperties.add(new Property(Constants.ASSUMED_ROLE_ARN, config.assumedRoleARN));
finalProperties.add(new Property(Constants.ASSUMED_ROLE_CREDENTIALS_PROVIDER, mainAWSCredProvider));
mainAWSCredProvider = ASSUME_ROLE_PROVIDER;
}
finalProperties.add(new Property(Constants.AWS_CREDENTIALS_PROVIDER, mainAWSCredProvider));
final List<Property> propertyList = super.getProperties();
if (propertyList != null && !propertyList.isEmpty()) {
finalProperties.addAll(propertyList);
}
finalProperties.add(new Property(SECURE_CONNECTIONS, String.valueOf(config.secure)));
if (config.externalBucketList != null && !config.externalBucketList.isEmpty()) {
finalProperties.add(new Property(EXTERNAL_BUCKETS, Joiner.on(",").join(config.externalBucketList)));
} else {
if (config.credentialType == AWSAuthenticationType.NONE) {
throw UserException.validationError()
.message("Failure creating S3 connection. You must provide one or more external buckets when you choose no authentication.")
.build(logger);
}
}
if (config.whitelistedBuckets != null && !config.whitelistedBuckets.isEmpty()) {
finalProperties.add(new Property(WHITELISTED_BUCKETS, Joiner.on(",").join(config.whitelistedBuckets)));
}
if (!Strings.isNullOrEmpty(config.kmsKeyARN)) {
finalProperties.add(new Property(SERVER_SIDE_ENCRYPTION_KEY, config.kmsKeyARN));
finalProperties.add(new Property(SERVER_SIDE_ENCRYPTION_ALGORITHM, S3AEncryptionMethods.SSE_KMS.getMethod()));
finalProperties.add(new Property(SECURE_CONNECTIONS, Boolean.TRUE.toString()));
}
finalProperties.add(new Property(ALLOW_REQUESTER_PAYS, Boolean.toString(config.requesterPays)));
return finalProperties;
}