下面列出了怎么用com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter的API类实例代码及写法,或者点击链接到github查看源代码。
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExitFilterInvoke(@Advice.Argument(1) Invocation invocation,
@Advice.Return Result result,
@Nullable @Advice.Local("span") Span span,
@Nullable @Advice.Thrown Throwable t,
@Nullable @Advice.Local("transaction") Transaction transaction) {
AbstractSpan<?> actualSpan = span != null ? span : transaction;
if (actualSpan == null) {
return;
}
actualSpan.captureException(t)
.captureException(result.getException())
.deactivate();
if (!(RpcContext.getContext().getFuture() instanceof FutureAdapter)) {
actualSpan.end();
}
// else: end when ResponseCallback is called (see AlibabaResponseCallbackInstrumentation)
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
// 从rpc上下文中取出异步执行结果的future
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
future.setCallback(new ResponseCallback() {
@Override
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
///must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
// 触发异常回调=》
fireThrowCallback(invoker, invocation, result.getException());
} else {
// 触发返回值回调=》
fireReturnCallback(invoker, invocation, result.getValue());
}
}
@Override
public void caught(Throwable exception) {
// 异常方法回调=》
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
boolean ensureSpanFinishes(Future<Object> future, Invocation invocation, Invoker<?> invoker) {
boolean deferFinish = false;
if (future instanceof FutureAdapter) {
deferFinish = true;
ResponseFuture original = ((FutureAdapter<Object>) future).getFuture();
ResponseFuture wrapped = new AsyncResponseFutureDelegate(invocation, invoker, original);
// Ensures even if no callback added later, for example when a consumer, we finish the span
wrapped.setCallback(null);
RpcContext.getContext().setFuture(new FutureAdapter<>(wrapped));
}
return deferFinish;
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null){
logger.error(new IllegalStateException("invalid result value : null, expected "+Result.class.getName()));
return;
}
///must be rpcResult
if (! (rpcResult instanceof Result)){
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected "+Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null){
logger.error(new IllegalStateException("invalid result value : null, expected "+Result.class.getName()));
return;
}
///must be rpcResult
if (! (rpcResult instanceof Result)){
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected "+Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null){
logger.error(new IllegalStateException("invalid result value : null, expected "+Result.class.getName()));
return;
}
///must be rpcResult
if (! (rpcResult instanceof Result)){
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected "+Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null){
logger.error(new IllegalStateException("invalid result value : null, expected "+Result.class.getName()));
return;
}
///must be rpcResult
if (! (rpcResult instanceof Result)){
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected "+Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null){
logger.error(new IllegalStateException("invalid result value : null, expected "+Result.class.getName()));
return;
}
///must be rpcResult
if (! (rpcResult instanceof Result)){
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected "+Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
@Advice.OnMethodEnter(suppress = Throwable.class)
private static void onEnter(@Advice.Argument(0) Future<?> future) {
if (future instanceof FutureAdapter) {
ElasticApmAgent.ensureInstrumented(((FutureAdapter<?>) future).getFuture().getClass(), RESPONSE_FUTURE_INSTRUMENTATION);
}
}
/**
* rpc client handler
* @param rpcContext
* @param invoker
* @param invocation
* @return
*/
private Result doClientFilter(RpcContext rpcContext, Invoker<?> invoker, Invocation invocation) {
// to build tracer instance
if (dubboConsumerSofaTracer == null) {
this.dubboConsumerSofaTracer = DubboConsumerSofaTracer
.getDubboConsumerSofaTracerSingleton();
}
// get methodName
String methodName = rpcContext.getMethodName();
// get service interface
String service = invoker.getInterface().getSimpleName();
// build a dubbo rpc span
SofaTracerSpan sofaTracerSpan = dubboConsumerSofaTracer.clientSend(service + "#"
+ methodName);
// set tags to span
appendRpcClientSpanTags(invoker, sofaTracerSpan);
// do serialized and then transparent transmission to the rpc server
String serializedSpanContext = sofaTracerSpan.getSofaTracerSpanContext()
.serializeSpanContext();
//put into attachments
invocation.getAttachments().put(CommonSpanTags.RPC_TRACE_NAME, serializedSpanContext);
boolean isOneWay = false, deferFinish = false;
// check invoke type
boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// set invoke type tag
if (isAsync) {
sofaTracerSpan.setTag(CommonSpanTags.INVOKE_TYPE, "future");
} else {
isOneWay = RpcUtils.isOneway(invoker.getUrl(), invocation);
if (isOneWay) {
sofaTracerSpan.setTag(CommonSpanTags.INVOKE_TYPE, "oneway");
} else {
sofaTracerSpan.setTag(CommonSpanTags.INVOKE_TYPE, "sync");
}
}
Result result;
Throwable exception = null;
String resultCode = SofaTracerConstant.RESULT_CODE_SUCCESS;
try {
// do invoke
result = invoker.invoke(invocation);
if (result.hasException()) {
exception = result.getException();
}
// the case on async client invocation
Future<Object> future = rpcContext.getFuture();
if (future instanceof FutureAdapter) {
deferFinish = ensureSpanFinishes(future, invocation, invoker);
}
return result;
} catch (RpcException e) {
exception = e;
throw e;
} catch (Throwable t) {
exception = t;
throw new RpcException(t);
} finally {
if (exception != null) {
// finish span on exception, delay to clear tl in handleError
handleError(exception, null);
} else {
// sync invoke
if (isOneWay || !deferFinish) {
dubboConsumerSofaTracer.clientReceive(resultCode);
} else {
// to clean SofaTraceContext
SofaTraceContext sofaTraceContext = SofaTraceContextHolder
.getSofaTraceContext();
SofaTracerSpan clientSpan = sofaTraceContext.pop();
if (clientSpan != null) {
// Record client send event
sofaTracerSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}
// cache the current span
TracerSpanMap.put(getTracerSpanMapKey(invoker), sofaTracerSpan);
if (clientSpan != null && clientSpan.getParentSofaTracerSpan() != null) {
//restore parent
sofaTraceContext.push(clientSpan.getParentSofaTracerSpan());
}
}
}
}
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if(!RpcTraceContext.getTraceConfig().isEnabled()){
return invoker.invoke(invocation);
}
ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
if(null==RpcTraceContext.getTraceId()){
RpcTraceContext.start();
RpcTraceContext.setTraceId(IdUtils.get());
RpcTraceContext.setParentId(null);
RpcTraceContext.setSpanId(IdUtils.get());
}
else {
RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
RpcTraceContext.setSpanId(IdUtils.get());
}
TraceContext traceContext= TraceContext.newBuilder()
.traceId(RpcTraceContext.getTraceId())
.parentId(RpcTraceContext.getParentId())
.spanId(RpcTraceContext.getSpanId())
.sampled(true)
.build();
Span span=tracer.toSpan(traceContext).start();
invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));
logger.info("consumer:traceId={},parentId={},spanId={}",
span.context().traceId(),
span.context().parentId(),
span.context().spanId());
RpcContext rpcContext = RpcContext.getContext();
boolean isAsync=false;
Future<Object> future = rpcContext.getFuture();
if (future instanceof FutureAdapter) {
isAsync = true;
((FutureAdapter) future).getFuture().setCallback(new AsyncSpanCallback(span));
}
Result result = null;
boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
result = invoker.invoke(invocation);
}
finally {
if(isOneway) {
span.flush();
}
else if(!isAsync) {
span.finish();
}
}
return result;
}