下面列出了怎么用com.google.protobuf.GeneratedMessageV3的API类实例代码及写法,或者点击链接到github查看源代码。
public void handleMessage(int responseId, GeneratedMessageV3 message) {
if (!state.checkState(SocketStateCode.RUN_DUPLEX)) {
logger.warn("failed to handle message. caused:illegal State");
return;
}
if (isInfo) {
logger.info("{} handleMessage:{}", agentInfo, MessageFormatUtils.debugLog(message));
}
TBase tMessage = messageConverter.toMessage(message);
try {
byte[] serialize = SerializationUtils.serialize(tMessage, commandHeaderTBaseSerializerFactory);
ResponsePacket responsePacket = new ResponsePacket(responseId, serialize);
requestManager.messageReceived(responsePacket, agentInfo.toString());
} catch (TException e) {
setFailMessageToFuture(responseId, e.getMessage());
}
}
@Override
@Nullable
public Object decodeResult(ByteBuf data, @Nullable Class<?> targetClass) throws EncodingException {
if (data.readableBytes() > 0 && targetClass != null) {
try {
if (GeneratedMessageV3.class.isAssignableFrom(targetClass)) {
Method method = parseFromMethodStore.get(targetClass);
if (method != null) {
return method.invoke(null, data.nioBuffer());
}
} else if (ktProtoBuf && KotlinSerializerSupport.isKotlinSerializable(targetClass)) {
byte[] bytes = new byte[data.readableBytes()];
data.readBytes(bytes);
return KotlinSerializerSupport.decodeFromProtobuf(bytes, targetClass);
} else {
Schema schema = RuntimeSchema.getSchema(targetClass);
Object object = schema.newMessage();
ProtostuffIOUtil.mergeFrom(new ByteBufInputStream(data), object, schema);
return object;
}
} catch (Exception e) {
throw new EncodingException(RsocketErrorCode.message("RST-700501", "bytebuf", targetClass.getName()), e);
}
}
return null;
}
public void request() throws Exception {
MessageConverter<GeneratedMessageV3> messageConverter = new GrpcMetadataMessageConverter();
HeaderFactory headerFactory = new AgentHeaderFactory(AGENT_ID, APPLICATION_NAME, START_TIME);
DnsExecutorServiceProvider dnsExecutorServiceProvider = new DnsExecutorServiceProvider();
GrpcNameResolverProvider grpcNameResolverProvider = new GrpcNameResolverProvider(dnsExecutorServiceProvider);
NameResolverProvider nameResolverProvider = grpcNameResolverProvider.get();
ChannelFactoryBuilder channelFactoryBuilder = new DefaultChannelFactoryBuilder("TestAgentGrpcDataSender");
channelFactoryBuilder.setHeaderFactory(headerFactory);
channelFactoryBuilder.setNameResolverProvider(nameResolverProvider);
channelFactoryBuilder.setClientOption(new ClientOption.Builder().build());
ChannelFactory channelFactory = channelFactoryBuilder.build();
AgentGrpcDataSender sender = new AgentGrpcDataSender("localhost", 9997, 1, messageConverter,
reconnectExecutor, scheduledExecutorService, channelFactory, null);
AgentInfo agentInfo = newAgentInfo();
sender.request(agentInfo);
TimeUnit.SECONDS.sleep(60);
sender.stop();
}
void callListeners(UpdateSeqUpdate upd) {
Object updateRaw = null;
try {
Field f = upd.getClass().getDeclaredField("update_");
f.setAccessible(true);
updateRaw = f.get(upd);
} catch (Exception e) {
log.error("Failed to get sequence update field", e);
}
if (!(updateRaw instanceof GeneratedMessageV3)) {
return;
}
GeneratedMessageV3 up = (GeneratedMessageV3) updateRaw;
for (Map.Entry<Class, List<UpdateListener>> entry : subscribers.entrySet()) {
if (updateRaw.getClass().isAssignableFrom(entry.getKey())) {
entry.getValue().forEach(listener -> listener.onUpdate(up));
}
}
}
public GrpcDataSender(String host, int port,
int executorQueueSize,
MessageConverter<GeneratedMessageV3> messageConverter,
ChannelFactory channelFactory) {
this.channelFactory = Assert.requireNonNull(channelFactory, "channelFactory");
this.name = Assert.requireNonNull(channelFactory.getFactoryName(), "channelFactory.name");
this.host = Assert.requireNonNull(host, "host");
this.port = port;
this.messageConverter = Assert.requireNonNull(messageConverter, "messageConverter");
this.executor = newExecutorService(name + "-Executor", executorQueueSize);
this.managedChannel = channelFactory.build(host, port);
}
public StatGrpcDataSender(String host, int port,
int senderExecutorQueueSize,
MessageConverter<GeneratedMessageV3> messageConverter,
ReconnectExecutor reconnectExecutor,
ChannelFactory channelFactory) {
super(host, port, senderExecutorQueueSize, messageConverter, channelFactory);
this.statStub = StatGrpc.newStub(managedChannel);
this.reconnectExecutor = Assert.requireNonNull(reconnectExecutor, "reconnectExecutor");
{
final Runnable statStreamReconnectJob = new Runnable() {
@Override
public void run() {
statStream = newStatStream();
}
};
this.statStreamReconnector = reconnectExecutor.newReconnector(statStreamReconnectJob);
statStreamReconnectJob.run();
}
}
public DiozeroProtos.Response sendMessage(String topic, String correlationId, GeneratedMessageV3 message)
throws MqttException {
Condition condition = lock.newCondition();
conditions.put(correlationId, condition);
lock.lock();
try {
mqttClient.publish(topic, message.toByteArray(), MqttProviderConstants.DEFAULT_QOS,
MqttProviderConstants.DEFAULT_RETAINED);
Logger.info("Waiting for response...");
condition.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Logger.error(e, "Interrupted: {}", e);
} finally {
lock.unlock();
}
DiozeroProtos.Response response = responses.remove(correlationId);
if (response == null) {
throw new RuntimeIOException("Cannot find response message for " + correlationId);
}
return response;
}
@Override
public boolean request(final Object data) {
final Runnable convertAndRun = new Runnable() {
@Override
public void run() {
try {
// Convert message
final GeneratedMessageV3 message = messageConverter.toMessage(data);
if (isDebug) {
logger.debug("Request metadata={}", debugLog(message));
}
request0(message, maxAttempts);
} catch (Exception ex) {
logger.info("Failed to request metadata={}", data, ex);
}
}
};
try {
executor.execute(convertAndRun);
} catch (RejectedExecutionException reject) {
logger.info("Rejected metadata={}", data);
return false;
}
return true;
}
public SpanGrpcDataSender(String host, int port,
int executorQueueSize,
MessageConverter<GeneratedMessageV3> messageConverter,
ReconnectExecutor reconnectExecutor,
ChannelFactory channelFactory) {
super(host, port, executorQueueSize, messageConverter, channelFactory);
this.spanStub = newSpanStub();
this.reconnectExecutor = Assert.requireNonNull(reconnectExecutor, "reconnectExecutor");
{
final Runnable spanStreamReconnectJob = new Runnable() {
@Override
public void run() {
spanStream = newSpanStream();
}
};
this.spanStreamReconnector = reconnectExecutor.newReconnector(spanStreamReconnectJob);
spanStreamReconnectJob.run();
}
}
private void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) {
final TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.cancel()) {
return;
}
logger.info("Retry {} {}", remainingRetryCount, request);
request(request, remainingRetryCount - 1);
}
};
try {
retryTimer.newTimeout(timerTask, 1000, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.debug("retry fail {}", e.getCause(), e);
}
}
public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {
Class<T> pojo = schemaDefinition.getPojo();
if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+ " is not assignable from " + pojo.getName());
}
SchemaInfo schemaInfo = SchemaInfo.builder()
.schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8))
.type(SchemaType.PROTOBUF)
.name("")
.properties(schemaDefinition.getProperties())
.build();
try {
return new ProtobufSchema(schemaInfo,
(GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null));
}catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}
public MetadataClientMock(final String host, final int port, final boolean agentHeader) {
this.retryTimer = newTimer(this.getClass().getName());
this.channelFactory = newChannelFactory();
this.channel = channelFactory.build(host, port);
this.metadataStub = MetadataGrpc.newStub(channel);
this.retryScheduler = new RetryScheduler<GeneratedMessageV3, PResult>() {
@Override
public boolean isSuccess(PResult response) {
return response.getSuccess();
}
@Override
public void scheduleNextRetry(GeneratedMessageV3 request, int remainingRetryCount) {
MetadataClientMock.this.scheduleNextRetry(request, remainingRetryCount);
}
};
}
@Inject
public AgentGrpcDataSenderProvider(GrpcTransportConfig grpcTransportConfig,
@MetadataConverter MessageConverter<GeneratedMessageV3> messageConverter,
HeaderFactory headerFactory,
Provider<ReconnectExecutor> reconnectExecutor,
ScheduledExecutorService retransmissionExecutor,
NameResolverProvider nameResolverProvider,
ActiveTraceRepository activeTraceRepository) {
this.grpcTransportConfig = Assert.requireNonNull(grpcTransportConfig, "grpcTransportConfig");
this.messageConverter = Assert.requireNonNull(messageConverter, "messageConverter");
this.headerFactory = Assert.requireNonNull(headerFactory, "headerFactory");
this.reconnectExecutorProvider = Assert.requireNonNull(reconnectExecutor, "reconnectExecutorProvider");
this.retransmissionExecutor = Assert.requireNonNull(retransmissionExecutor, "retransmissionExecutor");
this.nameResolverProvider = Assert.requireNonNull(nameResolverProvider, "nameResolverProvider");
this.activeTraceRepository = Assert.requireNonNull(activeTraceRepository, "activeTraceRepository");
}
public ClientStreamChannel openStream(GeneratedMessageV3 message, ClientStreamChannelEventHandler streamChannelEventHandler) throws StreamException {
if (!state.checkState(SocketStateCode.RUN_DUPLEX)) {
throw new StreamException(StreamCode.STATE_NOT_CONNECTED);
}
PCmdRequest request = createRequest(message);
if (request == null) {
throw new StreamException(StreamCode.TYPE_UNSUPPORT);
}
GrpcClientStreamChannel grpcClientStreamChannel = new GrpcClientStreamChannel(remoteAddress, request.getRequestId(), streamChannelRepository, streamChannelEventHandler);
try {
grpcClientStreamChannel.init();
grpcClientStreamChannel.connect(new Runnable() {
@Override
public void run() {
requestObserver.onNext(request);
}
}, 3000);
} catch (StreamException e) {
grpcClientStreamChannel.close(e.getStreamCode());
throw e;
}
return grpcClientStreamChannel;
}
@Override
public XMessage readMessage(Optional<XMessage> reuse, int expectedType) throws IOException {
// waiting for ListenersDispatcher completion to perform sync call
synchronized (this.waitingSyncOperationMonitor) {
try {
Class<? extends GeneratedMessageV3> messageClass = MessageConstants.getMessageClassForType(readHeader().getMessageType());
Class<? extends GeneratedMessageV3> expectedClass = MessageConstants.getMessageClassForType(expectedType);
// ensure that parsed message class matches incoming tag
if (expectedClass != messageClass) {
throw new WrongArgumentException("Unexpected message class. Expected '" + expectedClass.getSimpleName() + "' but actually received '"
+ messageClass.getSimpleName() + "'");
}
return new XMessage(readMessageLocal(messageClass));
} catch (IOException e) {
throw new XProtocolError(e.getMessage(), e);
}
}
}
public Boolean createFromMessage(XMessage message) {
//GeneratedMessage msg = (GeneratedMessage) message.getMessage();
@SuppressWarnings("unchecked")
Class<? extends GeneratedMessageV3> msgClass = (Class<? extends GeneratedMessageV3>) message.getMessage().getClass();
if (Frame.class.equals(msgClass)) {
this.builder.addNotice(this.noticeFactory.createFromMessage(message));
return false; /* done reading? */
} else if (StmtExecuteOk.class.equals(msgClass)) {
this.future.complete(this.builder.build());
return true; /* done reading? */
} else if (Error.class.equals(msgClass)) {
this.future.completeExceptionally(new XProtocolError(Error.class.cast(message.getMessage())));
return true; /* done reading? */
} else if (FetchDone.class.equals(msgClass)) {
return false; /* done reading? */
}
this.future.completeExceptionally(new WrongArgumentException("Unhandled msg class (" + msgClass + ") + msg=" + message.getMessage()));
return true; /* done reading? */
}
private PCmdRequest createRequest(GeneratedMessageV3 message) {
PCmdRequest.Builder requestBuilder = PCmdRequest.newBuilder();
final int requestId = requestManager.nextRequestId();
requestBuilder.setRequestId(requestId);
if (message instanceof PCmdEcho) {
requestBuilder.setCommandEcho((PCmdEcho) message);
return requestBuilder.build();
} else if (message instanceof PCmdActiveThreadCount) {
requestBuilder.setCommandActiveThreadCount((PCmdActiveThreadCount) message);
return requestBuilder.build();
} else if (message instanceof PCmdActiveThreadDump) {
requestBuilder.setCommandActiveThreadDump((PCmdActiveThreadDump) message);
return requestBuilder.build();
} else if (message instanceof PCmdActiveThreadLightDump) {
requestBuilder.setCommandActiveThreadLightDump((PCmdActiveThreadLightDump) message);
return requestBuilder.build();
} else {
return null;
}
}
private ChannelFutureAggregator(String destination, GeneratedMessageV3 request, CanalPacket.PacketType type, int amount, long latency, boolean empty, short errorCode) {
this.result = new ClientRequestResult.Builder()
.destination(destination)
.type(type)
.request(request)
.amount(amount + HEADER_LENGTH)
.latency(latency)
.errorCode(errorCode)
.empty(empty)
.build();
}
private Object _parseMsg(int cmd, ByteBuf buf){
try {
GeneratedMessageV3 m = s_mapMsgParser.get(cmd);
if(m != null){
byte[] arrBuf = new byte[buf.readableBytes()];
buf.readBytes(arrBuf);
return m.getParserForType().parseFrom(arrBuf);
}
} catch (Throwable e) {
e.printStackTrace();
}
return null;
}
@Override
public GeneratedMessageV3 toMessage(Object message) {
if (message instanceof AgentInfo) {
final AgentInfo agentInfo = (AgentInfo) message;
return convertAgentInfo(agentInfo);
}
return null;
}
public List<String> getGeneratedProtoClasses(String serviceName) {
FastClasspathScanner cpScanner = new FastClasspathScanner();
ScanResult scanResult = cpScanner.scan();
List<String> oldProtobuf = scanResult.getNamesOfSubclassesOf(GeneratedMessage.class);
List<String> newProtobuf = scanResult.getNamesOfSubclassesOf(GeneratedMessageV3.class);
List<String> retval = Stream.concat(oldProtobuf.stream(),
newProtobuf.stream()).collect(Collectors.toList());
String[] packageTokens = serviceName.split("\\.");
return retval.stream().filter(s -> protoFilePackageMatches(s, packageTokens)).collect(Collectors.toList());
}
private void send(final Message<? extends GeneratedMessageV3> message, StreamObserver<Empty> responseObserver) {
try {
ServerRequest<? extends GeneratedMessageV3> request = serverRequestFactory.newServerRequest(message);
this.dispatchHandler.dispatchSendMessage(request);
} catch (Exception e) {
logger.warn("Failed to request. message={}", message, e);
if (e instanceof StatusException || e instanceof StatusRuntimeException) {
responseObserver.onError(e);
} else {
// Avoid detailed exception
responseObserver.onError(Status.INTERNAL.withDescription("Bad Request").asException());
}
}
}
private void scheduleNextRetry(final GeneratedMessageV3 message, final int remainingRetryCount) {
if (shutdown) {
if (isDebug) {
logger.debug("Request drop. Already shutdown request={}", MessageFormatUtils.debugLog(message));
}
return;
}
if (remainingRetryCount <= 0) {
if (isDebug) {
logger.debug("Request drop. remainingRetryCount={}, request={}", MessageFormatUtils.debugLog(message), remainingRetryCount);
}
return;
}
if (isDebug) {
logger.debug("Request retry. request={}, remainingRetryCount={}", MessageFormatUtils.debugLog(message), remainingRetryCount);
}
final TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.cancel()) {
return;
}
if (shutdown) {
return;
}
request0(message, remainingRetryCount);
}
};
try {
retryTimer.newTimeout(timerTask, retryDelayMillis, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.debug("retry fail {}", e.getCause(), e);
}
}
@Override
public boolean request(Object data, final FutureListener listener) {
final GeneratedMessageV3 message = this.messageConverter.toMessage(data);
if (!(message instanceof PAgentInfo)) {
throw new IllegalArgumentException("unsupported message " + data);
}
final PAgentInfo pAgentInfo = (PAgentInfo) message;
this.agentInfoStub.requestAgentInfo(pAgentInfo, new FutureListenerStreamObserver(listener));
return true;
}
/**
* Generic method to convert Protostuff to Protobuf. Uses LegacyProtobufSerializer because deserializing with the
* regular protobuf serializer does not handle repeated fields correctly.
* @param protobufParser Parser for protobuf object
* @param protostuff Protostuff object to convert
* @param <M> Type of Protobuf
* @param <T> Type of Protobuff
* @return Converted object as Protobuf
*/
private static <M extends GeneratedMessageV3, T extends Message<T> & Schema<T>>
M toBuf(Parser<M> protobufParser, T protostuff) {
try {
LinkedBuffer buffer = LinkedBuffer.allocate();
byte[] bytes = ProtobufIOUtil.toByteArray(protostuff, protostuff.cachedSchema(), buffer);
// LegacyProtobufSerializer is necessary as it deals with stuff/buf grouping differences
return LegacyProtobufSerializer.parseFrom(protobufParser, bytes);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException("Cannot convert from protostuff to protobuf");
}
}
@Override
protected void sendMessage(String url, GeneratedMessageV3 message) throws IOException {
DiozeroProtos.MessageWrapper message_wrapper = DiozeroProtos.MessageWrapper.newBuilder()
.setType(message.getClass().getSimpleName()).setMessage(ByteString.copyFrom(message.toByteArray()))
.build();
session.getRemote().sendBytes(ByteBuffer.wrap(message_wrapper.toByteArray()));
}
@Override
public MessageConverter<GeneratedMessageV3> get() {
MessageConverter<GeneratedMessageV3> metadataMessageConverter = new GrpcMetadataMessageConverter();
MessageConverter<GeneratedMessageV3> agentMessageConverter = new GrpcAgentInfoMessageConverter();
@SuppressWarnings("unchecked")
final MessageConverterGroup<GeneratedMessageV3> group = MessageConverterGroup.wrap(metadataMessageConverter, agentMessageConverter);
return group;
}
private ChannelFutureAggregator(String destination, GeneratedMessageV3 request, CanalPacket.PacketType type, int amount, long latency, boolean empty, short errorCode) {
this.result = new ClientRequestResult.Builder()
.destination(destination)
.type(type)
.request(request)
.amount(amount + HEADER_LENGTH)
.latency(latency)
.errorCode(errorCode)
.empty(empty)
.build();
}
@Override
public GeneratedMessageV3 toMessage(Object message) {
if (message instanceof SpanChunk) {
final SpanChunk spanChunk = (SpanChunk) message;
final PSpanChunk pSpanChunk = buildPSpanChunk(spanChunk);
return pSpanChunk;
}
if (message instanceof Span) {
final Span span = (Span) message;
return buildPSpan(span);
}
return null;
}
public static Class<? extends GeneratedMessageV3> getMessageClassForType(int type) {
Class<? extends GeneratedMessageV3> messageClass = MessageConstants.MESSAGE_TYPE_TO_CLASS.get(type);
if (messageClass == null) {
// check if there's a mapping that we don't explicitly handle
ServerMessages.Type serverMessageMapping = ServerMessages.Type.forNumber(type);
throw AssertionFailedException.shouldNotHappen("Unknown message type: " + type + " (server messages mapping: " + serverMessageMapping + ")");
}
return messageClass;
}