下面列出了如何使用 com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient 类 API 的示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * registerCluster() is expected to return true when the cluster id looked up in the database
 * describes to a cluster in state TERMINATED_WITH_ERRORS.
 */
@Test
public void testTerminatedRunningCluster()
{
    ClusterManager manager = new ClusterManager();
    manager.setClusterID("abcd");
    manager.setClusterName("test name");

    // Mocks for the database lookup and the EMR describe chain.
    JdbcTemplate jdbc = Mockito.mock(JdbcTemplate.class);
    AmazonElasticMapReduceClient emr = Mockito.mock(AmazonElasticMapReduceClient.class);
    DescribeClusterResult describeResult = Mockito.mock(DescribeClusterResult.class);
    Cluster mockedCluster = Mockito.mock(Cluster.class);

    // Consecutive lookups return "running_cluster" first, then "abcd".
    when(jdbc.queryForObject(ObjectProcessor.SELECT_CLUSTER_ID_SQL, new Object[]{"test name"}, String.class))
            .thenReturn("running_cluster")
            .thenReturn("abcd");
    when(emr.describeCluster(any(DescribeClusterRequest.class))).thenReturn(describeResult);
    when(describeResult.getCluster()).thenReturn(mockedCluster);

    // Any described cluster reports TERMINATED_WITH_ERRORS.
    ClusterStatus terminatedStatus = new ClusterStatus();
    terminatedStatus.setState(ClusterState.TERMINATED_WITH_ERRORS);
    when(mockedCluster.getStatus()).thenReturn(terminatedStatus);

    manager.setTemplate(jdbc);
    manager.setEmrClient(emr);
    assertEquals(true, manager.registerCluster());
}
/**
 * Loads the cluster SQL fixture, inserts one more EMR_CLUSTER row and checks that
 * calculateNumberOfClustersNeeded() returns 2. The auto-scale call itself is not exercised yet.
 */
@Test
@Transactional
public void testAutoScale() throws Exception
{
    ClusterManager clusterManager = new ClusterManager();
    clusterManager.setTemplate(template);

    // Decode the fixture explicitly as UTF-8 instead of the platform default charset.
    String sql = new String(Files.readAllBytes(Paths.get("src/test/resources/dbunit/cluster_dataset.sql")),
            java.nio.charset.StandardCharsets.UTF_8);
    template.update(sql);
    sql = "INSERT INTO EMR_CLUSTER (CLUSTER_NAME,CLUSTER_ID, CREATE_TIME, STATUS) VALUES ('A', 'B', now(), 'R');";
    template.update(sql);
    assertEquals(2, clusterManager.calculateNumberOfClustersNeeded());

    AmazonElasticMapReduceClient emrClient = Mockito.mock(AmazonElasticMapReduceClient.class);
    clusterManager.setEmrClient(emrClient);
    clusterManager.dmRestClient = dmApiClient;
    // TODO: the auto-scale path is not exercised. Once it can be, call
    //   clusterManager.clusterAutoScale()
    // and verify restClient.performXmlPost(any(String.class), any(String.class)).
}
/**
 * Polls EMR for the state of every open step returned by the {@code sql.retrieveOpenSteps} query
 * and records COMPLETED / FAILED states through {@code sql.updateStepStatus} in one JDBC batch.
 *
 * <p>Steps in any other state are skipped, so they remain open and are checked on the next run.
 * All JDBC resources and the EMR client are released even when a call fails.
 *
 * @throws Exception if the database or an EMR call fails
 */
public void monitorEMRStep() throws Exception {
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
    try (Connection conn = new com.mysql.jdbc.Driver().connect(props.getProperty("url"), props);
         java.sql.Statement openStepsStmt = conn.createStatement();
         ResultSet openStepsRS = openStepsStmt.executeQuery(props.getProperty("sql.retrieveOpenSteps"));
         PreparedStatement ps = conn.prepareStatement(props.getProperty("sql.updateStepStatus"))) {
        DescribeStepRequest stepReq = new DescribeStepRequest();
        while (openStepsRS.next()) {
            stepReq.setClusterId(openStepsRS.getString("cluster_id"));
            stepReq.setStepId(openStepsRS.getString("step_id"));
            String stepState = emr.describeStep(stepReq).getStep().getStatus().getState();
            // Only terminal states are persisted. Batching any other row would reuse the status
            // bound for the previous row (JDBC parameters persist between executions), or fail
            // on the first row because parameter 1 was never set.
            if (!StepState.COMPLETED.toString().equals(stepState)
                    && !StepState.FAILED.toString().equals(stepState)) {
                continue;
            }
            ps.setString(1, stepState);
            ps.setString(2, openStepsRS.getString("job_config_id"));
            ps.addBatch();
        }
        ps.executeBatch();
    } finally {
        emr.shutdown();
    }
}
/**
 * Returns the ids of all WAITING EMR clusters that carry a tag whose key equals the
 * {@code edba.cluster.tag.key} property.
 *
 * <p>ListClusters results are paginated, so every page is followed via its marker. The EMR
 * client created here is shut down before returning.
 *
 * @return ids of matching clusters; empty if none match
 * @throws Exception if an EMR call fails
 */
protected List<String> getActiveTaggedClusters() throws Exception{
    AmazonElasticMapReduceClient emrClient = new AmazonElasticMapReduceClient();
    try {
        String tagKey = props.getProperty("edba.cluster.tag.key");
        List<String> waitingClusters = new ArrayList<String>();
        DescribeClusterRequest specifcTagDescribe = new DescribeClusterRequest();
        specifcTagDescribe.putCustomQueryParameter("Cluster.Tags",null);
        ListClustersRequest listRequest = new ListClustersRequest().withClusterStates(ClusterState.WAITING);
        String marker = null;
        do {
            ListClustersResult clusterResult = emrClient.listClusters(listRequest.withMarker(marker));
            for (ClusterSummary cluster : clusterResult.getClusters()) {
                System.out.println("list cluster id "+cluster.getId());
                List<Tag> tagList = emrClient.describeCluster(specifcTagDescribe.withClusterId(cluster.getId())).getCluster().getTags();
                for (Tag tag : tagList) {
                    if (tag.getKey().equals(tagKey)) {
                        waitingClusters.add(cluster.getId());
                        // One match is enough; avoids adding the same id twice.
                        break;
                    }
                }
            }
            marker = clusterResult.getMarker();
        } while (marker != null && !marker.isEmpty());
        return waitingClusters;
    } finally {
        emrClient.shutdown();
    }
}
/**
 * Terminates the given EMR cluster. When {@code overrideTerminationProtection} is true,
 * termination protection is first switched off for the cluster.
 *
 * @param emrClient the EMR client to use
 * @param clusterId id of the cluster (job flow) to terminate
 * @param overrideTerminationProtection whether to disable termination protection first
 */
