下面列出了 io.netty.handler.codec.http2.DelegatingDecompressorFrameListener #com.alipay.sofa.rpc.common.RpcConstants 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Replaces the provider lists of both address groups with the providers in {@code providerGroups}.
 * <p>
 * Providers of the group named {@link RpcConstants#ADDRESS_DIRECT_GROUP} are collected for the
 * direct-url group; providers of every other non-empty group are collected for the registry
 * group. Both lists are installed while {@code wLock} is held.
 *
 * @param providerGroups all provider groups to install
 */
@Override
public void updateAllProviders(List<ProviderGroup> providerGroups) {
    ConcurrentHashSet<ProviderInfo> directProviders = new ConcurrentHashSet<ProviderInfo>();
    ConcurrentHashSet<ProviderInfo> registryProviders = new ConcurrentHashSet<ProviderInfo>();
    for (ProviderGroup group : providerGroups) {
        if (ProviderHelper.isEmpty(group)) {
            // Nothing to collect from an empty group.
            continue;
        }
        boolean isDirectGroup = RpcConstants.ADDRESS_DIRECT_GROUP.equals(group.getName());
        ConcurrentHashSet<ProviderInfo> target = isDirectGroup ? directProviders : registryProviders;
        target.addAll(group.getProviderInfos());
    }

    wLock.lock();
    try {
        this.directUrlGroup.setProviderInfos(new ArrayList<ProviderInfo>(directProviders));
        this.registryGroup.setProviderInfos(new ArrayList<ProviderInfo>(registryProviders));
    } finally {
        wLock.unlock();
    }
}
/**
 * Demo entry point: refers a Triple-protocol consumer through a direct URL, performs a single
 * {@code getTripleData} call and prints the data of the response.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    ApplicationConfig clientApp = new ApplicationConfig().setAppName("triple-client");

    // No registry is configured: the consumer is pointed at the provider by direct URL
    // and registration is switched off.
    ConsumerConfig<SofaGreeterTriple.IGreeter> consumerConfig = new ConsumerConfig<SofaGreeterTriple.IGreeter>();
    consumerConfig.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName())
        .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
        .setApplication(clientApp)
        .setDirectUrl("tri://sofagw-rz00b-100083024149.eu95.alipay.net:80")
        .setRegister(false);

    SofaGreeterTriple.IGreeter greeterBlockingStub = consumerConfig.refer();
    LOGGER.info("Triple stub bean successful: {}", greeterBlockingStub.getClass().getName());

    MultiMtopTripleRequest request = MultiMtopTripleRequest.newBuilder().setRids("1").build();
    MultiMtopTripleResponse response = greeterBlockingStub.getTripleData(request);
    System.out.println(response.getData());
}
/**
 * Prepares the shared test server: enables unit-test mode, configures a Bolt server on port
 * 22222 and exports {@link HelloService} on it without registering the service.
 */
@BeforeClass
public static void startServer() {
    RpcRunningState.setUnitTestMode(true);

    // Core and max threads are both 5, the queue size is 100 and the stop timeout is 0.
    final int threadCount = 5;
    serverConfig = new ServerConfig()
        .setStopTimeout(0)
        .setPort(22222)
        .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
        .setQueues(100)
        .setCoreThreads(threadCount)
        .setMaxThreads(threadCount);

    ProviderConfig<HelloService> helloProvider = new ProviderConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName())
        .setRef(new HelloServiceImpl())
        .setServer(serverConfig)
        .setRegister(false);
    helloProvider.export();
}
/**
 * Client request filter: optionally records the request's Content-Length header as the request
 * size attachment, then hands the request to {@code RestTracerAdapter.beforeSend}.
 * Any exception raised while doing so is logged and not rethrown.
 *
 * @param requestContext the outgoing client request
 * @throws IOException declared by the filter contract
 */
@Override
public void filter(ClientRequestContext requestContext) throws IOException {
    try {
        boolean attachmentEnabled = RpcInternalContext.isAttachmentEnable();
        if (attachmentEnabled) {
            // Remember the length of the client request on the internal context.
            RpcInternalContext internalContext = RpcInternalContext.getContext();
            String contentLength = requestContext.getHeaderString(HttpHeaders.CONTENT_LENGTH);
            internalContext.setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, contentLength);
        }
        RestTracerAdapter.beforeSend(requestContext);
    } catch (Exception e) {
        logger.error(LogCodes.getLog(LogCodes.ERROR_TRACER_UNKNOWN_EXP, "filter", "rest", "client"), e);
    }
}
/**
 * Builds the URL string that describes a consumer.
 * <p>
 * The result has the form {@code protocol://localHost?version=1.0} followed by the key pairs
 * for unique id, pid, timeout, id, generic flag, interface, app name, serialization and start
 * time, then the consumer's own parameters and finally the common attributes.
 *
 * @param consumerConfig the ConsumerConfig
 * @return the url string
 */
