下面列出了org.apache.hadoop.io.DataOutputBuffer#writeInt ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public byte[] serialize(TMessageSet payload) {
DataOutputBuffer outBuffer = new DataOutputBuffer();
try {
outBuffer.reset();
outBuffer.writeUTF(payload.getApp());
outBuffer.writeInt(payload.getNumMessages());
outBuffer.writeByte(payload.getCompression());
outBuffer.writeLong(payload.getCrc());
outBuffer.writeInt(payload.getMessages().length);
outBuffer.write(payload.getMessages());
return ByteBuffer.wrap(outBuffer.getData(), 0, outBuffer.getLength()).array();
} catch (Exception e) {
throw new RuntimeException("Failed to serialize TMessageSet: "+e.getMessage(), e);
} finally {
Closeables.closeQuietly(outBuffer);
}
}
private OutputContext createTezOutputContext() throws IOException {
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
OutputContext outputContext = mock(OutputContext.class);
ExecutionContextImpl executionContext = mock(ExecutionContextImpl.class);
doReturn("localhost").when(executionContext).getHostName();
doReturn(executionContext).when(outputContext).getExecutionContext();
DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
serviceProviderMetaData.writeInt(80);
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
.getServiceProviderMetaData
(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(1).when(outputContext).getTaskVertexIndex();
doReturn(1).when(outputContext).getOutputIndex();
doReturn(0).when(outputContext).getDAGAttemptNumber();
doReturn("destVertex").when(outputContext).getDestinationVertexName();
when(outputContext.getCounters()).thenReturn(new TezCounters());
return outputContext;
}
/**
* Serialize the shuffle port into a ByteBuffer for use later on.
* @param port the port to be sent to the ApplciationMaster
* @return the serialized form of the port.
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port);
return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
}
/**
* Serialize the shuffle port into a ByteBuffer for use later on.
* @param port the port to be sent to the ApplciationMaster
* @return the serialized form of the port.
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port);
return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
}
@Test(expected = IOException.class)
public void testFailedEvaluateResponse() throws IOException {
//prep mockin the SaslClient
SimpleSaslClientAuthenticationProvider mockProvider =
Mockito.mock(SimpleSaslClientAuthenticationProvider.class);
SaslClient mockClient = Mockito.mock(SaslClient.class);
Assert.assertNotNull(mockProvider);
Assert.assertNotNull(mockClient);
Mockito.when(mockProvider.createClient(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean(), Mockito.any())).thenReturn(mockClient);
HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(HBaseConfiguration.create(),
mockProvider, createTokenMock(),
Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), false);
//simulate getting an error from a failed saslServer.evaluateResponse
DataOutputBuffer errorBuffer = new DataOutputBuffer();
errorBuffer.writeInt(SaslStatus.ERROR.state);
WritableUtils.writeString(errorBuffer, IOException.class.getName());
WritableUtils.writeString(errorBuffer, "Invalid Token");
DataInputBuffer in = new DataInputBuffer();
in.reset(errorBuffer.getData(), 0, errorBuffer.getLength());
DataOutputBuffer out = new DataOutputBuffer();
//simulate that authentication exchange has completed quickly after sending the token
Mockito.when(mockClient.isComplete()).thenReturn(true);
rpcClient.saslConnect(in, out);
}
/**
* Serialize the shuffle port into a ByteBuffer for use later on.
* @param port the port to be sent to the ApplciationMaster
* @return the serialized form of the port.
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port);
return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
}
/**
* Serialize the shuffle port into a ByteBuffer for use later on.
* @param port the port to be sent to the ApplciationMaster
* @return the serialized form of the port.
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port);
return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
}
/**
* Create an empty edits log
*/
static void createEditsFile(String editDir) throws IOException {
File editfile = new File(editDir + EDITSFILE);
FileOutputStream fp = new FileOutputStream(editfile);
DataOutputBuffer buf = new DataOutputBuffer(1024);
buf.writeInt(FSConstants.LAYOUT_VERSION);
buf.writeTo(fp);
buf.close();
fp.close();
}
private InputContext createInputContext() throws IOException {
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(PORT);
final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
port_dob.close();
ExecutionContext executionContext = mock(ExecutionContext.class);
doReturn(HOST).when(executionContext).getHostName();
InputContext inputContext = mock(InputContext.class);
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn(shuffleMetaData).when(inputContext)
.getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(executionContext).when(inputContext).getExecutionContext();
when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
new Answer<ExecutorService>() {
@Override
public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
return sharedExecutor.createExecutorService(
invocation.getArgumentAt(0, Integer.class),
invocation.getArgumentAt(1, String.class));
}
});
return inputContext;
}
private InputContext createInputContext() throws IOException {
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(PORT);
final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0,
port_dob.getLength());
port_dob.close();
ExecutionContext executionContext = mock(ExecutionContext.class);
doReturn(FETCHER_HOST).when(executionContext).getHostName();
InputContext inputContext = mock(InputContext.class);
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn(shuffleMetaData).when(inputContext)
.getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doReturn(executionContext).when(inputContext).getExecutionContext();
when(inputContext.createTezFrameworkExecutorService(anyInt(), anyString())).thenAnswer(
new Answer<ExecutorService>() {
@Override
public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
return sharedExecutor.createExecutorService(
invocation.getArgumentAt(0, Integer.class),
invocation.getArgumentAt(1, String.class));
}
});
return inputContext;
}
private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
String uniqueId, String auxiliaryService) throws IOException {
OutputContext outputContext = mock(OutputContext.class);
ExecutionContext execContext = new ExecutionContextImpl("localhost");
DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
serviceProviderMetaData.writeInt(80);
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
.getServiceProviderMetaData(auxiliaryService);
doReturn(execContext).when(outputContext).getExecutionContext();
doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();
doReturn(counters).when(outputContext).getCounters();
doReturn(appId).when(outputContext).getApplicationId();
doReturn(1).when(outputContext).getDAGAttemptNumber();
doReturn("dagName").when(outputContext).getDAGName();
doReturn("destinationVertexName").when(outputContext).getDestinationVertexName();
doReturn(1).when(outputContext).getOutputIndex();
doReturn(1).when(outputContext).getTaskAttemptNumber();
doReturn(1).when(outputContext).getTaskIndex();
doReturn(1).when(outputContext).getTaskVertexIndex();
doReturn("vertexName").when(outputContext).getTaskVertexName();
doReturn(uniqueId).when(outputContext).getUniqueIdentifier();
Path outDirBase = new Path(workDir, "outDir_" + uniqueId);
String[] outDirs = new String[] { outDirBase.toString() };
doReturn(outDirs).when(outputContext).getWorkDirs();
return outputContext;
}
private OutputContext createTezOutputContext() throws IOException {
String[] workingDirs = { workingDir.toString() };
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
serviceProviderMetaData.writeInt(PORT);
TezCounters counters = new TezCounters();
OutputContext context = mock(OutputContext.class);
ExecutionContext execContext = new ExecutionContextImpl("localhost");
doReturn(mock(OutputStatisticsReporter.class)).when(context).getStatisticsReporter();
doReturn(execContext).when(context).getExecutionContext();
doReturn(counters).when(context).getCounters();
doReturn(workingDirs).when(context).getWorkDirs();
doReturn(payLoad).when(context).getUserPayload();
doReturn(5 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
doReturn(UniqueID).when(context).getUniqueIdentifier();
doReturn("v1").when(context).getDestinationVertexName();
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
.getServiceProviderMetaData
(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
long requestedSize = (Long) invocation.getArguments()[0];
MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
.getArguments()[1];
callback.memoryAssigned(requestedSize);
return null;
}
}).when(context).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
return context;
}
/**
* Serialize the shuffle port into a ByteBuffer for use later on.
* @param port the port to be sent to the ApplciationMaster
* @return the serialized form of the port.
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
DataOutputBuffer port_dob = new DataOutputBuffer();
port_dob.writeInt(port);
return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
}