@Override
public void terminateEmrCluster(AmazonElasticMapReduceClient emrClient, String clusterId, boolean overrideTerminationProtection)
{
    if (overrideTerminationProtection)
    {
        SetTerminationProtectionRequest unprotectRequest =
            new SetTerminationProtectionRequest().withJobFlowIds(clusterId).withTerminationProtected(false);
        emrClient.setTerminationProtection(unprotectRequest);
    }

    TerminateJobFlowsRequest terminateRequest = new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
    emrClient.terminateJobFlows(terminateRequest);
}
/**
 * The client returned by getEmrClient carries the proxy host and port from the AWS parameters.
 */
@Test
public void getEmrClientAssertClientConfigurationSet()
{
    String proxyHost = "httpProxyHost";
    Integer proxyPort = 1234;

    AwsParamsDto params = getAwsParamsDto();
    params.setHttpProxyHost(proxyHost);
    params.setHttpProxyPort(proxyPort);

    AmazonElasticMapReduceClient client = emrDao.getEmrClient(params);
    // Read the client's configuration via reflection.
    ClientConfiguration config = (ClientConfiguration) ReflectionTestUtils.getField(client, "clientConfiguration");

    assertNotNull(config);
    assertEquals(proxyHost, config.getProxyHost());
    assertEquals(proxyPort.intValue(), config.getProxyPort());
}
/**
 * A blank proxy host leaves the client's proxy host unset (null), even when a port is given.
 */
@Test
public void getEmrClientAssertClientConfigurationNotSetWhenProxyHostIsBlank()
{
    String blankHost = "";
    Integer proxyPort = 1234;

    AwsParamsDto params = getAwsParamsDto();
    params.setHttpProxyHost(blankHost);
    params.setHttpProxyPort(proxyPort);

    AmazonElasticMapReduceClient client = emrDao.getEmrClient(params);
    // Read the client's configuration via reflection.
    ClientConfiguration config = (ClientConfiguration) ReflectionTestUtils.getField(client, "clientConfiguration");

    assertNotNull(config);
    assertNull(config.getProxyHost());
}
/**
 * A null proxy port leaves the client's proxy host unset (null), even when a host is given.
 */
@Test
public void getEmrClientAssertClientConfigurationNotSetWhenProxyPortIsNull()
{
    String proxyHost = "httpProxyHost";
    Integer missingPort = null;

    AwsParamsDto params = getAwsParamsDto();
    params.setHttpProxyHost(proxyHost);
    params.setHttpProxyPort(missingPort);

    AmazonElasticMapReduceClient client = emrDao.getEmrClient(params);
    // Read the client's configuration via reflection.
    ClientConfiguration config = (ClientConfiguration) ReflectionTestUtils.getField(client, "clientConfiguration");

    assertNotNull(config);
    assertNull(config.getProxyHost());
}
/**
 * Mock ListClusters.
 *
 * <p>Returns a summary for every mock cluster whose status is among the requested cluster states.
 * No clusters are returned when the request lists no states. A request without a marker gets a
 * result carrying MOCK_EMR_MAKER; a request with a marker gets a result without one.
 */
@Override
public ListClustersResult listEmrClusters(AmazonElasticMapReduceClient emrClient, ListClustersRequest listClustersRequest)
{
    List<String> requestedStates = listClustersRequest.getClusterStates();
    List<ClusterSummary> summaries = new ArrayList<>();

    for (MockEmrJobFlow jobFlow : emrClusters.values())
    {
        if (requestedStates.isEmpty() || !requestedStates.contains(jobFlow.getStatus()))
        {
            continue;
        }

        // Convert optional XMLGregorianCalendar timestamps to Dates (null stays null).
        java.util.Date creationDate = jobFlow.getStatusTimeline().getCreationTime() == null ? null :
            jobFlow.getStatusTimeline().getCreationTime().toGregorianCalendar().getTime();
        java.util.Date endDate = jobFlow.getStatusTimeline().getEndTime() == null ? null :
            jobFlow.getStatusTimeline().getEndTime().toGregorianCalendar().getTime();
        java.util.Date readyDate = jobFlow.getStatusTimeline().getReadyTime() == null ? null :
            jobFlow.getStatusTimeline().getReadyTime().toGregorianCalendar().getTime();

        ClusterStateChangeReason changeReason = new ClusterStateChangeReason()
            .withCode(jobFlow.getStatusChangeReason().getCode())
            .withMessage(jobFlow.getStatusChangeReason().getMessage());
        ClusterTimeline clusterTimeline = new ClusterTimeline()
            .withCreationDateTime(creationDate)
            .withEndDateTime(endDate)
            .withReadyDateTime(readyDate);
        ClusterStatus clusterStatus = new ClusterStatus()
            .withState(jobFlow.getStatus())
            .withStateChangeReason(changeReason)
            .withTimeline(clusterTimeline);

        summaries.add(new ClusterSummary().withId(jobFlow.getJobFlowId()).withName(jobFlow.getJobFlowName()).withStatus(clusterStatus));
    }

    ListClustersResult result = new ListClustersResult().withClusters(summaries);
    return StringUtils.isBlank(listClustersRequest.getMarker()) ? result.withMarker(MOCK_EMR_MAKER) : result;
}
/**
 * Mock ListInstances.
 *
 * <p>The "not provisioned" mock cluster has no instances. Any other cluster id reports one instance
 * with EC2 id EC2_EMR_MASTER_INSTANCE and private IP INSTANCE_IP_ADDRESS.
 */
@Override
public ListInstancesResult listClusterInstancesRequest(AmazonElasticMapReduceClient emrClient, ListInstancesRequest listInstancesRequest)
{
    String notProvisionedName =
        buildEmrClusterName(AbstractDaoTest.NAMESPACE, AbstractDaoTest.EMR_CLUSTER_DEFINITION_NAME, MOCK_CLUSTER_NOT_PROVISIONED_NAME);
    MockEmrJobFlow notProvisioned = getClusterByName(notProvisionedName);

    boolean requestsNotProvisioned =
        notProvisioned != null && listInstancesRequest.getClusterId().equals(notProvisioned.getJobFlowId());
    if (requestsNotProvisioned)
    {
        return new ListInstancesResult();
    }

    Instance masterInstance = new Instance().withEc2InstanceId("EC2_EMR_MASTER_INSTANCE").withPrivateIpAddress("INSTANCE_IP_ADDRESS");
    return new ListInstancesResult().withInstances(masterInstance);
}
/**
 * Mock terminate.
 *
 * <p>Throws an AmazonServiceException when the cluster's name ends with the AMAZON_SERVICE_EXCEPTION
 * marker. Otherwise it sets the mock cluster's status to TERMINATED.
 */