public static String convertConsumerToUrl(ConsumerConfig consumerConfig) {
    StringBuilder url = new StringBuilder(200);
    String localHost = SystemInfo.getLocalHost();

    // Scheme, host and the fixed version parameter that starts the query string.
    url.append(consumerConfig.getProtocol());
    url.append("://");
    url.append(localHost);
    url.append("?version=1.0");

    // Consumer attributes, in the order they appear in the url.
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_UNIQUEID, consumerConfig.getUniqueId()));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_PID, RpcRuntimeContext.PID));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_TIMEOUT, consumerConfig.getTimeout()));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_ID, consumerConfig.getId()));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_GENERIC, consumerConfig.isGeneric()));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_INTERFACE, consumerConfig.getInterfaceId()));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_APP_NAME, consumerConfig.getAppName()));
    url.append(getKeyPairs(RpcConstants.CONFIG_KEY_SERIALIZATION, consumerConfig.getSerialization()));
    url.append(getKeyPairs(ProviderInfoAttrs.ATTR_START_TIME, RpcRuntimeContext.now()));

    //noinspection unchecked
    url.append(convertMap2Pair(consumerConfig.getParameters()));

    addCommonAttrs(url);
    return url.toString();
}
/**
 * Records client-side metrics for a finished invocation: total elapsed time, failure elapsed
 * time (only when the invocation was not successful), request size and response size.
 *
 * @param event the client end-of-invoke event carrying request and response
 */
private void onEvent(ClientEndInvokeEvent event) {
    RpcInternalContext rpcContext = RpcInternalContext.getContext();

    InvokeMeta meta = new InvokeMeta(event.getRequest(), event.getResponse(),
        getLongAvoidNull(rpcContext.getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE)));
    Duration elapsed = meta.elapsed();
    Tags tags = meta.tags(this.common);

    clientTotal.apply(tags).record(elapsed);
    boolean failed = !meta.success();
    if (failed) {
        clientFail.apply(tags).record(elapsed);
    }

    // Sizes come from attachments on the internal context.
    requestSize.apply(tags)
        .record(getLongAvoidNull(rpcContext.getAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE)));
    responseSize.apply(tags)
        .record(getLongAvoidNull(rpcContext.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE)));
}
/**
 * Returns the interface id of this config.
 * <p>
 * For the Triple protocol the id is obtained by reflectively calling the static
 * {@code getServiceName} method declared on the enclosing class of the proxy class; if that
 * lookup or call fails for any reason, the configured {@code interfaceId} is returned instead.
 * For all other protocols the configured {@code interfaceId} is returned directly.
 *
 * @return the service name for Triple, otherwise the configured interface id
 */
@Override
public String getInterfaceId() {
    if (!StringUtils.equals(RpcConstants.PROTOCOL_TYPE_TRIPLE, this.getProtocol())) {
        return interfaceId;
    }

    Class<?> outerClass = this.getProxyClass().getEnclosingClass();
    String resolvedName = interfaceId;
    try {
        Method serviceNameGetter = outerClass.getDeclaredMethod("getServiceName");
        resolvedName = (String) serviceNameGetter.invoke(null);
    } catch (Throwable e) {
        // Deliberately ignored: fall back to the configured interface id.
    }
    return resolvedName;
}
/**
 * Creates the HTTP/2 handlers for a new client channel and configures the pipeline according
 * to the provider's protocol type: SSL for {@code PROTOCOL_TYPE_H2}; for
 * {@code PROTOCOL_TYPE_H2C} either prior-knowledge or HTTP-upgrade clear text, depending on
 * {@code useH2cPriorKnowledge}. Other protocol types leave the pipeline unconfigured.
 *
 * @param ch the channel being initialized
 * @throws Exception if pipeline configuration fails
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final Http2Connection connection = new DefaultHttp2Connection(false);

    // Inbound HTTP/2 frames are adapted to HTTP objects, limited to the configured payload size.
    DelegatingDecompressorFrameListener frameListener = new DelegatingDecompressorFrameListener(
        connection,
        new InboundHttp2ToHttpAdapterBuilder(connection)
            .maxContentLength(transportConfig.getPayload())
            .propagateSettings(true)
            .build());
    connectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
        .frameListener(frameListener)
        .connection(connection)
        .build();
    responseHandler = new Http2ClientChannelHandler();
    settingsHandler = new Http2SettingsHandler(ch.newPromise());

    String protocolType = transportConfig.getProviderInfo().getProtocolType();
    if (RpcConstants.PROTOCOL_TYPE_H2.equals(protocolType)) {
        configureSsl(ch);
    } else if (RpcConstants.PROTOCOL_TYPE_H2C.equals(protocolType)) {
        if (useH2cPriorKnowledge) {
            configureClearTextWithPriorKnowledge(ch);
        } else {
            configureClearTextWithHttpUpgrade(ch);
        }
    }
}
/**
 * Exports {@link RestService} on a REST server at port 8803 and refers a consumer to it through
 * a direct URL; the resulting proxy is stored in {@code restService}.
 */
