下面列出了 org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Replays a previously saved history server state into this object.
 *
 * <p>Every master key in {@code state.tokenMasterKeyState} is handed to
 * {@code addKey}; afterwards every entry of {@code state.tokenState} is
 * handed to {@code addPersistedDelegationToken}.
 *
 * @param state the saved state whose keys and tokens are re-added
 * @throws IOException declared for failures while re-adding keys or tokens
 */
public void recover(HistoryServerState state) throws IOException {
  LOG.info("Recovering " + getClass().getSimpleName());

  // Master keys are re-added first, then the persisted tokens.
  for (DelegationKey masterKey : state.tokenMasterKeyState) {
    addKey(masterKey);
  }

  for (Entry<MRDelegationTokenIdentifier, Long> persisted
      : state.tokenState.entrySet()) {
    final MRDelegationTokenIdentifier tokenId = persisted.getKey();
    final Long tokenDate = persisted.getValue();
    addPersistedDelegationToken(tokenId, tokenDate);
  }
}
/**
 * Feeds the contents of a loaded {@link HistoryServerState} back into this
 * instance: first each element of {@code tokenMasterKeyState} via
 * {@code addKey}, then each key/value pair of {@code tokenState} via
 * {@code addPersistedDelegationToken}.
 *
 * @param state the loaded state to recover from
 * @throws IOException declared for failures during recovery
 */
public void recover(HistoryServerState state) throws IOException {
  final String recoveryMessage = "Recovering " + getClass().getSimpleName();
  LOG.info(recoveryMessage);

  for (DelegationKey restoredKey : state.tokenMasterKeyState) {
    addKey(restoredKey);
  }

  for (Entry<MRDelegationTokenIdentifier, Long> tokenEntry
      : state.tokenState.entrySet()) {
    addPersistedDelegationToken(
        tokenEntry.getKey(),
        tokenEntry.getValue());
  }
}
/**
 * Checks that a token whose update was interrupted is still loaded with its
 * new date.
 *
 * <p>The first store runs on a spied local file system whose {@code rename}
 * throws for any source path whose name starts with {@code "update"}, so
 * {@code updateToken} is expected to fail with the injected exception. A
 * second store is then started, and its loaded state must contain exactly
 * that one token with the updated date.
 *
 * @throws IOException if a store or file system call fails unexpectedly
 */
@Test
public void testUpdatedTokenRecovery() throws IOException {
  final IOException injectedFailure = new IOException("intentional error");
  final FileSystem localFs = FileSystem.getLocal(conf);
  final FileSystem failingFs = spy(localFs);

  // Interrupt the token update part-way: per the scenario under test we
  // should be left with only the temporary update file and no token file.
  final ArgumentMatcher<Path> isUpdateTmpPath = new ArgumentMatcher<Path>() {
    @Override
    public boolean matches(Object candidate) {
      return candidate instanceof Path
          && ((Path) candidate).getName().startsWith("update");
    }
  };
  doThrow(injectedFailure)
      .when(failingFs)
      .rename(argThat(isUpdateTmpPath), isA(Path.class));

  final String storeUri = testDir.getAbsoluteFile().toURI().toString();
  conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI, storeUri);

  // Store backed by the spied file system instead of the real one.
  final HistoryServerStateStoreService failingStore =
      new HistoryServerFileSystemStateStoreService() {
        @Override
        FileSystem createFileSystem() throws IOException {
          return failingFs;
        }
      };
  failingStore.init(conf);
  failingStore.start();

  final MRDelegationTokenIdentifier token = new MRDelegationTokenIdentifier(
      new Text("tokenOwner1"),
      new Text("tokenRenewer1"),
      new Text("tokenUser1"));
  token.setSequenceNumber(1);
  final Long originalDate = 1L;
  failingStore.storeToken(token, originalDate);

  final Long updatedDate = 975318642L;
  try {
    failingStore.updateToken(token, updatedDate);
    fail("intentional error not thrown");
  } catch (IOException thrown) {
    assertEquals(injectedFailure, thrown);
  }
  failingStore.close();

  // The update file must be found and parsed on recovery even though the
  // original token file is missing.
  final HistoryServerStateStoreService recoveredStore = createAndStartStore();
  final HistoryServerState recovered = recoveredStore.loadState();
  assertEquals("incorrect loaded token count", 1,
      recovered.tokenState.size());
  assertTrue("missing token 1", recovered.tokenState.containsKey(token));
  assertEquals("incorrect token 1 date", updatedDate,
      recovered.tokenState.get(token));
  recoveredStore.close();
}
/**
 * Exercises storing, updating and removing tokens and master keys, checking
 * after each restart of the store that {@code loadState} reflects what was
 * written.
 *
 * <p>Phases: (1) a fresh store loads empty state; (2) one key and two tokens
 * are stored and reloaded; (3) a token and a key are removed, a token is
 * updated, further keys and a token are added, and the result is reloaded.
 *
 * @throws IOException if a store operation fails unexpectedly
 */