@Override
public void terminateEmrCluster(AmazonElasticMapReduceClient emrClient, String clusterId, boolean overrideTerminationProtection)
{
    MockEmrJobFlow target = getClusterById(clusterId);
    boolean simulateServiceFailure = target.getJobFlowName().endsWith(MockAwsOperationsHelper.AMAZON_SERVICE_EXCEPTION);
    if (simulateServiceFailure)
    {
        throw new AmazonServiceException(MockAwsOperationsHelper.AMAZON_SERVICE_EXCEPTION);
    }
    target.setStatus(ClusterState.TERMINATED.toString());
}
/**
 * Mock ListInstanceFleets. It ignores the request and returns one fleet with id
 * "mock_instance_id_1" and name "mock_instance_name".
 */
@Override
public ListInstanceFleetsResult listInstanceFleets(AmazonElasticMapReduceClient emrClient, ListInstanceFleetsRequest listInstanceFleetsRequest)
{
    InstanceFleet fleet = new InstanceFleet().withId("mock_instance_id_1").withName("mock_instance_name");
    List<InstanceFleet> fleets = new ArrayList<>();
    fleets.add(fleet);
    return new ListInstanceFleetsResult().withInstanceFleets(fleets);
}
/**
 * Verifies that getListInstanceFleetsResult does two things. It obtains the EMR client from the
 * client factory, and it returns whatever emrOperations.listInstanceFleets yields for a request
 * built from the cluster id. No other interactions with the mocks are allowed.
 */
@Test
public void testGetListInstanceFleetsResult()
{
// Create an AWS parameters DTO.
AwsParamsDto awsParamsDto =
new AwsParamsDto(AWS_ASSUMED_ROLE_ACCESS_KEY, AWS_ASSUMED_ROLE_SECRET_KEY, AWS_ASSUMED_ROLE_SESSION_TOKEN, HTTP_PROXY_HOST, HTTP_PROXY_PORT,
NO_AWS_REGION_NAME);
// Create a mock AmazonElasticMapReduceClient.
AmazonElasticMapReduceClient amazonElasticMapReduceClient = mock(AmazonElasticMapReduceClient.class);
// Create a list instance fleets request.
// (Stubbing below matches on equals(), so this must equal the request the DAO builds.)
ListInstanceFleetsRequest listInstanceFleetsRequest = new ListInstanceFleetsRequest().withClusterId(EMR_CLUSTER_ID);
// Create a list instance fleets result.
ListInstanceFleetsResult listInstanceFleetsResult = new ListInstanceFleetsResult().withMarker(MARKER);
// Mock the external calls.
when(awsClientFactory.getEmrClient(awsParamsDto)).thenReturn(amazonElasticMapReduceClient);
when(emrOperations.listInstanceFleets(amazonElasticMapReduceClient, listInstanceFleetsRequest)).thenReturn(listInstanceFleetsResult);
// Call the method under test.
ListInstanceFleetsResult result = emrDaoImpl.getListInstanceFleetsResult(EMR_CLUSTER_ID, awsParamsDto);
// Verify the external calls.
verify(awsClientFactory).getEmrClient(awsParamsDto);
verify(emrOperations).listInstanceFleets(amazonElasticMapReduceClient, listInstanceFleetsRequest);
verifyNoMoreInteractionsHelper();
// Validate the results.
assertEquals(listInstanceFleetsResult, result);
}
/**
 * Makes sure an Amazon EMR cluster is available, creating one through EMRUtils.createCluster if it
 * does not exist yet. It also makes sure the configured HBase table exists on that cluster.
 *
 * <p>The resulting cluster id and public DNS are written back into {@code config}. The EMR client
 * created here is shut down once it is no longer needed.
 *
 * @param clusterIdentifier  id of the cluster to use; also passed to EMRUtils.createCluster when
 *                           no such cluster exists
 * @param clusterName        intended cluster name. NOTE(review): currently unused; confirm intent.
 * @param amiVersion         AMI version to use for a newly created cluster
 * @param keypair            EC2 key pair that allows SSH access to the cluster
 * @param masterInstanceType Amazon EC2 instance type for the master node
 * @param coreInstanceType   Amazon EC2 instance type for the core nodes
 * @param logUri             S3 location for the EMR logs
 * @param numberOfNodes      total number of nodes in the cluster, including the master node
 */
private void createEMRCluster(String clusterIdentifier,
        String clusterName,
        String amiVersion,
        String keypair,
        String masterInstanceType,
        String coreInstanceType,
        String logUri,
        int numberOfNodes) {
    // Make sure the EMR cluster is available
    AmazonElasticMapReduceClient emrClient = new AmazonElasticMapReduceClient(config.AWS_CREDENTIALS_PROVIDER);
    try {
        emrClient.setEndpoint(config.EMR_ENDPOINT);
        String clusterid = clusterIdentifier;
        if (!EMRUtils.clusterExists(emrClient, clusterIdentifier)) {
            clusterid = EMRUtils.createCluster(emrClient,
                    clusterIdentifier,
                    amiVersion,
                    keypair,
                    masterInstanceType,
                    coreInstanceType,
                    logUri,
                    numberOfNodes);
        }
        // Update the emr cluster id and public DNS properties
        config.EMR_CLUSTER_IDENTIFIER = clusterid;
        config.EMR_CLUSTER_PUBLIC_DNS = EMRUtils.getPublicDns(emrClient, clusterid);
    } finally {
        // The client is local to this method; release its resources.
        emrClient.shutdown();
    }
    // Make sure the HBase table exists
    if (!HBaseUtils.tablesExists(config.HBASE_TABLE_NAME, config.EMR_CLUSTER_PUBLIC_DNS, config.HBASE_REST_PORT)){
        HBaseUtils.createTable(config.HBASE_TABLE_NAME, config.EMR_CLUSTER_PUBLIC_DNS, config.HBASE_REST_PORT);
    }
}
public HBaseEmitter(EMRHBaseKinesisConnectorConfiguration configuration) {
// DynamoDB Config
this.emrEndpoint = configuration.EMR_ENDPOINT;
this.hbaseTableName = configuration.HBASE_TABLE_NAME;
this.hbaseRestPort = configuration.HBASE_REST_PORT;
this.emrPublicDns = configuration.EMR_CLUSTER_PUBLIC_DNS;
// Client
this.emrClient = new AmazonElasticMapReduceClient(configuration.AWS_CREDENTIALS_PROVIDER);
this.emrClient.setEndpoint(this.emrEndpoint);
LOG.info("EMRHBaseEmitter.....");
}
/**
 * Submits a single Spark step, run through {@code command-runner.jar}, to an existing EMR cluster.
 *
 * <p>The client's region comes from the {@code AWS_REGION} environment variable. The client is
 * shut down after the request completes.
 *
 * @param paramsStr comma-separated arguments passed to command-runner.jar
 * @param clusterId id of the target cluster
 * @return id of the submitted step
 * @throws IllegalArgumentException if {@code AWS_REGION} is unset or not a known region name
 */