@BeforeClass
public static void before() {
    ServerConfig restServer = new ServerConfig()
        .setStopTimeout(60000)
        .setPort(8803)
        .setProtocol(RpcConstants.PROTOCOL_TYPE_REST);

    ProviderConfig<RestService> restProvider = new ProviderConfig<RestService>()
        .setInterfaceId(RestService.class.getName())
        .setRef(new RestServiceImpl())
        .setServer(restServer)
        .setBootstrap("rest")
        .setRegister(false);
    restProvider.export();

    ConsumerConfig<RestService> restConsumer = new ConsumerConfig<RestService>()
        .setInterfaceId(RestService.class.getName())
        .setDirectUrl("rest://127.0.0.1:8803")
        .setProtocol("rest")
        .setBootstrap("rest")
        .setTimeout(30000)
        .setRegister(false);
    restService = restConsumer.refer();
}
/**
 * Supplies providers from the default address group when no providers were selected yet.
 * <p>
 * A non-empty incoming list is returned untouched and the router way is not recorded.
 * Otherwise the providers of {@link RpcConstants#ADDRESS_DEFAULT_GROUP} are appended to the
 * incoming list, or used as the result when the incoming list is {@code null}.
 *
 * @param request       the request being routed
 * @param providerInfos providers selected so far, may be {@code null} or empty
 * @return the providers to continue routing with
 */
@Override
public List<ProviderInfo> route(SofaRequest request, List<ProviderInfo> providerInfos) {
    // Addresses were already chosen upstream. FIXME (carried over from the original code)
    if (CommonUtils.isNotEmpty(providerInfos)) {
        return providerInfos;
    }

    List<ProviderInfo> routed = providerInfos;
    AddressHolder holder = consumerBootstrap.getCluster().getAddressHolder();
    if (holder != null) {
        List<ProviderInfo> registryProviders = holder.getProviderInfos(RpcConstants.ADDRESS_DEFAULT_GROUP);
        if (routed == null) {
            routed = registryProviders;
        } else {
            routed.addAll(registryProviders);
        }
    }
    recordRouterWay(RPC_REGISTRY_ROUTER);
    return routed;
}
/**
 * Re-refers the consumer with the future invoke type, issues five calls (waiting on each
 * future and only logging failures), then asserts that the invocation statistics for the
 * 127.0.0.1 provider report the value 5 via {@code delayGetCount}.
 */
@Test
public void testFuture() throws SofaRpcException, ExecutionException, InterruptedException {
    consumerConfig.setInvokeType(RpcConstants.INVOKER_TYPE_FUTURE);
    consumerConfig.unRefer();
    helloService = consumerConfig.refer();

    int remaining = 5;
    while (remaining > 0) {
        helloService.sayHello("liangen");
        try {
            RpcInvokeContext.getContext().getFuture().get();
        } catch (Exception e) {
            LOGGER.info("future超时");
        }
        remaining--;
    }
    Thread.sleep(1000);

    final ProviderInfo localProvider = getProviderInfoByHost(consumerConfig, "127.0.0.1");
    InvocationStatDimension dimension = new InvocationStatDimension(localProvider, consumerConfig);
    InvocationStat stat = InvocationStatFactory.getInvocationStat(dimension);
    Assert.assertEquals(5, delayGetCount(stat, 5));

    InvocationStatFactory.removeInvocationStat(stat);
}
/**
 * Writes an HTTP/2 response on the current stream.
 * <p>
 * The headers carry the status, either the serialization alias of the request (when its
 * serialize type is positive) or a plain-text content type, and an error marker when
 * {@code error} is set. When {@code data} is {@code null} the headers frame ends the stream;
 * otherwise a data frame follows and ends the stream.
 *
 * @param status response status
 * @param error  whether to mark the response as an error
 * @param data   response body, may be {@code null}
 */
private void sendHttp2Response0(HttpResponseStatus status, boolean error, ByteBuf data) {
    Http2Headers headers = new DefaultHttp2Headers().status(status.codeAsText());

    if (request.getSerializeType() <= 0) {
        headers.set(CONTENT_TYPE, "text/plain; charset=" + RpcConstants.DEFAULT_CHARSET.displayName());
    } else {
        String serializationAlias = SerializerFactory.getAliasByCode(request.getSerializeType());
        headers.set(RemotingConstants.HEAD_SERIALIZE_TYPE, serializationAlias);
    }
    if (error) {
        headers.set(RemotingConstants.HEAD_RESPONSE_ERROR, "true");
    }

    // Without a body the headers frame is the last frame of the stream.
    final boolean headersOnly = data == null;
    encoder.writeHeaders(ctx, streamId, headers, 0, headersOnly, ctx.newPromise());
    if (!headersOnly) {
        encoder.writeData(ctx, streamId, data, 0, true, ctx.newPromise());
    }
}
/**
 * Converts a provider into a Bolt {@link Url}.
 * <p>
 * The url takes host and port from the provider, the connect timeout from the transport
 * config, a connection number of at least 1, connection warm-up disabled, and the Bolt
 * protocol when the provider's protocol type is {@code PROTOCOL_TYPE_BOLT} (TR otherwise).
 *
 * @param transportConfig ClientTransportConfig
 * @param providerInfo    ProviderInfo
 * @return Bolt Url
 */