@Test
public void testTokenStore() throws IOException {
  // Phase 1: a brand-new store has neither tokens nor keys.
  final HistoryServerStateStoreService emptyStore = createAndStartStore();
  final HistoryServerState emptyState = emptyStore.loadState();
  assertTrue("token state not empty", emptyState.tokenState.isEmpty());
  assertTrue("key state not empty",
      emptyState.tokenMasterKeyState.isEmpty());

  // Phase 2: persist one master key and two tokens.
  final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());

  final MRDelegationTokenIdentifier token1 = new MRDelegationTokenIdentifier(
      new Text("tokenOwner1"),
      new Text("tokenRenewer1"),
      new Text("tokenUser1"));
  token1.setSequenceNumber(1);
  final Long date1 = 1L;

  final MRDelegationTokenIdentifier token2 = new MRDelegationTokenIdentifier(
      new Text("tokenOwner2"),
      new Text("tokenRenewer2"),
      new Text("tokenUser2"));
  token2.setSequenceNumber(12345678);
  final Long date2 = 87654321L;

  emptyStore.storeTokenMasterKey(key1);
  emptyStore.storeToken(token1, date1);
  emptyStore.storeToken(token2, date2);
  emptyStore.close();

  // Restart and confirm the key and both tokens come back.
  final HistoryServerStateStoreService secondStore = createAndStartStore();
  final HistoryServerState secondState = secondStore.loadState();
  assertEquals("incorrect loaded token count", 2,
      secondState.tokenState.size());
  assertTrue("missing token 1", secondState.tokenState.containsKey(token1));
  assertEquals("incorrect token 1 date", date1,
      secondState.tokenState.get(token1));
  assertTrue("missing token 2", secondState.tokenState.containsKey(token2));
  assertEquals("incorrect token 2 date", date2,
      secondState.tokenState.get(token2));
  assertEquals("incorrect master key count", 1,
      secondState.tokenMasterKeyState.size());
  assertTrue("missing master key 1",
      secondState.tokenMasterKeyState.contains(key1));

  // Phase 3: add more keys and a token, drop the first key and the first
  // token, and renew the second token.
  final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
  final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
  final MRDelegationTokenIdentifier token3 = new MRDelegationTokenIdentifier(
      new Text("tokenOwner3"),
      new Text("tokenRenewer3"),
      new Text("tokenUser3"));
  token3.setSequenceNumber(12345679);
  final Long date3 = 87654321L;
  final Long renewedDate2 = 975318642L;

  secondStore.removeToken(token1);
  secondStore.storeTokenMasterKey(key2);
  secondStore.updateToken(token2, renewedDate2);
  secondStore.removeTokenMasterKey(key1);
  secondStore.storeTokenMasterKey(key3);
  secondStore.storeToken(token3, date3);
  secondStore.close();

  // Restart again: new entries present, removed ones gone, and the renewed
  // token carries its updated expiration date.
  final HistoryServerStateStoreService thirdStore = createAndStartStore();
  final HistoryServerState thirdState = thirdStore.loadState();
  assertEquals("incorrect loaded token count", 2,
      thirdState.tokenState.size());
  assertFalse("token 1 not removed",
      thirdState.tokenState.containsKey(token1));
  assertTrue("missing token 2", thirdState.tokenState.containsKey(token2));
  assertEquals("incorrect token 2 date", renewedDate2,
      thirdState.tokenState.get(token2));
  assertTrue("missing token 3", thirdState.tokenState.containsKey(token3));
  assertEquals("incorrect token 3 date", date3,
      thirdState.tokenState.get(token3));
  assertEquals("incorrect master key count", 2,
      thirdState.tokenMasterKeyState.size());
  assertFalse("master key 1 not removed",
      thirdState.tokenMasterKeyState.contains(key1));
  assertTrue("missing master key 2",
      thirdState.tokenMasterKeyState.contains(key2));
  assertTrue("missing master key 3",
      thirdState.tokenMasterKeyState.contains(key3));
  thirdStore.close();
}
/**
 * Simulates a failure in the middle of {@code updateToken} and verifies that
 * a store started afterwards still loads the token with the new date.
 *
 * <p>The failure is injected by spying on the local file system and making
 * {@code rename} throw whenever the source path's name begins with
 * {@code "update"}.
 *
 * @throws IOException if any store or file system call fails unexpectedly
 */