protected String fireEMRJob(String paramsStr,String clusterId){
    String regionName = System.getenv("AWS_REGION");
    if (regionName == null || regionName.trim().isEmpty()) {
        throw new IllegalArgumentException("AWS_REGION environment variable is not set");
    }
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
    try {
        emr.setRegion(Region.getRegion(Regions.fromName(regionName)));
        HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs(paramsStr.split(","));
        StepConfig sparkStep = new StepConfig()
                .withName("Spark Step")
                .withActionOnFailure("CONTINUE")
                .withHadoopJarStep(sparkStepConf);
        // Varargs overload instead of double-brace initialization (which created an anonymous
        // ArrayList subclass capturing the enclosing instance).
        AddJobFlowStepsRequest request = new AddJobFlowStepsRequest(clusterId).withSteps(sparkStep);
        AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
        return result.getStepIds().get(0);
    } finally {
        emr.shutdown();
    }
}
/**
 * Replaces the EMR client used by this instance (package-private, e.g. for injecting a mock).
 *
 * @param client the EMR client to use
 */
void setEmrClient(AmazonElasticMapReduceClient client) {
    this.emrClient = client;
}
/**
 * Obtains the EMR client for the given AWS parameters from the client factory.
 *
 * <p>NOTE(review): the factory's result is cast to AmazonElasticMapReduceClient. Any other
 * implementation would fail here with a ClassCastException.
 *
 * @param awsParamsDto AWS credentials, proxy and region parameters
 * @return the EMR client
 */
@Override
public AmazonElasticMapReduceClient getEmrClient(AwsParamsDto awsParamsDto)
{
    Object client = awsClientFactory.getEmrClient(awsParamsDto);
    return (AmazonElasticMapReduceClient) client;
}
/**
 * Lists the instances of an EMR cluster by delegating to the client's ListInstances call.
 *
 * @param emrClient the EMR client to use
 * @param listInstancesRequest the request identifying the cluster
 * @return the client's ListInstances result, unchanged
 */
@Override
public ListInstancesResult listClusterInstancesRequest(AmazonElasticMapReduceClient emrClient, ListInstancesRequest listInstancesRequest)
{
    ListInstancesResult instances = emrClient.listInstances(listInstancesRequest);
    return instances;
}
/**
 * Starts a job flow (cluster) with the given request.
 *
 * @param emrClient the EMR client to use
 * @param jobFlowRequest the RunJobFlow request
 * @return the id of the new job flow
 */
@Override
public String runEmrJobFlow(AmazonElasticMapReduceClient emrClient, RunJobFlowRequest jobFlowRequest)
{
    String jobFlowId = emrClient.runJobFlow(jobFlowRequest).getJobFlowId();
    return jobFlowId;
}
/**
 * Lists the EMR clusters in the account by delegating to the client's ListClusters call.
 *
 * @param emrClient the EMR client to use
 * @param listClustersRequest filter and pagination marker for the listing
 * @return the client's ListClusters result, unchanged
 */
@Override
public ListClustersResult listEmrClusters(AmazonElasticMapReduceClient emrClient, ListClustersRequest listClustersRequest)
{
    ListClustersResult clusters = emrClient.listClusters(listClustersRequest);
    return clusters;
}
/**
 * Lists the instance fleets of a cluster by delegating to the client's ListInstanceFleets call.
 *
 * @param emrClient the EMR client to use
 * @param listInstanceFleetsRequest the request identifying the cluster
 * @return the client's ListInstanceFleets result, unchanged
 */
@Override
public ListInstanceFleetsResult listInstanceFleets(AmazonElasticMapReduceClient emrClient, ListInstanceFleetsRequest listInstanceFleetsRequest)
{
    ListInstanceFleetsResult fleets = emrClient.listInstanceFleets(listInstanceFleetsRequest);
    return fleets;
}
/**
 * Mock RunJobFlow. The request's AMI version selects the simulated outcome.
 *
 * <ul>
 *   <li>THROTTLING: throws an AmazonServiceException with error code "ThrottlingException".</li>
 *   <li>BAD_REQUEST / NOT_FOUND: throws an AmazonServiceException with HTTP status 400 / 404.</li>
 *   <li>SERVICE_EXCEPTION: throws a plain AmazonServiceException.</li>
 *   <li>CLUSTER_STATUS_WAITING / CLUSTER_STATUS_RUNNING: starts the cluster in WAITING / RUNNING.</li>
 *   <li>Anything else, including a blank AMI version: starts the cluster in BOOTSTRAPPING.</li>
 * </ul>
 *
 * @return the id of the newly created mock cluster
 */