protected Url convertProviderToUrl(ClientTransportConfig transportConfig, ProviderInfo providerInfo) {
    // The first argument of Url does not really matter when events are not used.
    Url url = new Url(providerInfo.toString(), providerInfo.getHost(), providerInfo.getPort());
    url.setConnectTimeout(transportConfig.getConnectTimeout());

    // Initialize the configured number of long connections (for the SLB / VIP case),
    // falling back to a single connection when the configured number is not positive.
    final int configuredConnNum = transportConfig.getConnectionNum();
    url.setConnNum(configuredConnNum > 0 ? configuredConnNum : 1);
    url.setConnWarmup(false);

    boolean isBolt = RpcConstants.PROTOCOL_TYPE_BOLT.equals(providerInfo.getProtocolType());
    if (isBolt) {
        url.setProtocol(RemotingConstants.PROTOCOL_BOLT);
    } else {
        url.setProtocol(RemotingConstants.PROTOCOL_TR);
    }
    return url;
}
/**
 * Client side: records the serialized request size and the request serialization time as
 * attachments on the RPC internal context. Does nothing when attachments are disabled.
 *
 * @param requestCommand the request command whose lengths are summed up
 * @param invokeContext  invoke context that may carry the RPC internal context, may be {@code null}
 */
protected void recordSerializeRequest(RequestCommand requestCommand, InvokeContext invokeContext) {
    if (!RpcInternalContext.isAttachmentEnable()) {
        return;
    }

    RpcInternalContext rpcContext = null;
    if (invokeContext != null) {
        // For asynchronous client calls the context travels inside the InvokeContext.
        rpcContext = invokeContext.get(RemotingConstants.INVOKE_CTX_RPC_CTX);
    }
    if (rpcContext == null) {
        rpcContext = RpcInternalContext.getContext();
    }

    int serializeCost = rpcContext.getStopWatch().tick().read();
    int totalSize = RpcProtocol.getRequestHeaderLength();
    totalSize += requestCommand.getClazzLength();
    totalSize += requestCommand.getContentLength();
    totalSize += requestCommand.getHeaderLength();

    // Store the serialized request size and the serialization time.
    rpcContext.setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, totalSize);
    rpcContext.setAttachment(RpcConstants.INTERNAL_KEY_REQ_SERIALIZE_TIME, serializeCost);
}
/**
 * Client side: records the deserialized response size and the response deserialization time
 * as attachments on the RPC internal context. Does nothing when attachments are disabled.
 *
 * @param responseCommand the response command whose lengths are summed up
 * @param invokeContext   invoke context that may carry the RPC internal context, may be {@code null}
 */
private void recordDeserializeResponse(RpcResponseCommand responseCommand, InvokeContext invokeContext) {
    if (!RpcInternalContext.isAttachmentEnable()) {
        return;
    }

    RpcInternalContext rpcContext = null;
    if (invokeContext != null) {
        // For asynchronous client calls the context travels inside the InvokeContext.
        rpcContext = invokeContext.get(RemotingConstants.INVOKE_CTX_RPC_CTX);
    }
    if (rpcContext == null) {
        rpcContext = RpcInternalContext.getContext();
    }

    int deserializeCost = rpcContext.getStopWatch().tick().read();
    int totalSize = RpcProtocol.getResponseHeaderLength();
    totalSize += responseCommand.getClazzLength();
    totalSize += responseCommand.getContentLength();
    totalSize += responseCommand.getHeaderLength();

    // Store the deserialized response size and the deserialization time.
    rpcContext.setAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE, totalSize);
    rpcContext.setAttachment(RpcConstants.INTERNAL_KEY_RESP_DESERIALIZE_TIME, deserializeCost);
}
/**
 * Checks {@code recordClientElapseTime} in three stages: a callback without a context, a
 * callback with a context that has no send time (both must leave the elapse attachment
 * unset), and a context with a send time (the elapse attachment must then be present).
 */