@Test
public void testUpdatedTokenRecovery() throws IOException {
  final IOException expectedError = new IOException("intentional error");
  final FileSystem spiedFs = spy(FileSystem.getLocal(conf));

  // Abort the update half-way, which (per the scenario being tested) leaves
  // just the temporary update file behind and no token file.
  final ArgumentMatcher<Path> updatePrefixMatcher =
      new ArgumentMatcher<Path>() {
        @Override
        public boolean matches(Object arg) {
          if (!(arg instanceof Path)) {
            return false;
          }
          final Path path = (Path) arg;
          return path.getName().startsWith("update");
        }
      };
  doThrow(expectedError).when(spiedFs)
      .rename(argThat(updatePrefixMatcher), isA(Path.class));

  conf.set(
      JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
      testDir.getAbsoluteFile().toURI().toString());

  // First store: wired to the spied file system so the rename fails.
  HistoryServerStateStoreService stateStore =
      new HistoryServerFileSystemStateStoreService() {
        @Override
        FileSystem createFileSystem() throws IOException {
          return spiedFs;
        }
      };
  stateStore.init(conf);
  stateStore.start();

  final Text owner = new Text("tokenOwner1");
  final Text renewer = new Text("tokenRenewer1");
  final Text realUser = new Text("tokenUser1");
  final MRDelegationTokenIdentifier tokenId =
      new MRDelegationTokenIdentifier(owner, renewer, realUser);
  tokenId.setSequenceNumber(1);

  final Long firstDate = 1L;
  stateStore.storeToken(tokenId, firstDate);

  final Long secondDate = 975318642L;
  try {
    stateStore.updateToken(tokenId, secondDate);
    fail("intentional error not thrown");
  } catch (IOException caught) {
    assertEquals(expectedError, caught);
  }
  stateStore.close();

  // Second store: recovery has to pick up and parse the update file because
  // the original token file is missing.
  stateStore = createAndStartStore();
  final HistoryServerState loaded = stateStore.loadState();
  assertEquals("incorrect loaded token count", 1, loaded.tokenState.size());
  assertTrue("missing token 1", loaded.tokenState.containsKey(tokenId));
  assertEquals("incorrect token 1 date", secondDate,
      loaded.tokenState.get(tokenId));
  stateStore.close();
}
/**
 * Round-trips tokens and master keys through the state store across several
 * close/restart cycles.
 *
 * <p>The test first confirms a new store loads empty state, then stores a
 * key and two tokens and reloads them, and finally mixes removals, an update
 * and further additions before reloading once more.
 *
 * @throws IOException if a store operation fails unexpectedly
 */