@Override
public String runEmrJobFlow(AmazonElasticMapReduceClient emrClient, RunJobFlowRequest jobFlowRequest)
{
    String clusterStatus = ClusterState.BOOTSTRAPPING.toString();
    // Built before any override below, so the message always reads "Started BOOTSTRAPPING".
    StatusChangeReason reason = new StatusChangeReason(ClusterStateChangeReasonCode.USER_REQUEST.toString(), "Started " + clusterStatus);
    StatusTimeline timeline = new StatusTimeline();
    timeline.setCreationTime(HerdDateUtils.getXMLGregorianCalendarValue(new Date()));

    String amiVersion = jobFlowRequest.getAmiVersion();
    if (StringUtils.isNotBlank(amiVersion))
    {
        if (amiVersion.equals(MockAwsOperationsHelper.AMAZON_THROTTLING_EXCEPTION))
        {
            AmazonServiceException throttling = new AmazonServiceException("test throttling exception");
            throttling.setErrorCode("ThrottlingException");
            throw throttling;
        }
        if (amiVersion.equals(MockAwsOperationsHelper.AMAZON_BAD_REQUEST))
        {
            AmazonServiceException badRequest = new AmazonServiceException(MockAwsOperationsHelper.AMAZON_BAD_REQUEST);
            badRequest.setStatusCode(HttpStatus.SC_BAD_REQUEST);
            throw badRequest;
        }
        if (amiVersion.equals(MockAwsOperationsHelper.AMAZON_NOT_FOUND))
        {
            AmazonServiceException notFound = new AmazonServiceException(MockAwsOperationsHelper.AMAZON_NOT_FOUND);
            notFound.setStatusCode(HttpStatus.SC_NOT_FOUND);
            throw notFound;
        }
        if (amiVersion.equals(MockAwsOperationsHelper.AMAZON_SERVICE_EXCEPTION))
        {
            throw new AmazonServiceException(MockAwsOperationsHelper.AMAZON_SERVICE_EXCEPTION);
        }

        if (amiVersion.equals(MockAwsOperationsHelper.AMAZON_CLUSTER_STATUS_WAITING))
        {
            clusterStatus = ClusterState.WAITING.toString();
        }
        else if (amiVersion.equals(MockAwsOperationsHelper.AMAZON_CLUSTER_STATUS_RUNNING))
        {
            clusterStatus = ClusterState.RUNNING.toString();
        }
    }

    return createNewCluster(jobFlowRequest, clusterStatus, reason, timeline).getJobFlowId();
}
/**
 * Verifies that createEmrCluster adds exactly two bootstrap actions, in this order:
 * <ol>
 *   <li>the NSCD script, whose path is built from the S3 staging bucket and resource base;</li>
 *   <li>the EMR daemon configuration script, with a single "name=value" argument.</li>
 * </ol>
 */
@Test
public void testCreateEmrClusterWithNscdBootstrapScript()
{
    // Create an AWS parameters DTO.
    final AwsParamsDto awsParamsDto =
        new AwsParamsDto(AWS_ASSUMED_ROLE_ACCESS_KEY, AWS_ASSUMED_ROLE_SECRET_KEY, AWS_ASSUMED_ROLE_SESSION_TOKEN, HTTP_PROXY_HOST, HTTP_PROXY_PORT,
            AWS_REGION_NAME_US_EAST_1);
    EmrClusterDefinition emrClusterDefinition = new EmrClusterDefinition();
    final InstanceDefinitions instanceDefinitions =
        new InstanceDefinitions(new MasterInstanceDefinition(), new InstanceDefinition(), new InstanceDefinition());
    emrClusterDefinition.setInstanceDefinitions(instanceDefinitions);
    emrClusterDefinition.setNodeTags(Collections.emptyList());
    when(configurationHelper.getProperty(ConfigurationValue.EMR_NSCD_SCRIPT)).thenReturn(EMR_NSCD_SCRIPT);
    when(configurationHelper.getProperty(ConfigurationValue.S3_URL_PROTOCOL)).thenReturn(S3_URL_PROTOCOL);
    when(configurationHelper.getProperty(ConfigurationValue.S3_STAGING_BUCKET_NAME)).thenReturn(S3_BUCKET_NAME);
    when(configurationHelper.getProperty(ConfigurationValue.S3_STAGING_RESOURCE_BASE)).thenReturn(S3_STAGING_RESOURCE_BASE);
    when(configurationHelper.getProperty(ConfigurationValue.S3_URL_PATH_DELIMITER)).thenReturn(S3_URL_PATH_DELIMITER);
    when(configurationHelper.getProperty(ConfigurationValue.EMR_CONFIGURE_DAEMON)).thenReturn(EMR_CONFIGURE_DAEMON);
    List<Parameter> daemonConfigs = new ArrayList<>();
    Parameter daemonConfig = new Parameter();
    daemonConfig.setName(EMR_CLUSTER_DAEMON_CONFIG_NAME);
    daemonConfig.setValue(EMR_CLUSTER_DAEMON_CONFIG_VALUE);
    daemonConfigs.add(daemonConfig);
    emrClusterDefinition.setDaemonConfigurations(daemonConfigs);
    AmazonElasticMapReduce amazonElasticMapReduce = AmazonElasticMapReduceClientBuilder.standard().withRegion(awsParamsDto.getAwsRegionName())
        .build();
    when(awsClientFactory.getEmrClient(awsParamsDto)).thenReturn(amazonElasticMapReduce);
    when(emrOperations.runEmrJobFlow(amazonElasticMapReduceClientArgumentCaptor.capture(), runJobFlowRequestArgumentCaptor.capture()))
        .thenReturn(EMR_CLUSTER_ID);
    // Create the cluster
    String clusterId = emrDaoImpl.createEmrCluster(EMR_CLUSTER_NAME, emrClusterDefinition, awsParamsDto);
    // Verifications
    RunJobFlowRequest runJobFlowRequest = runJobFlowRequestArgumentCaptor.getValue();
    assertEquals(EMR_CLUSTER_ID, clusterId);
    verify(configurationHelper).getProperty(ConfigurationValue.EMR_NSCD_SCRIPT);
    verify(configurationHelper).getProperty(ConfigurationValue.S3_URL_PROTOCOL);
    verify(configurationHelper).getProperty(ConfigurationValue.S3_STAGING_BUCKET_NAME);
    verify(configurationHelper).getProperty(ConfigurationValue.S3_STAGING_RESOURCE_BASE);
    verify(configurationHelper).getProperty(ConfigurationValue.EMR_CONFIGURE_DAEMON);
    verify(awsClientFactory).getEmrClient(awsParamsDto);
    verify(emrOperations).runEmrJobFlow((AmazonElasticMapReduceClient) amazonElasticMapReduce, runJobFlowRequest);
    List<BootstrapActionConfig> bootstrapActionConfigs = runJobFlowRequest.getBootstrapActions();
    // There should be two bootstrap actions: NSCD script, and emr daemon config
    assertEquals(2, bootstrapActionConfigs.size());
    // Verify NSCD bootstrap action
    assertEquals(ConfigurationValue.EMR_NSCD_SCRIPT.getKey(), bootstrapActionConfigs.get(0).getName());
    assertEquals(String
            .format("%s%s%s%s%s%s", S3_URL_PROTOCOL, S3_BUCKET_NAME, S3_URL_PATH_DELIMITER, S3_STAGING_RESOURCE_BASE, S3_URL_PATH_DELIMITER, EMR_NSCD_SCRIPT),
        bootstrapActionConfigs.get(0).getScriptBootstrapAction().getPath());
    // Verify EMR configure daemon bootstrap action
    assertEquals(ConfigurationValue.EMR_CONFIGURE_DAEMON.getKey(), bootstrapActionConfigs.get(1).getName());
    assertEquals(EMR_CONFIGURE_DAEMON, bootstrapActionConfigs.get(1).getScriptBootstrapAction().getPath());
    assertEquals(String.format("%s=%s", EMR_CLUSTER_DAEMON_CONFIG_NAME, EMR_CLUSTER_DAEMON_CONFIG_VALUE),
        bootstrapActionConfigs.get(1).getScriptBootstrapAction().getArgs().get(0));
}
/**
 * Verifies that getActiveEmrClusterByNameAndAccountId (with no account id) finds a cluster that
 * appears only on the second page of ListClusters results. The first page contains a
 * non-matching cluster plus MARKER; the second page, requested with MARKER, contains the match.
 */