@Test
public void testRecordClientElapseTime() {
    // Stage 1: callback built without any context.
    BoltInvokerCallback callback = new BoltInvokerCallback(null, null,
        null, null, null, null);
    callback.recordClientElapseTime();
    Long elapseWithoutContext = (Long) RpcInternalContext.getContext()
        .getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE);
    Assert.assertNull(elapseWithoutContext);

    // Stage 2: callback built with a context that carries no send time.
    RpcInternalContext rpcContext = RpcInternalContext.getContext();
    callback = new BoltInvokerCallback(null, null,
        null, null, rpcContext, null);
    callback.recordClientElapseTime();
    Long elapseWithoutSendTime = (Long) rpcContext.getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE);
    Assert.assertNull(elapseWithoutSendTime);

    // Stage 3: same callback after a send time was attached.
    rpcContext.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now() - 1000);
    callback.recordClientElapseTime();
    Long elapseWithSendTime = (Long) rpcContext.getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE);
    Assert.assertNotNull(elapseWithSendTime);
}
/**
 * Refers a Bolt consumer through a direct URL and verifies that invoking it fails with a
 * {@link SofaRouteException} whose message contains that direct URL.
 * <p>
 * The test fails explicitly when the call returns normally; previously that case passed
 * silently because the assertions lived only inside the catch block.
 */
@Test
public void testAll() {
    final String directUrl = "bolt://127.0.0.1:12300";
    final ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName())
        .setDirectUrl(directUrl)
        .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
        .setBootstrap("bolt")
        .setApplication(new ApplicationConfig().setAppName("clientApp"))
        .setReconnectPeriod(1000);
    HelloService helloService = consumerConfig.refer();

    // The invocation is expected to throw.
    try {
        helloService.sayHello("xx", 22);
        // AssertionError is not an Exception, so it is not swallowed by the catch below.
        Assert.fail("Expected SofaRouteException when invoking " + directUrl);
    } catch (Exception e) {
        Assert.assertTrue(e instanceof SofaRouteException);
        Assert.assertTrue(e.getMessage().contains(directUrl));
    }
}
/**
 * Subscribes a consumer with a mock listener to a {@link LocalRegistry}, seeds the registry's
 * memory cache with an empty group, then notifies with a cache holding one provider and
 * asserts that the listener received exactly one provider in the default address group.
 */
public void notifyConsumerTest() {
    LocalRegistry localRegistry = new LocalRegistry(new RegistryConfig());

    ConsumerConfig<?> consumerConfig = new ConsumerConfig();
    consumerConfig.setInterfaceId("test");
    LocalRegistryTest.MockProviderInfoListener listener = new LocalRegistryTest.MockProviderInfoListener();
    consumerConfig.setProviderInfoListener(listener);
    localRegistry.subscribe(consumerConfig);

    // Seed the current cache with an empty group under the consumer's data id.
    String dataId = LocalRegistryHelper.buildListDataId(consumerConfig, consumerConfig.getProtocol());
    localRegistry.memoryCache.put(dataId, new ProviderGroup());

    // Build the updated cache: same data id, one provider.
    ProviderGroup updatedGroup = new ProviderGroup();
    ProviderInfo provider = new ProviderInfo().setHost("0.0.0.0");
    updatedGroup.add(provider);
    Map<String, ProviderGroup> updatedCache = new HashMap<String, ProviderGroup>();
    updatedCache.put(dataId, updatedGroup);

    localRegistry.notifyConsumer(updatedCache);

    Map<String, ProviderGroup> received = listener.getData();
    Assert.assertTrue(received.size() > 0);
    Assert.assertNotNull(received.get(RpcConstants.ADDRESS_DEFAULT_GROUP));
    Assert.assertTrue(received.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size() == 1);
}
/**
 * Renders an instance as a url string of the form {@code [protocol://]ip:port[?k=v&k=v...]}.
 * <p>
 * The protocol prefix is emitted only when the metadata holds a non-empty value under
 * {@link RpcConstants#CONFIG_KEY_PROTOCOL}; every metadata entry is appended as a query
 * parameter. A {@code null} metadata map is treated as empty.
 *
 * @param instance the instance to convert
 * @return the url string
 */