@Test
public void testTokenStore() throws IOException {
  HistoryServerStateStoreService stateStore = createAndStartStore();

  // Nothing has been written yet, so both collections must be empty.
  HistoryServerState loaded = stateStore.loadState();
  assertTrue("token state not empty", loaded.tokenState.isEmpty());
  assertTrue("key state not empty", loaded.tokenMasterKeyState.isEmpty());

  // Write a first batch: one key, two tokens.
  final DelegationKey firstKey =
      new DelegationKey(1, 2, "keyData1".getBytes());
  final MRDelegationTokenIdentifier firstToken =
      new MRDelegationTokenIdentifier(
          new Text("tokenOwner1"),
          new Text("tokenRenewer1"),
          new Text("tokenUser1"));
  firstToken.setSequenceNumber(1);
  final Long firstTokenDate = 1L;
  final MRDelegationTokenIdentifier secondToken =
      new MRDelegationTokenIdentifier(
          new Text("tokenOwner2"),
          new Text("tokenRenewer2"),
          new Text("tokenUser2"));
  secondToken.setSequenceNumber(12345678);
  final Long secondTokenDate = 87654321L;
  stateStore.storeTokenMasterKey(firstKey);
  stateStore.storeToken(firstToken, firstTokenDate);
  stateStore.storeToken(secondToken, secondTokenDate);
  stateStore.close();

  // After a restart, the first batch must be recoverable.
  stateStore = createAndStartStore();
  loaded = stateStore.loadState();
  assertEquals("incorrect loaded token count", 2, loaded.tokenState.size());
  assertTrue("missing token 1", loaded.tokenState.containsKey(firstToken));
  assertEquals("incorrect token 1 date", firstTokenDate,
      loaded.tokenState.get(firstToken));
  assertTrue("missing token 2", loaded.tokenState.containsKey(secondToken));
  assertEquals("incorrect token 2 date", secondTokenDate,
      loaded.tokenState.get(secondToken));
  assertEquals("incorrect master key count", 1,
      loaded.tokenMasterKeyState.size());
  assertTrue("missing master key 1",
      loaded.tokenMasterKeyState.contains(firstKey));

  // Second batch: two new keys and one new token are stored, the first key
  // and first token are removed, and the second token is renewed.
  final DelegationKey secondKey =
      new DelegationKey(3, 4, "keyData2".getBytes());
  final DelegationKey thirdKey =
      new DelegationKey(5, 6, "keyData3".getBytes());
  final MRDelegationTokenIdentifier thirdToken =
      new MRDelegationTokenIdentifier(
          new Text("tokenOwner3"),
          new Text("tokenRenewer3"),
          new Text("tokenUser3"));
  thirdToken.setSequenceNumber(12345679);
  final Long thirdTokenDate = 87654321L;
  stateStore.removeToken(firstToken);
  stateStore.storeTokenMasterKey(secondKey);
  final Long renewedSecondTokenDate = 975318642L;
  stateStore.updateToken(secondToken, renewedSecondTokenDate);
  stateStore.removeTokenMasterKey(firstKey);
  stateStore.storeTokenMasterKey(thirdKey);
  stateStore.storeToken(thirdToken, thirdTokenDate);
  stateStore.close();

  // After another restart: additions are present, removals are absent, and
  // the renewed token shows its updated expiration date.
  stateStore = createAndStartStore();
  loaded = stateStore.loadState();
  assertEquals("incorrect loaded token count", 2, loaded.tokenState.size());
  assertFalse("token 1 not removed",
      loaded.tokenState.containsKey(firstToken));
  assertTrue("missing token 2", loaded.tokenState.containsKey(secondToken));
  assertEquals("incorrect token 2 date", renewedSecondTokenDate,
      loaded.tokenState.get(secondToken));
  assertTrue("missing token 3", loaded.tokenState.containsKey(thirdToken));
  assertEquals("incorrect token 3 date", thirdTokenDate,
      loaded.tokenState.get(thirdToken));
  assertEquals("incorrect master key count", 2,
      loaded.tokenMasterKeyState.size());
  assertFalse("master key 1 not removed",
      loaded.tokenMasterKeyState.contains(firstKey));
  assertTrue("missing master key 2",
      loaded.tokenMasterKeyState.contains(secondKey));
  assertTrue("missing master key 3",
      loaded.tokenMasterKeyState.contains(thirdKey));
  stateStore.close();
}