@Test
public void testGetActiveEmrClusterByName()
{
// Create an AWS parameters DTO.
AwsParamsDto awsParamsDto =
new AwsParamsDto(AWS_ASSUMED_ROLE_ACCESS_KEY, AWS_ASSUMED_ROLE_SECRET_KEY, AWS_ASSUMED_ROLE_SESSION_TOKEN, HTTP_PROXY_HOST, HTTP_PROXY_PORT,
AWS_REGION_NAME_US_EAST_1);
// Create a mock AmazonElasticMapReduceClient.
AmazonElasticMapReduceClient amazonElasticMapReduceClient = mock(AmazonElasticMapReduceClient.class);
// Create a list cluster request.
ListClustersRequest listClustersRequest = new ListClustersRequest().withClusterStates(EMR_VALID_STATE);
// Create a list cluster result with a non-matching cluster and a marker.
ListClustersResult listClusterResultWithMarker =
new ListClustersResult().withClusters(new ClusterSummary().withName(INVALID_VALUE).withId(EMR_CLUSTER_ID)).withMarker(MARKER);
// Create a list cluster request with marker.
ListClustersRequest listClustersRequestWithMarker = new ListClustersRequest().withClusterStates(EMR_VALID_STATE).withMarker(MARKER);
// Create a cluster summary.
ClusterSummary clusterSummary = new ClusterSummary().withName(EMR_CLUSTER_NAME).withId(EMR_CLUSTER_ID);
// Create a list cluster result with the matching cluster.
ListClustersResult listClusterResult = new ListClustersResult().withClusters(clusterSummary);
// Mock the external calls.
when(configurationHelper.getProperty(ConfigurationValue.EMR_VALID_STATES)).thenReturn(EMR_VALID_STATE);
when(configurationHelper.getProperty(ConfigurationValue.FIELD_DATA_DELIMITER))
.thenReturn((String) ConfigurationValue.FIELD_DATA_DELIMITER.getDefaultValue());
when(awsClientFactory.getEmrClient(awsParamsDto)).thenReturn(amazonElasticMapReduceClient);
// Stubs match requests by equals(): without a marker -> first page, with MARKER -> second page.
when(emrOperations.listEmrClusters(amazonElasticMapReduceClient, listClustersRequest)).thenReturn(listClusterResultWithMarker);
when(emrOperations.listEmrClusters(amazonElasticMapReduceClient, listClustersRequestWithMarker)).thenReturn(listClusterResult);
// Call the method under test.
ClusterSummary result = emrDaoImpl.getActiveEmrClusterByNameAndAccountId(EMR_CLUSTER_NAME, null, awsParamsDto);
// Verify the external calls.
verify(configurationHelper).getProperty(ConfigurationValue.EMR_VALID_STATES);
verify(configurationHelper).getProperty(ConfigurationValue.FIELD_DATA_DELIMITER);
// Two pages were served, so two client lookups and two list calls are expected.
verify(awsClientFactory, times(2)).getEmrClient(awsParamsDto);
verify(emrOperations, times(2)).listEmrClusters(eq(amazonElasticMapReduceClient), any(ListClustersRequest.class));
verifyNoMoreInteractionsHelper();
// Validate the results.
assertEquals(clusterSummary, result);
}
/**
 * Shared body for the cache-timestamp variants of the get-active-cluster-by-name test.
 *
 * <p>The default-account entry of emrClusterCacheTimestampsMap is stubbed to return the given
 * timestamps. Both ListClusters requests are expected to carry {@code createdAfter}. Pagination is
 * checked as in the plain test: first page plus MARKER, then the matching cluster on the second page.
 *
 * @param emrClusterCacheTimestamps timestamps returned for the default account key
 * @param createdAfter              the createdAfter filter the DAO is expected to send (may be null,
 *                                  presumably when no filter applies; TODO confirm against callers)
 */
private void testGetActiveEmrClusterByNameWithTimestamps(EmrClusterCacheTimestamps emrClusterCacheTimestamps, Date createdAfter)
{
// Create an AWS parameters DTO.
AwsParamsDto awsParamsDto =
new AwsParamsDto(AWS_ASSUMED_ROLE_ACCESS_KEY, AWS_ASSUMED_ROLE_SECRET_KEY, AWS_ASSUMED_ROLE_SESSION_TOKEN, HTTP_PROXY_HOST, HTTP_PROXY_PORT,
AWS_REGION_NAME_US_EAST_1);
// Create a mock AmazonElasticMapReduceClient.
AmazonElasticMapReduceClient amazonElasticMapReduceClient = mock(AmazonElasticMapReduceClient.class);
// Create a list cluster request.
ListClustersRequest listClustersRequest = new ListClustersRequest().withClusterStates(EMR_VALID_STATE).withCreatedAfter(createdAfter);
// Create a list cluster result with a non-matching cluster and a marker.
ListClustersResult listClusterResultWithMarker =
new ListClustersResult().withClusters(new ClusterSummary().withName(INVALID_VALUE).withId(EMR_CLUSTER_ID)).withMarker(MARKER);
// Create a list cluster request with marker.
ListClustersRequest listClustersRequestWithMarker =
new ListClustersRequest().withClusterStates(EMR_VALID_STATE).withMarker(MARKER).withCreatedAfter(createdAfter);
// Create a cluster summary.
ClusterSummary clusterSummary = new ClusterSummary().withName(EMR_CLUSTER_NAME).withId(EMR_CLUSTER_ID);
// Create a list cluster result with the matching cluster.
ListClustersResult listClusterResult = new ListClustersResult().withClusters(clusterSummary);
// Mock the external calls.
when(emrClusterCacheTimestampsMap.get(EMR_CLUSTER_CACHE_MAP_DEFAULT_AWS_ACCOUNT_ID_KEY)).thenReturn(emrClusterCacheTimestamps);
when(configurationHelper.getProperty(ConfigurationValue.EMR_VALID_STATES)).thenReturn(EMR_VALID_STATE);
when(configurationHelper.getProperty(ConfigurationValue.FIELD_DATA_DELIMITER))
.thenReturn((String) ConfigurationValue.FIELD_DATA_DELIMITER.getDefaultValue());
when(awsClientFactory.getEmrClient(awsParamsDto)).thenReturn(amazonElasticMapReduceClient);
// Stubs match by equals(), so the DAO must send exactly these createdAfter values for the stubs to fire.
when(emrOperations.listEmrClusters(amazonElasticMapReduceClient, listClustersRequest)).thenReturn(listClusterResultWithMarker);
when(emrOperations.listEmrClusters(amazonElasticMapReduceClient, listClustersRequestWithMarker)).thenReturn(listClusterResult);
// Call the method under test.
ClusterSummary result = emrDaoImpl.getActiveEmrClusterByNameAndAccountId(EMR_CLUSTER_NAME, null, awsParamsDto);
// Verify the external calls.
verify(configurationHelper).getProperty(ConfigurationValue.EMR_VALID_STATES);
verify(configurationHelper).getProperty(ConfigurationValue.FIELD_DATA_DELIMITER);
// Two pages were served, so two client lookups and two list calls are expected.
verify(awsClientFactory, times(2)).getEmrClient(awsParamsDto);
verify(emrOperations, times(2)).listEmrClusters(eq(amazonElasticMapReduceClient), any(ListClustersRequest.class));
verifyNoMoreInteractionsHelper();
// Validate the results.
assertEquals(clusterSummary, result);
}
/**
 * Shared body for tests where the cluster name is already in the EMR cluster cache.
 *
 * <p>The cache (for {@code accountId}, or the default account key when null) maps the upper-cased
 * cluster name to EMR_CLUSTER_ID. DescribeCluster returns {@code cluster}. The expected calls depend
 * on that cluster:
 * <ul>
 *   <li>{@code cluster == null}: a fallback ListClusters call is expected.</li>
 *   <li>State equals EMR_INVALID_STATE: the valid-states and delimiter properties are read twice,
 *       and a fallback ListClusters call is expected.</li>
 *   <li>Otherwise: the cached id is used, with a single client lookup and no ListClusters call.</li>
 * </ul>
 * In every case the returned summary must equal the one built here.
 *
 * @param cluster   cluster returned by DescribeCluster; may be null
 * @param accountId AWS account id used as the cache key; null selects the default key
 */