private static String convertInstanceToUrl(Instance instance) {
    Map<String, String> metadata = instance.getMetadata();
    if (metadata == null) {
        metadata = new HashMap<String, String>();
    }

    StringBuilder url = new StringBuilder();
    String protocol = metadata.get(RpcConstants.CONFIG_KEY_PROTOCOL);
    if (StringUtils.isNotEmpty(protocol)) {
        url.append(protocol).append("://");
    }
    url.append(instance.getIp()).append(":").append(instance.getPort());

    // The first parameter is introduced by '?', every following one by '&'.
    String separator = "?";
    for (Map.Entry<String, String> entry : metadata.entrySet()) {
        url.append(separator).append(entry.getKey()).append("=").append(entry.getValue());
        separator = "&";
    }
    return url.toString();
}
/**
 * Wraps a synchronous invocation in two {@code SphU} entries, one for the interface resource
 * and one for the method resource.
 * <p>
 * Requests with a non-null invoke type other than {@code INVOKER_TYPE_SYNC} bypass the entries.
 * A {@link BlockException} is delegated to the registered consumer fallback; any other
 * throwable is traced on both entries and rethrown. Both entries are exited in the finally
 * block, method entry first.
 *
 * @param invoker the next invoker in the filter chain
 * @param request the request being invoked
 * @return the response of the invocation or of the fallback
 * @throws SofaRpcException when the invocation fails
 */
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
    // Now only support sync invoke.
    String invokeType = request.getInvokeType();
    boolean nonSyncInvoke = invokeType != null && !RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType);
    if (nonSyncInvoke) {
        return invoker.invoke(request);
    }

    String interfaceResource = getInterfaceResourceName(request);
    String methodResource = getMethodResourceName(request);

    Entry interfaceGuard = null;
    Entry methodGuard = null;
    try {
        interfaceGuard = SphU.entry(interfaceResource, ResourceTypeConstants.COMMON_RPC, EntryType.OUT);
        methodGuard = SphU.entry(methodResource, ResourceTypeConstants.COMMON_RPC,
            EntryType.OUT, getMethodArguments(request));

        SofaResponse result = invoker.invoke(request);
        traceResponseException(result, interfaceGuard, methodGuard);
        return result;
    } catch (BlockException blocked) {
        return SofaRpcFallbackRegistry.getConsumerFallback().handle(invoker, request, blocked);
    } catch (Throwable failure) {
        throw traceOtherException(failure, interfaceGuard, methodGuard);
    } finally {
        // Exit in reverse order of entry.
        if (methodGuard != null) {
            methodGuard.exit(1, getMethodArguments(request));
        }
        if (interfaceGuard != null) {
            interfaceGuard.exit();
        }
    }
}
/**
 * Runs the invocation through a Hystrix command chosen by invoke type: a
 * {@code SofaHystrixCommand} for sync calls and a {@code SofaAsyncHystrixCommand} for future
 * calls. Every other invoke type is passed straight to the next invoker.
 *
 * @param invoker the next invoker in the filter chain
 * @param request the request being invoked
 * @return the response of the command or of the direct invocation
 * @throws SofaRpcException when the invocation fails
 */
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
    if (RpcConstants.INVOKER_TYPE_SYNC.equals(request.getInvokeType())) {
        SofaHystrixInvokable syncCommand = new SofaHystrixCommand(invoker, request);
        return syncCommand.invoke();
    }
    if (RpcConstants.INVOKER_TYPE_FUTURE.equals(request.getInvokeType())) {
        SofaHystrixInvokable asyncCommand = new SofaAsyncHystrixCommand(invoker, request);
        return asyncCommand.invoke();
    }
    // Other invoke types are not wrapped in a command.
    return invoker.invoke(request);
}
/**
 * Adds the providers of the direct-url address group to the routing result.
 * <p>
 * When an address holder is available, the providers of
 * {@link RpcConstants#ADDRESS_DIRECT_GROUP} are appended to the incoming list, or used as the
 * result when the incoming list is {@code null}. The router way is always recorded.
 *
 * @param request       the request being routed
 * @param providerInfos providers selected so far, may be {@code null}
 * @return the providers to continue routing with
 */
@Override
public List<ProviderInfo> route(SofaRequest request, List<ProviderInfo> providerInfos) {
    List<ProviderInfo> routed = providerInfos;
    AddressHolder holder = consumerBootstrap.getCluster().getAddressHolder();
    if (holder != null) {
        List<ProviderInfo> directProviders = holder.getProviderInfos(RpcConstants.ADDRESS_DIRECT_GROUP);
        if (routed == null) {
            routed = directProviders;
        } else {
            routed.addAll(directProviders);
        }
    }
    recordRouterWay(RPC_DIRECT_URL_ROUTER);
    return routed;
}
/**
 * Demo entry point: exports {@link HelloService} and {@link EchoService} on port 22222 and
 * registers them in a local-file registry stored under the user's home directory.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Registry file: <user.home>/localFileTest/localRegistry.reg
    String registryFile = System.getProperty("user.home") + File.separator
        + "localFileTest" + File.separator + "localRegistry.reg";
    RegistryConfig localRegistry = new RegistryConfig()
        .setProtocol("local")
        .setFile(registryFile);

    ServerConfig primaryServer = new ServerConfig()
        .setPort(22222)
        .setDaemon(false);

    // NOTE(review): this second server config is built but never attached to a provider;
    // confirm whether the EchoService provider was meant to use it.
    ServerConfig secondaryServer = new ServerConfig()
        .setPort(22200)
        .setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
        .setDaemon(false);

    ProviderConfig<HelloService> helloProvider = new ProviderConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName())
        .setRef(new HelloServiceImpl())
        .setServer(primaryServer)
        .setRegistry(localRegistry);
    ProviderConfig<EchoService> echoProvider = new ProviderConfig<EchoService>()
        .setInterfaceId(EchoService.class.getName())
        .setRef(new EchoServiceImpl())
        .setServer(primaryServer)
        .setRegistry(localRegistry);

    helloProvider.export();
    echoProvider.export();

    LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
}
/**
 * Demo entry point: refers {@link HelloService} and {@link EchoService} with the oneway invoke
 * type through a direct URL and calls both every two seconds, logging the returned values.
 * <p>
 * Invocation failures are logged and the loop continues. An interrupt during the pause is no
 * longer swallowed: it propagates out of {@code main}, which already declares
 * {@link InterruptedException}, so the otherwise endless loop can be stopped by interruption.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while pausing between calls
 */
