下面列出了怎么用com.google.protobuf.GeneratedMessage的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Adds all the available reports to heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
*/
private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
for (GeneratedMessage report :
context.getAllAvailableReports(rpcEndpoint.getAddress())) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
String heartbeatFieldName = descriptor.getMessageType().getFullName();
if (heartbeatFieldName.equals(reportName)) {
if (descriptor.isRepeated()) {
requestBuilder.addRepeatedField(descriptor, report);
} else {
requestBuilder.setField(descriptor, report);
}
}
}
}
}
/**
* Returns the ReportPublisher for the corresponding report.
*
* @param report report
*
* @return report publisher
*/
public ReportPublisher getPublisherFor(
Class<? extends GeneratedMessage> report) {
Class<? extends ReportPublisher> publisherClass =
report2publisher.get(report);
if (publisherClass == null) {
throw new RuntimeException("No publisher found for report " + report);
}
try {
ReportPublisher reportPublisher = publisherClass.newInstance();
reportPublisher.setConf(conf);
return reportPublisher;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void sendCall(GeneratedMessage call, Protocol protocol, String streamId, String url) throws IOException {
log.debug("[call] " + call);
Restty restty = Restty.create(url)
.addAccept(protocol.mediaType())
.addMediaType(protocol.mediaType())
.requestBody(protocol.getSendBytes(call));
if (StringUtils.isNotBlank(streamId)) {
restty.addHeader("Mesos-Stream-Id", streamId);
}
try {
restty.postNoResponse();
} catch (IOException e) {
log.warn("send call to mesos master failed, due to : " + e);
throw e;
}
}
public Boolean createFromMessage(XMessage message) {
GeneratedMessage msg = (GeneratedMessage) message.getMessage();
Class<? extends GeneratedMessage> msgClass = msg.getClass();
if (this.resultType == null) {
if (ColumnMetaData.class.equals(msgClass)) {
this.resultType = ResultType.DATA;
} else if (!Error.class.equals(msgClass)) {
this.resultType = ResultType.UPDATE;
}
}
if (this.resultType == ResultType.DATA) {
// delegate to the result creation
return this.resultListener.createFromMessage(message);
}
// done
return this.okListener.createFromMessage(message);
}
@Override
public GeneratedMessage getDto(Settings settings) {
ClientMessageDtos.WriteEvents.Builder web = ClientMessageDtos.WriteEvents.newBuilder();
web.setEventStreamId(streamId);
web.setExpectedVersion(expectedVersion);
web.setRequireMaster(settings.isRequireMaster());
List<ClientMessageDtos.NewEvent> newEvents = new ArrayList<>();
for (Event e : events) {
newEvents.add(e.getMessageEvent());
}
web.addAllEvents(newEvents);
return web.build();
}
@Override
public XMessage readMessage(Optional<XMessage> reuse, int expectedType) throws IOException {
try {
Class<? extends GeneratedMessage> messageClass = MessageConstants.getMessageClassForType(readHeader().getMessageType());
Class<? extends GeneratedMessage> expectedClass = MessageConstants.getMessageClassForType(expectedType);
// ensure that parsed message class matches incoming tag
if (expectedClass != messageClass) {
throw new WrongArgumentException("Unexpected message class. Expected '" + expectedClass.getSimpleName() + "' but actually received '"
+ messageClass.getSimpleName() + "'");
}
return new XMessage(readAndParse(messageClass));
} catch (IOException e) {
throw new XProtocolError(e.getMessage(), e);
}
}
private TreeSet<PacketPosition> getResetPacketsBeforeTick(int wantedTick) throws IOException {
int backup = source.getPosition();
PacketPosition wanted = PacketPosition.createPacketPosition(wantedTick, ResetRelevantKind.FULL_PACKET, 0);
if (resetRelevantPackets.tailSet(wanted, true).size() == 0) {
PacketPosition basePos = resetRelevantPackets.floor(wanted);
source.setPosition(basePos.getOffset());
try {
while (true) {
int at = source.getPosition();
PacketInstance<GeneratedMessage> pi = engineType.getNextPacketInstance(source);
PacketPosition pp = newResetRelevantPacketPosition(pi.getTick(), pi.getResetRelevantKind(), at);
if (pp != null) {
addResetRelevant(pp);
}
if (pi.getTick() >= wantedTick) {
break;
}
pi.skip();
}
} catch (EOFException e) {
}
}
source.setPosition(backup);
return new TreeSet<>(resetRelevantPackets.headSet(wanted, true));
}
/**
* @see com.baidu.beidou.navi.pbrpc.client.PbrpcClient#asyncTransport(java.lang.Class,
* com.baidu.beidou.navi.pbrpc.transport.PbrpcMsg)
*/
@Override
public <T extends GeneratedMessage> CallFuture<T> asyncTransport(Class<T> responseClazz,
PbrpcMsg pbrpcMsg) {
PbrpcClientChannel channel = channelPool.getResource();
try {
CallFuture<T> res = channel.asyncTransport(responseClazz, pbrpcMsg, this.readTimeout);
return res;
} catch (Exception e) {
LOG.error("asyncTransport failed, " + e.getMessage(), e);
channelPool.returnBrokenResource(channel);
throw new PbrpcException("Pbrpc invocation failed on " + getInfo() + ", "
+ e.getMessage(), e);
} finally {
channelPool.returnResource(channel);
}
}
/**
* @see com.baidu.beidou.navi.pbrpc.client.PbrpcClient#asyncTransport(java.lang.Class,
* com.baidu.beidou.navi.pbrpc.transport.PbrpcMsg)
*/
public <T extends GeneratedMessage> CallFuture<T> asyncTransport(Class<T> responseClazz,
PbrpcMsg pbrpcMsg) {
try {
if (isShortAliveConn) {
ChannelFuture channelFuture = connect().sync();
Channel ch = channelFuture.channel();
return doAsyncTransport(ch, responseClazz, pbrpcMsg);
} else {
return doAsyncTransport(this.channel, responseClazz, pbrpcMsg);
}
} catch (Exception e) {
LOG.error("Failed to transport to " + getInfo() + " due to " + e.getMessage(), e);
throw new PbrpcException(e);
}
}
/**
* 使用channel进行数据发送
*
* @param ch
* @param responseClazz
* @param pbrpcMsg
* @return
*/
protected <T extends GeneratedMessage> CallFuture<T> doAsyncTransport(Channel ch,
Class<T> responseClazz, PbrpcMsg pbrpcMsg) {
if (ch != null) {
int uuid = IdGenerator.genUUID();
pbrpcMsg.setLogId(uuid);
CallFuture<T> future = CallFuture.newInstance();
CallbackPool.put(uuid, this.readTimeout, this.isShortAliveConn, ch, responseClazz,
future);
ch.writeAndFlush(pbrpcMsg);
LOG.debug("Send message " + pbrpcMsg + " done");
return future;
} else {
LOG.error("Socket channel is not well established, so failed to transport on "
+ getInfo());
throw new PbrpcConnectionException(
"Socket channel is not well established,so failed to transport on " + getInfo());
}
}
@Override
public byte[] encode(final GeneratedMessage msg) {
final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder();
if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg);
} else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) {
message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg);
} else if (msg instanceof ReefServiceProtos.JobStatusProto) {
message.setJobStatus((ReefServiceProtos.JobStatusProto) msg);
} else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorControlProto) {
message.setEvaluatorControl((EvaluatorRuntimeProtocol.EvaluatorControlProto) msg);
} else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) {
message.setEvaluatorHeartBeat((EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) msg);
} else if (msg instanceof EvaluatorShimProtocol.EvaluatorShimControlProto) {
message.setEvaluatorShimCommand((EvaluatorShimProtocol.EvaluatorShimControlProto) msg);
} else if (msg instanceof EvaluatorShimProtocol.EvaluatorShimStatusProto) {
message.setEvaluatorShimStatus((EvaluatorShimProtocol.EvaluatorShimStatusProto) msg);
} else {
throw new RuntimeException("Unable to serialize: " + msg);
}
return message.build().toByteArray();
}
/**
* 异步调用
*
* @param responseClazz
* @param pbrpcMsg
* @param readTimeout
* 客户端调用超时时间
* @return
* @throws Exception
*/
public <T extends GeneratedMessage> CallFuture<T> asyncTransport(Class<T> responseClazz,
PbrpcMsg pbrpcMsg, int readTimeout) throws Exception {
if (channelFuture != null) {
try {
int uuid = IdGenerator.genUUID();
pbrpcMsg.setLogId(uuid);
CallFuture<T> future = CallFuture.newInstance();
CallbackPool.put(uuid, readTimeout, false, null, responseClazz, future);
// long start = System.currentTimeMillis();
channelFuture.channel().writeAndFlush(pbrpcMsg);
// LOG.info("Send message " + pbrpcMsg + " done using " + (System.currentTimeMillis() - start) + "ms");
return future;
} catch (Exception e) {
LOG.error(
"Failed to transport to " + channelFuture.channel() + " due to "
+ e.getMessage(), e);
throw new PbrpcException(e);
}
} else {
LOG.error("Socket channel is not well established, so failed to transport");
throw new PbrpcException(
"ChannelFuture is null! Socket channel is not well established, so failed to transport");
}
}
/**
* @see com.baidu.beidou.navi.pbrpc.client.PbrpcClient#syncTransport(java.lang.Class,
* com.baidu.beidou.navi.pbrpc.transport.PbrpcMsg)
*/
@Override
public <T extends GeneratedMessage> T syncTransport(Class<T> responseClazz, PbrpcMsg pbrpcMsg) {
BlockingIOPbrpcClient client = socketPool.getResource();
try {
T res = client.syncTransport(responseClazz, pbrpcMsg);
return res;
} catch (Exception e) {
LOG.error("asyncTransport failed, " + e.getMessage(), e);
socketPool.returnBrokenResource(client);
throw new PbrpcException("Pbrpc invocation failed on " + getInfo() + ", "
+ e.getMessage(), e);
} finally {
socketPool.returnResource(client);
}
}
/** Register a new {@link StorageFormat}.
* @param protobufExtension the extension field that keys use of this format
* @param sqlIdentifier the <code>STORAGE_FORMAT</code> identifier that keys use of this format or <code>null</code>
* @param descriptionClass that specific class used to hold this format
* @param storageFormat the mapping handler
*/
public <T extends StorageDescription> void registerStorageFormat(GeneratedMessage.GeneratedExtension<Storage,?> protobufExtension, String sqlIdentifier, Class<T> descriptionClass, StorageFormat<T> storageFormat) {
int fieldNumber = protobufExtension.getDescriptor().getNumber();
if (formatsByField.containsKey(fieldNumber))
throw new IllegalArgumentException("there is already a StorageFormat registered for field " + fieldNumber);
if ((sqlIdentifier != null) &&
formatsByIdentifier.containsKey(sqlIdentifier))
throw new IllegalArgumentException("there is already a StorageFormat registered for STORAGE_FORMAT " + sqlIdentifier);
if (!isDescriptionClassAllowed(descriptionClass)) {
throw new IllegalArgumentException("description " + descriptionClass + " not allowed for " + getClass().getSimpleName());
}
extensionRegistry.add(protobufExtension);
Format<T> format = new Format<T>(protobufExtension, sqlIdentifier, descriptionClass, storageFormat);
formatsInOrder.add(format);
formatsByField.put(fieldNumber, format);
if (sqlIdentifier != null) {
formatsByIdentifier.put(sqlIdentifier, format);
}
}
@Override
protected void setUp() throws Exception {
super.setUp();
gson =
new GsonBuilder()
.registerTypeHierarchyAdapter(GeneratedMessage.class,
ProtoTypeAdapter.newBuilder()
.setEnumSerialization(EnumSerialization.NUMBER)
.build())
.create();
upperCamelGson =
new GsonBuilder()
.registerTypeHierarchyAdapter(
GeneratedMessage.class, ProtoTypeAdapter.newBuilder()
.setFieldNameSerializationFormat(
CaseFormat.LOWER_UNDERSCORE, CaseFormat.UPPER_CAMEL)
.build())
.create();
}
/**
* Returns available reports from the report queue with a max limit on
* list size, or empty list if the queue is empty.
*
* @return List of reports
*/
public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
int maxLimit) {
List<GeneratedMessage> reportsToReturn = new LinkedList<>();
synchronized (reports) {
List<GeneratedMessage> reportsForEndpoint = reports.get(endpoint);
if (reportsForEndpoint != null) {
List<GeneratedMessage> tempList = reportsForEndpoint.subList(
0, min(reportsForEndpoint.size(), maxLimit));
reportsToReturn.addAll(tempList);
tempList.clear();
}
}
return reportsToReturn;
}
@Test
public void testReportAPIs() {
OzoneConfiguration conf = new OzoneConfiguration();
DatanodeStateMachine datanodeStateMachineMock =
mock(DatanodeStateMachine.class);
StateContext stateContext = new StateContext(conf,
DatanodeStates.getInitState(), datanodeStateMachineMock);
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
// Try to add report with endpoint. Should not be stored.
stateContext.addReport(mock(GeneratedMessage.class));
assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
// Add 2 scm endpoints.
stateContext.addEndpoint(scm1);
stateContext.addEndpoint(scm2);
// Add report. Should be added to all endpoints.
stateContext.addReport(mock(GeneratedMessage.class));
List<GeneratedMessage> allAvailableReports =
stateContext.getAllAvailableReports(scm1);
assertEquals(1, allAvailableReports.size());
assertEquals(1, stateContext.getAllAvailableReports(scm2).size());
// Assert the reports are no longer available.
assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
// Put back reports.
stateContext.putBackReports(allAvailableReports, scm1);
assertFalse(stateContext.getAllAvailableReports(scm1).isEmpty());
}
private ProtobufDatumFactory(String protobufClass) {
try {
String messageClassName = protobufClass;
this.messageClass = (Class<? extends GeneratedMessage>) Class.forName(messageClassName);
this.getDefaultInstance = messageClass.getMethod("getDefaultInstance");
defaultInstance = (Message) getDefaultInstance.invoke(null);
} catch (Throwable t) {
t.printStackTrace();
throw new RuntimeException(t);
}
}
public List<String> getGeneratedProtoClasses(String serviceName) {
FastClasspathScanner cpScanner = new FastClasspathScanner();
ScanResult scanResult = cpScanner.scan();
List<String> oldProtobuf = scanResult.getNamesOfSubclassesOf(GeneratedMessage.class);
List<String> newProtobuf = scanResult.getNamesOfSubclassesOf(GeneratedMessageV3.class);
List<String> retval = Stream.concat(oldProtobuf.stream(),
newProtobuf.stream()).collect(Collectors.toList());
String[] packageTokens = serviceName.split("\\.");
return retval.stream().filter(s -> protoFilePackageMatches(s, packageTokens)).collect(Collectors.toList());
}
public static Class<? extends GeneratedMessage> getMessageClassForType(int type) {
Class<? extends GeneratedMessage> messageClass = MessageConstants.MESSAGE_TYPE_TO_CLASS.get(type);
if (messageClass == null) {
// check if there's a mapping that we don't explicitly handle
ServerMessages.Type serverMessageMapping = ServerMessages.Type.valueOf(type);
throw AssertionFailedException.shouldNotHappen("Unknown message type: " + type + " (server messages mapping: " + serverMessageMapping + ")");
}
return messageClass;
}
/**
* Dispatch a message to a listener or "peek-er" once it has been read and parsed.
*
* @param messageClass
* class extending {@link GeneratedMessage}
* @param message
* {@link GeneratedMessage}
*/
private void dispatchMessage(XMessageHeader hdr, GeneratedMessage message) {
if (message.getClass() == Frame.class && ((Frame) message).getScope() == Frame.Scope.GLOBAL) {
// we don't yet have any global notifications defined.
throw new RuntimeException("TODO: implement me");
}
// if there's no message listener waiting, expose the message class as pending for the next read
if (getMessageListener(false) == null) {
synchronized (this.pendingMsgMonitor) {
this.pendingMsgHeader = CompletableFuture.completedFuture(hdr);
this.pendingMsgMonitor.notify();
}
}
getMessageListener(true);
// we must ensure that the message has been delivered and the pending message is cleared atomically under the pending message lock. otherwise the
// pending message may still be seen after the message has been delivered but before the pending message is cleared
//
// t1-nio-thread | t2-user-thread
// ------------------------------------------------------+------------------------------------------------------
// pendingMsgClass exposed - no current listener |
// | listener added
// getMessageListener(true) returns |
// dispatchMessage(), in currentMessageListener.apply() |
// | getNextMessageClass(), pendingMsgClass != null
// | pendingMsgClass returned, but already being delivered
// | in other thread
// pendingMsgClass = null |
//
synchronized (this.pendingMsgMonitor) {
// we repeatedly deliver messages to the current listener until he yields control and we move on to the next
boolean currentListenerDone = this.currentMessageListener.createFromMessage(new XMessage(message));
if (currentListenerDone) {
this.currentMessageListener = null;
}
// clear this after the message is delivered
this.pendingMsgHeader = null;
}
}
@SuppressWarnings("unchecked")
private <T extends GeneratedMessage> T parseNotice(ByteString payload, Class<T> noticeClass) {
try {
Parser<T> parser = (Parser<T>) MessageConstants.MESSAGE_CLASS_TO_PARSER.get(noticeClass);
return parser.parseFrom(payload);
} catch (InvalidProtocolBufferException ex) {
throw new CJCommunicationsException(ex);
}
}
@OnMessage(GeneratedMessage.class)
public void onMessage(Context ctx, GeneratedMessage message) {
if (message instanceof S1NetMessages.CSVCMsg_VoiceData || message instanceof S2NetMessages.CSVCMsg_VoiceData) {
return;
}
log.info("{}: {}", ctx.getTick(), message.getClass().getSimpleName());
}
@Nullable
public static <
MessageType extends GeneratedMessage.ExtendableMessage<MessageType>,
Type extends GeneratedMessage>
Type getExtensionObject(
GeneratedMessage.ExtendableMessageOrBuilder<MessageType> mob,
ExtensionLite<MessageType, Type> extension) {
if (mob.hasExtension(extension)) {
return mob.getExtension(extension);
}
return null;
}
@Nullable
public static <
MessageType extends GeneratedMessage.ExtendableMessage<MessageType>,
Type extends GeneratedMessage>
List<Type> getRepeatedExtensionObjects(
GeneratedMessage.ExtendableMessageOrBuilder<MessageType> mob,
ExtensionLite<MessageType, List<Type>> extension) {
ImmutableList.Builder extensionList = ImmutableList.builder();
int extensionCount = mob.getExtensionCount(extension);
for (int extensionIndex = 0; extensionIndex < extensionCount; ++extensionIndex) {
extensionList.add(mob.getExtension(extension, extensionIndex));
}
return extensionList.build();
}
@Initializer(OnMessageContainer.class)
public void initOnMessageContainerListener(final EventListener<OnMessageContainer> listener) {
listener.setInvocationPredicate(args -> {
Class<? extends GeneratedMessage> clazz = (Class<? extends GeneratedMessage>) args[0];
return listener.getAnnotation().value().isAssignableFrom(clazz);
});
}
/**
* Given a GeneratedMessage object, get a Map of the state. Why? More Clojure-friendly
*
* @param status the GeneratedMessage
* @return a key/value set of the fields
*/
public static Map<String, Object> statusToMap(GeneratedMessage status) {
return status.getAllFields().entrySet().
stream().
collect(Collectors.toMap(e -> e.getKey().getName(),
Map.Entry::getValue));
}
public static byte[] serializeProtobuf(final GeneratedMessage msg) {
final byte[] byteArray = new byte[msg.getSerializedSize()];
final CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(byteArray);
try {
msg.writeTo(codedOutputStream);
} catch (final IOException e) {
Log.e(TAG, "failed to serialize message: " + e.toString());
e.printStackTrace();
return null;
}
return byteArray;
}
/**
* 代理运行时拦截的方法
*
* @param proxy 代理接口
* @param method 代理方法
* @param args 参数列表
*
* @return 代理实例对象
*
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Proxy bean starts to invoke pbrpc on %s %s.%s(%s)",
method.getReturnType().getSimpleName(),
method.getDeclaringClass().getSimpleName(),
method.getName(),
Arrays.toString(args)));
}
// check arguments and returnType
if (args == null || args.length != 1) {
throw new PbrpcException("Pbrpc only support one argument");
}
if (!GeneratedMessage.class.isAssignableFrom(method.getParameterTypes()[0])) {
throw new PbrpcException("Pbrpc method argument should be an instance of GeneratedMessage");
}
if (!GeneratedMessage.class.isAssignableFrom(method.getReturnType())) {
throw new PbrpcException("Pbrpc method returnType should be an instance of GeneratedMessage");
}
Class<? extends GeneratedMessage> responseClass =
(Class<? extends GeneratedMessage>) method.getReturnType();
PbrpcMsg msg = new PbrpcMsg();
msg.setServiceId(getMethodId(method));
msg.setProvider(provider);
msg.setData(((GeneratedMessage) args[0]).toByteArray());
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("provider=%s, methodId=%d", msg.getProvider(), msg.getServiceId()));
}
GeneratedMessage response = pbrpcClient.syncTransport(responseClass, msg);
return response;
}
/**
* @see com.baidu.beidou.navi.pbrpc.client.ha.LoadBalanceStrategy#doSyncTransport(java.util.List, java.lang.Class,
* com.baidu.beidou.navi.pbrpc.transport.PbrpcMsg)
*/
@SuppressWarnings("unchecked")
@Override
public <T extends GeneratedMessage> T doSyncTransport(List<PbrpcClient> clientList,
Class<T> responseClazz, PbrpcMsg pbrpcMsg) {
return (T) transport(clientList, responseClazz, pbrpcMsg, false);
}