private void getActiveEmrClusterByNameAndAccountIdClusterNameIsInCache(Cluster cluster, String accountId)
{
// Create an AWS parameters DTO.
AwsParamsDto awsParamsDto =
new AwsParamsDto(AWS_ASSUMED_ROLE_ACCESS_KEY, AWS_ASSUMED_ROLE_SECRET_KEY, AWS_ASSUMED_ROLE_SESSION_TOKEN, HTTP_PROXY_HOST, HTTP_PROXY_PORT,
AWS_REGION_NAME_US_EAST_1);
// Create a mock AmazonElasticMapReduceClient.
AmazonElasticMapReduceClient amazonElasticMapReduceClient = mock(AmazonElasticMapReduceClient.class);
// Create a cluster summary.
ClusterSummary clusterSummary =
new ClusterSummary().withName(EMR_CLUSTER_NAME).withId(EMR_CLUSTER_ID).withStatus(cluster == null ? null : cluster.getStatus());
// Create a list cluster result with the matching cluster.
ListClustersResult listClusterResult = new ListClustersResult().withClusters(clusterSummary);
// Create a describe cluster result.
DescribeClusterResult describeClusterResult = new DescribeClusterResult().withCluster(cluster);
// Create a describe cluster request.
DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest().withClusterId(EMR_CLUSTER_ID);
// Build the EMR cluster cache key
// (The key uses the upper-cased cluster name.)
EmrClusterCacheKey emrClusterCacheKey = new EmrClusterCacheKey(EMR_CLUSTER_NAME.toUpperCase(), accountId);
// Build the EMR cluster cache
Map<EmrClusterCacheKey, String> emrClusterCache = new ConcurrentHashMap<>();
emrClusterCache.put(emrClusterCacheKey, EMR_CLUSTER_ID);
// Mock the external calls.
if (accountId == null)
{
when(emrClusterCacheMap.get(EMR_CLUSTER_CACHE_MAP_DEFAULT_AWS_ACCOUNT_ID_KEY)).thenReturn(emrClusterCache);
}
else
{
when(emrClusterCacheMap.get(accountId)).thenReturn(emrClusterCache);
}
when(emrOperations.describeClusterRequest(eq(amazonElasticMapReduceClient), any(DescribeClusterRequest.class))).thenReturn(describeClusterResult);
when(configurationHelper.getProperty(ConfigurationValue.EMR_VALID_STATES)).thenReturn(ConfigurationValue.EMR_VALID_STATES.getDefaultValue().toString());
when(configurationHelper.getProperty(ConfigurationValue.FIELD_DATA_DELIMITER))
.thenReturn((String) ConfigurationValue.FIELD_DATA_DELIMITER.getDefaultValue());
when(awsClientFactory.getEmrClient(awsParamsDto)).thenReturn(amazonElasticMapReduceClient);
when(emrOperations.listEmrClusters(any(AmazonElasticMapReduceClient.class), any(ListClustersRequest.class))).thenReturn(listClusterResult);
// Call the method under test.
ClusterSummary result = emrDaoImpl.getActiveEmrClusterByNameAndAccountId(EMR_CLUSTER_NAME, accountId, awsParamsDto);
// Verify the external calls.
verify(emrOperations).describeClusterRequest(eq(amazonElasticMapReduceClient), eq(describeClusterRequest));
if (cluster == null)
{
// No cluster described: the DAO is expected to fall back to a single ListClusters call.
verify(configurationHelper).getProperty(ConfigurationValue.FIELD_DATA_DELIMITER);
verify(configurationHelper).getProperty(ConfigurationValue.EMR_VALID_STATES);
verify(awsClientFactory, times(2)).getEmrClient(awsParamsDto);
verify(emrOperations).listEmrClusters(eq(amazonElasticMapReduceClient), any(ListClustersRequest.class));
}
else if (cluster.getStatus().getState().equals(EMR_INVALID_STATE))
{
// Cached cluster is in an invalid state: properties are read twice and a ListClusters fallback happens.
verify(configurationHelper, times(2)).getProperty(ConfigurationValue.FIELD_DATA_DELIMITER);
verify(configurationHelper, times(2)).getProperty(ConfigurationValue.EMR_VALID_STATES);
verify(awsClientFactory, times(2)).getEmrClient(awsParamsDto);
verify(emrOperations).listEmrClusters(eq(amazonElasticMapReduceClient), any(ListClustersRequest.class));
}
else
{
// Cache hit with a valid state: no ListClusters call.
verify(configurationHelper).getProperty(ConfigurationValue.FIELD_DATA_DELIMITER);
verify(configurationHelper).getProperty(ConfigurationValue.EMR_VALID_STATES);
verify(awsClientFactory).getEmrClient(awsParamsDto);
}
verifyNoMoreInteractionsHelper();
// Validate the results.
assertEquals(clusterSummary, result);
}
/**
 * Returns the EMR client built by the superclass for the given AWS parameters.
 *
 * @param awsParamsDto AWS credentials, proxy and region parameters
 * @return the EMR client
 */