public static void main(String[] args) throws InterruptedException {
    ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("oneway-client");

    ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
        .setApplication(applicationConfig)
        .setInterfaceId(HelloService.class.getName())
        .setInvokeType(RpcConstants.INVOKER_TYPE_ONEWAY)
        .setTimeout(50000)
        .setDirectUrl("bolt://127.0.0.1:22222?appName=oneway-server");
    HelloService helloService = consumerConfig.refer();

    ConsumerConfig<EchoService> consumerConfig2 = new ConsumerConfig<EchoService>()
        .setApplication(applicationConfig)
        .setInterfaceId(EchoService.class.getName())
        .setInvokeType(RpcConstants.INVOKER_TYPE_ONEWAY)
        .setTimeout(50000)
        .setDirectUrl("bolt://127.0.0.1:22222?appName=oneway-server");
    EchoService echoService = consumerConfig2.refer();

    LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);

    while (true) {
        try {
            String s1 = helloService.sayHello("xxx", 22);
            LOGGER.warn("must null :{}", s1);
            String s2 = echoService.echoStr("yyy");
            LOGGER.warn("must null :{}", s2);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
        // Deliberately not wrapped in try/catch: an interrupt must end the loop.
        Thread.sleep(2000);
    }
}
/**
 * Demo entry point: exports {@link HelloService} and {@link EchoService} on port 22101 and
 * registers them in a ZooKeeper registry at 127.0.0.1:2181.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    /*
     * Running this requires the following dependency in pom.xml:
     <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <scope>test</scope>
     </dependency>
     */
    RegistryConfig zkRegistry = new RegistryConfig()
        .setProtocol(RpcConstants.REGISTRY_PROTOCOL_ZK)
        .setAddress("127.0.0.1:2181");

    ServerConfig server = new ServerConfig()
        .setPort(22101)
        .setDaemon(false);

    ProviderConfig<HelloService> helloProvider = new ProviderConfig<HelloService>()
        .setInterfaceId(HelloService.class.getName())
        .setRef(new HelloServiceImpl("result from 22101"))
        .setServer(server)
        .setRegistry(zkRegistry);
    ProviderConfig<EchoService> echoProvider = new ProviderConfig<EchoService>()
        .setInterfaceId(EchoService.class.getName())
        .setRef(new EchoServiceImpl())
        .setServer(server)
        .setRegistry(zkRegistry);

    helloProvider.export();
    echoProvider.export();

    LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
}
/**
 * Publishes and refers {@link EchoService} through a ZooKeeper registry configured with
 * digest authentication parameters, then asserts that an echo call returns its argument.
 */
@Test
public void testUseCorrentAuth() {
    parameters.put("scheme", "digest");
    // Multiple credentials would be passed in the form user1:passwd1,user2:passwd2.
    parameters.put("addAuth", "sofazk:rpc1");

    registryConfig = new RegistryConfig()
        .setProtocol(RpcConstants.REGISTRY_PROTOCOL_ZK)
        .setAddress("127.0.0.1:2181/authtest")
        .setParameters(parameters);

    // Bolt server on port 12200, running on non-daemon threads.
    serverConfig = new ServerConfig()
        .setProtocol("bolt")
        .setPort(12200)
        .setDaemon(false);

    // Publish the service: interface, implementation and server.
    ProviderConfig<EchoService> echoProvider = new ProviderConfig<EchoService>()
        .setRegistry(registryConfig)
        .setInterfaceId(EchoService.class.getName())
        .setRef(new EchoServiceImpl())
        .setServer(serverConfig);
    echoProvider.export();

    // Refer the same interface over bolt through the same registry.
    ConsumerConfig<EchoService> echoConsumer = new ConsumerConfig<EchoService>()
        .setRegistry(registryConfig)
        .setInterfaceId(EchoService.class.getName())
        .setProtocol("bolt")
        .setTimeout(3000)
        .setConnectTimeout(10 * 1000);
    EchoService echoService = echoConsumer.refer();

    String echoed = echoService.echoStr("auth test");
    Assert.assertEquals("auth test", echoed);
}
/**
 * Records the client-send event on the current client span after a request was sent.
 * <p>
 * For asynchronous requests the span is additionally popped from the trace context, stored as
 * an attachment on the RPC internal context, and its parent span (if any) is pushed back so
 * that the current thread does not stay on the client span. When there is no current span a
 * warning is logged and nothing else happens.
 *
 * @param request the request that was sent
 */