@Override
public AmazonElasticMapReduceClient getEmrClient(AwsParamsDto awsParamsDto)
{
    AmazonElasticMapReduceClient client = super.getEmrClient(awsParamsDto);
    return client;
}
/**
 * Prepares the EMR acceptance test environment.
 *
 * <p>Steps:
 * <ol>
 *   <li>Skips the test unless all required AWS/TD settings are present.</li>
 *   <li>Starts a proxy that injects INTERNAL_SERVER_ERROR responses, so client retries are
 *       exercised.</li>
 *   <li>Starts a digdag server behind that proxy.</li>
 *   <li>Pushes a project with the EMR resources and secrets.</li>
 *   <li>Uploads helper scripts to a unique temporary S3 folder.</li>
 * </ol>
 *
 * @throws Exception if any setup step fails
 */
@Before
public void setUp()
        throws Exception
{
    assumeThat(S3_TEMP_BUCKET, not(isEmptyOrNullString()));
    assumeThat(AWS_ACCESS_KEY_ID, not(isEmptyOrNullString()));
    assumeThat(AWS_SECRET_ACCESS_KEY, not(isEmptyOrNullString()));
    assumeThat(AWS_ROLE, not(isEmptyOrNullString()));
    assumeThat(TD_API_KEY, not(isEmptyOrNullString()));
    assumeThat(AWS_KMS_KEY_ID, not(isEmptyOrNullString()));
    AWSCredentials credentials = new BasicAWSCredentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY);
    AtomicInteger s3Requests = new AtomicInteger();
    AtomicInteger kmsRequests = new AtomicInteger();
    AtomicInteger emrRequests = new AtomicInteger();
    // Use the value returned by incrementAndGet(). A separate get() could observe another request's
    // increment if the proxy handles requests concurrently.
    proxyServer = TestUtils.startRequestFailingProxy(request -> {
        String uri = request.getUri();
        if (uri.contains("s3")) {
            // Fail only the first S3 request.
            if (s3Requests.incrementAndGet() < 2) {
                return Optional.of(INTERNAL_SERVER_ERROR);
            }
        }
        else if (uri.contains("kms")) {
            // Fail only the first KMS request.
            if (kmsRequests.incrementAndGet() < 2) {
                return Optional.of(INTERNAL_SERVER_ERROR);
            }
        }
        else if (uri.contains("elasticmapreduce")) {
            // Fail every odd-numbered EMR request, so each call needs a retry.
            if (emrRequests.incrementAndGet() % 2 == 0) {
                return Optional.absent();
            }
            else {
                return Optional.of(INTERNAL_SERVER_ERROR);
            }
        }
        return Optional.absent();
    });
    // TODO: assume the supplied role?
    emr = new AmazonElasticMapReduceClient(credentials);
    s3 = new AmazonS3Client(credentials);
    server = TemporaryDigdagServer.builder()
            .environment(ImmutableMap.of(
                    "https_proxy", "http://" + proxyServer.getListenAddress().getHostString() + ":" + proxyServer.getListenAddress().getPort())
            )
            .withRandomSecretEncryptionKey()
            .build();
    server.start();
    projectDir = folder.getRoot().toPath();
    createProject(projectDir);
    projectName = projectDir.getFileName().toString();
    projectId = pushProject(server.endpoint(), projectDir, projectName);
    outfile = folder.newFolder().toPath().resolve("outfile");
    digdagClient = DigdagClient.builder()
            .host(server.host())
            .port(server.port())
            .build();
    digdagClient.setProjectSecret(projectId, "aws.emr.access_key_id", AWS_ACCESS_KEY_ID);
    digdagClient.setProjectSecret(projectId, "aws.emr.secret_access_key", AWS_SECRET_ACCESS_KEY);
    digdagClient.setProjectSecret(projectId, "aws.emr.role_arn", AWS_ROLE);
    digdagClient.setProjectSecret(projectId, "aws.emr.kms_key_id", AWS_KMS_KEY_ID);
    digdagClient.setProjectSecret(projectId, "td.apikey", TD_API_KEY);
    digdagClient.setProjectSecret(projectId, "foo.bar", "foobar");
    addResource(projectDir, "acceptance/emr/bootstrap_foo.sh");
    addResource(projectDir, "acceptance/emr/bootstrap_hello.sh");
    addResource(projectDir, "acceptance/emr/WordCount.jar");
    addResource(projectDir, "acceptance/emr/libhello.jar");
    addResource(projectDir, "acceptance/emr/simple.jar");
    addResource(projectDir, "acceptance/emr/hello.py");
    addResource(projectDir, "acceptance/emr/hello.sh");
    addResource(projectDir, "acceptance/emr/query.sql");
    addResource(projectDir, "acceptance/emr/pi.scala");
    addResource(projectDir, "acceptance/emr/td-www_access.scala");
    addResource(projectDir, "acceptance/emr/data.csv");
    addResource(projectDir, "acceptance/emr/emr_configuration.json");
    addWorkflow(projectDir, "acceptance/emr/emr.dig");
    // 'yyyy' is the calendar year; 'YYYY' is the week-based year and is wrong around New Year.
    DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmssSSS", Locale.ROOT).withZone(UTC);
    String now = f.format(Instant.now());
    tmpS3FolderKey = "tmp/" + now + "-" + UUID.randomUUID();
    tmpS3FolderUri = new AmazonS3URI("s3://" + S3_TEMP_BUCKET + "/" + tmpS3FolderKey);
    putS3(S3_TEMP_BUCKET, tmpS3FolderKey + "/applications/pi.py", "acceptance/emr/pi.py");
    putS3(S3_TEMP_BUCKET, tmpS3FolderKey + "/scripts/hello.sh", "acceptance/emr/hello.sh");
}
/**
 * Builds an EMR client from the given AWS parameters.
 *
 * @param awsParamsDto AWS parameters: access/secret keys and HTTP proxy details
 *
 * @return the configured AmazonElasticMapReduceClient
 */
AmazonElasticMapReduceClient getEmrClient(AwsParamsDto awsParamsDto);