@Override
public void clientAsyncAfterSend(SofaRequest request) {
    SofaTraceContext traceContext = SofaTraceContextHolder.getSofaTraceContext();
    // Peek at the current span without popping it.
    SofaTracerSpan currentSpan = traceContext.getCurrentSpan();
    if (currentSpan == null) {
        SelfLog.warn("ClientSpan is null.Before call interface=" + request.getInterfaceName() + ",method=" +
            request.getMethodName());
        return;
    }
    RpcInternalContext internalContext = RpcInternalContext.getContext();

    if (!request.isAsync()) {
        // Record client send event
        currentSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
        return;
    }

    // Asynchronous call: pop the span, otherwise this thread would keep the client span current.
    SofaTracerSpan poppedSpan = traceContext.pop();
    if (poppedSpan != null) {
        // Record client send event
        poppedSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
    }
    // Cache the span on the internal context; it is only cached, not serialized to the server.
    internalContext.setAttachment(RpcConstants.INTERNAL_KEY_TRACER_SPAN, poppedSpan);

    // Restore the parent span as the current one.
    SofaTracerSpan parentSpan = poppedSpan == null ? null : poppedSpan.getParentSofaTracerSpan();
    if (parentSpan != null) {
        traceContext.push(parentSpan);
    }
}
/**
 * Returns the provider group for a group name while holding {@code rLock}.
 *
 * @param groupName the requested group name
 * @return the direct-url group when the name equals {@link RpcConstants#ADDRESS_DIRECT_GROUP},
 *         otherwise the registry group
 */
@Override
public ProviderGroup getProviderGroup(String groupName) {
    rLock.lock();
    try {
        if (RpcConstants.ADDRESS_DIRECT_GROUP.equals(groupName)) {
            return directUrlGroup;
        }
        return registryGroup;
    } finally {
        rLock.unlock();
    }
}
/**
 * Tags the current server span, if there is one, with request and provider details before
 * delegating to the next invoker, and with the {@code INTERNAL_KEY_IMPL_ELAPSE} attachment
 * afterwards.
 *
 * @param invoker the next invoker in the filter chain
 * @param request the request being invoked
 * @return the response of the next invoker
 * @throws SofaRpcException when the invocation fails
 */
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
    SofaTracerSpan span = null;
    try {
        SofaTraceContext traceContext = SofaTraceContextHolder.getSofaTraceContext();
        span = traceContext.getCurrentSpan();
        if (span != null) {
            RpcInternalContext rpcContext = RpcInternalContext.getContext();

            span.setTag(RpcSpanTags.SERVICE, request.getTargetServiceUniqueName());
            span.setTag(RpcSpanTags.METHOD, request.getMethodName());
            // Address of the client.
            span.setTag(RpcSpanTags.REMOTE_IP, rpcContext.getRemoteHostName());

            // Extra information passed along in the request properties.
            span.setTag(RpcSpanTags.REMOTE_APP, (String) request.getRequestProp(HEAD_APP_NAME));
            span.setTag(RpcSpanTags.PROTOCOL, (String) request.getRequestProp(HEAD_PROTOCOL));
            span.setTag(RpcSpanTags.INVOKE_TYPE, (String) request.getRequestProp(HEAD_INVOKE_TYPE));

            ProviderConfig provider = (ProviderConfig) invoker.getConfig();
            span.setTag(RpcSpanTags.LOCAL_APP, provider.getAppName());

            span.setTag(RpcSpanTags.SERVER_THREAD_POOL_WAIT_TIME,
                (Number) rpcContext.getAttachment(RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME));
        }
        return invoker.invoke(request);
    } finally {
        if (span != null) {
            span.setTag(RpcSpanTags.SERVER_BIZ_TIME,
                (Number) RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE));
        }
    }
}
/**
 * Builds the consumer config shared by the tests: {@link HystrixService} over a direct Bolt
 * URL, future invoke type, the {@code SOFA_HYSTRIX_ENABLED} parameter set to {@code "true"}
 * and a repeated-refer limit of -1.
 *
 * @return a new, not yet referred consumer config
 */
private ConsumerConfig<HystrixService> defaultClient() {
    ConsumerConfig<HystrixService> client = new ConsumerConfig<HystrixService>()
        .setInterfaceId(HystrixService.class.getName())
        .setDirectUrl("bolt://127.0.0.1:22222")
        .setInvokeType(RpcConstants.INVOKER_TYPE_FUTURE)
        .setParameter(HystrixConstants.SOFA_HYSTRIX_ENABLED, String.valueOf(true))
        .setRepeatedReferLimit(-1);
    return client;
}