21 调用流程:消费方的调用流程体系,你知道多少?
你好,我是何辉。今天我们深入研究Dubbo源码的第十篇,调用流程。
在消费方这样的代码你一定见过很多了。
在 Spring Bean 中定义 Dubbo 接口为成员变量时,用 @DubboReference 注解修饰 DemoFacade。
你有没有好奇过,我们现在看到的这个 demoFacade 变量,在内存运行时,值类型还属于 DemoFacade 这个类型么?如果不是,那拿着 demoFacade 变量去调用里面的方法时,在消费方到底会经历怎样的调用流程呢?
这个问题,我以前也探索过,不过因为源码太复杂,最后都徒劳无功。直到有一天,遇到了一位资深大佬,指点了我看源码的 12 字方针:不钻细节:只看流程;不看过程:只看结论;再看细节:再看过程。参考他的经验,我经过一番简单的调试后,就轻松梳理出了调用流程的大体框架。
今天我就带你按照这12字方针的思路,感受一下高手是如何研究源码流程的。每个环节,我会用图片总结,加强你对流程的形象化理解。今天的内容稍微多一些,做好准备,我们马上开始。
sayHello 调试
先来看下我们的消费方调用代码。
///////////////////////////////////////////////////
// 消费方的一个 Spring Bean 类
// 1、里面定义了下游 Dubbo 接口的成员变量,并且还用 @DubboReference 修饰了一下。
// 2、还定义了一个 invokeDemo 方法被外部调用,但其重点是该方法可以调用下游的 Dubbo 接口
///////////////////////////////////////////////////
@Component
public class InvokeDemoService {
// 定义调用下游接口的成员变量,
// 并且用注解修饰
@DubboReference
private DemoFacade demoFacade;
// invokeDemo 是被外部触发调用的,不过这不是重点,
// 重点的是该 invokeDemo 逻辑中能调用下游的接口,
// 这里调用的是下游 DemoFacade 接口的 sayHello 方法
public String invokeDemo(){
return demoFacade.sayHello("Geek");
}
}
代码在这个类中定义了一个 invokeDemo 的方法,然后,在方法体中对下游 DemoFacade 接口的 sayHello 方法发起了远程调用。
是不是发现消费方真的非常简单且普通。那怎么展开探索呢?我们以 Debug 的方式调试消费方,在 sayHello 方法打个断点,通过断点调试,钻进调用流程的各个环节,同时参考12字方针,深入研究调用流程的底层逻辑。
1. JDK代理
我们在调用 sayHello 方法的这行打上一个断点,先运行提供方,再 Debug 运行消费方,很快断点就到来了。
还记得前面提出的一个疑问么,demoFacade 在内存运行时,值类型是什么?
从图中,我们可以清楚地看到,demoFacade 的类型,是一个随机生成的代理类名,不再属于 DemoFacade 这个类型了,而且结合 $Proxy 类名、代理类中的 h 成员变量属于 JdkDynamicAopProxy 类型,综合判断,这是采用 JDK 代理动态,生成了一个继承 Proxy 的代理类。
如果你再展开 h 属性看看它里面的成员变量。
有 2 个比较重要的字段,targetSource 和 interfaces,我们挨个看下。
先来看看 targetSource 变量(ReferenceBean$DubboReferenceLazyInitTargetSource)。
看到它的类名,有没有似曾相识的感觉,类名上包含了 DubboReference 关键字,想必跟订阅流程中的引用有关。而且类名的构成,由 $ 符号隔开了,可以说明 DubboReferenceLazyInitTargetSource 是 ReferenceBean 的一个内部类。
进去瞄一眼,看看猜想对不对。
///////////////////////////////////////////////////
// org.apache.dubbo.config.spring.ReferenceBean.DubboReferenceLazyInitTargetSource
// 懒加载初始化的一种操作
///////////////////////////////////////////////////
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
// 创建对象,其实就是创建代理对象,拿到远程引用
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
↓
///////////////////////////////////////////////////
// org.apache.dubbo.config.spring.ReferenceBean#getCallProxy
// 直接通过 referenceConfig 的 get 方法获取接口引用的代理对象
///////////////////////////////////////////////////
private Object getCallProxy() throws Exception {
if (referenceConfig == null) {
throw new IllegalStateException("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started.");
}
//get reference proxy
return referenceConfig.get();
}
进去之后才发现,原来内部类是一层壳,核心创建对象的逻辑还是 ReferenceConfig 的 get 方法。
我们再来看 interfacaces 变量。
这里除了有我们调用下游的接口 DemoFacade,还有一个回声测试接口(EchoService)和一个销毁引用的接口(Destroyable)。
这俩接口我们没见过,却被 Dubbo 在创建代理的过程中追加进来了,可以说明一点,我们可以拿着 demoFacade 变量强转为 EchoService,或者强转为 Destroyable,强转后就可以调用对应接口的方法,实现了一个代理类拥有多态功能的效果。
这个多态功能的支持,得归功于 JDK 的 Proxy 类,它在创建代理对象的 newProxyInstance 方法中,支持入参传入一个接口数组,Dubbo 框架在创建代理时,不但传入了 DemoFacade 接口,还传入了 EchoService 和 Destroyable 接口,生成的代理对象就有了另外两种接口的行为能力。
///////////////////////////////////////////////////
// java.lang.reflect.Proxy#newProxyInstance
// JDK 的 Proxy 类中,创建代理对象的方法
///////////////////////////////////////////////////
@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
{ 省略部分实现代码... }
好,简单整理一下,刚开始断点探索,我们就接触到了“高深”的 JDK 代理对象,解开了开头的一个疑惑, demoFacade 在运行时并非我们所见的 DemoFacade 类型,而是由 JDK 动态代理生成的一个代理对象类型。
碰巧发现,在生成的代理对象中,targetSource 成员变量创建的底层核心逻辑还是 ReferenceConfig 的 get 方法,不得不说 ReferenceConfig 是消费方引用下游接口逻辑中非常重要的一个类。
同时还认识了 EchoService 和 Destroyable 两个接口,让我们使用 demoFacade 时不仅可以调用 sayHello 方法,还可以强转为这两个接口调用不同的方法,使得一个 demoFacade 变量拥有三能能力,这就是代理增强的魅力所在。
2. InvokerInvocationHandler
接下来,我们继续 Debug 进入 sayHello 方法中,发现了一个名字有点眼熟的 InvokerInvocationHandler 类,虽然不知道具体能做啥,但是发现它实现了 JDK 代理中的 InvocationHandler 接口,所以,我们可以认为,这个 InvokerInvocationHandler 类是 Dubbo 框架接收 JDK 代理触发调用的入口。
来看看 InvokerInvocationHandler 的 invoke 方法,验证一下。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
// JDK 代理被触发调用后紧接着就开始进入 Dubbo 框架的调用,
// 因此跟踪消费方调用的入口,一般直接搜索这个 InvokerInvocationHandler 即可,
// 再说一点,这个 InvokerInvocationHandler 继承了 InvocationHandler 接口。
///////////////////////////////////////////////////
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 如果方法所在的类是 Object 类型的话,则不做任何处理
// 毕竟 Object 类型是 JDK 源码中的类,也不是 Dubbo 框架处理的重点
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 获取方法名,方法参数类型
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 如果参数类型个数是 0 个的话,则表示是无参数方法
// 既然是无参数方法的话,对于一些特殊的方法需要尽可能的在入口处解决
// 因此对于 Object 中的 toString、hashCode 方法,Destroyable 接口实现类的 $destroy 方法
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
}
// 如果方法参数个数为 1 个,并且还是 equals 方法,则也不是 Dubbo 框架处理的重点
else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// 还能来到这里,说明这才是 Dubbo 框架需要介入处理的方法
// 前面那么多提前返回的逻辑,在前几次阅读时,根本不需要细看,可以直接跳过不看
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method, invoker.getInterface().getName(), protocolServiceKey, args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
// 然后转手就把逻辑全部收口到一个 InvocationUtil 类中,
// 从命名也看得出,就是一个调用的工具类
return InvocationUtil.invoke(invoker, rpcInvocation);
}
按12字方针,从上往下大致浏览一遍。
- 从代码表面的流程看,一些 toString、$destroy、hashCode 等方法做了特殊的逻辑提前返回了,最终调用 InvocationUtil 的 invoke 方法进行了收口处理。
- 从方法的入参和返回值看,有方法对象、入参数据、代理对象,满足了反射调用的基本要素,调用后返回的数据,自然就是我们需要的结果。
- 再看方法实现体的一些细节,发现最终并没有使用 proxy 代理对象,而是使用了 invoker + rpcInvocation 传入 InvocationUtil 工具类,完成了逻辑收口。
再进入 InvocationUtil,忽略与入参无关的逻辑,最终发现把 rpcInvocation 对象传入了 invoker 的 invoke 方法中,继续走后续调用逻辑。
3. MigrationInvoker
Debug 一直往后走,来到了一个 MigrationInvoker 的调用类,从类的名字上看是“迁移”的意思,有点 Dubbo2 与 Dubbo3 新老兼容迁移的味道。
那来看看 MigrationInvoker 的 invoke 方法代码逻辑。
///////////////////////////////////////////////////
// org.apache.dubbo.registry.client.migration.MigrationInvoker#invoke
// 迁移兼容调用器
///////////////////////////////////////////////////
@Override
public Result invoke(Invocation invocation) throws RpcException {
if (currentAvailableInvoker != null) {
if (step == APPLICATION_FIRST) {
// call ratio calculation based on random value
if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
return invoker.invoke(invocation);
}
}
return currentAvailableInvoker.invoke(invocation);
}
switch (step) {
case APPLICATION_FIRST:
if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
currentAvailableInvoker = serviceDiscoveryInvoker;
} else if (checkInvokerAvailable(invoker)) {
currentAvailableInvoker = invoker;
} else {
currentAvailableInvoker = serviceDiscoveryInvoker;
}
break;
case FORCE_APPLICATION:
currentAvailableInvoker = serviceDiscoveryInvoker;
break;
case FORCE_INTERFACE:
default:
currentAvailableInvoker = invoker;
}
return currentAvailableInvoker.invoke(invocation);
}
继续按 12 字方针分析。
- 从代码流程看,从上到下都在针对 step 变量值,进行 if…else 判断走不走分支处理,说明这里针对 step 进行了逻辑分发处理。
- 从方法的入参和返回值来看,入参 invocation 是一个富含所有请求信息的接口,返参也是一个抽象的 Result 接口,意思很直白,根据参数拿到结果,只是入参和返参是接口,体现了抽象的概念。
- 再看看方法实现体的一些细节,通过针对 APPLICATION_FIRST、FORCE_APPLICATION、FORCE_INTERFACE 来进行分发处理,这不就是消费方设置 dubbo.application.service-discovery.migration 属性,进行新老订阅方案兼容的值么?根据不同的属性值,选择对应可用的 invoker 进行继续后续调用。
4. MockClusterInvoker
然后走进”step == APPLICATION_FIRST“的分支逻辑,进入 currentAvailableInvoker 的 invoke 方法,来到了 MockClusterInvoker 这个类,看名字是“模拟集群调用”的意思。
本意是干啥的呢,我们继续看它的 invoke 方法逻辑。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
// 调用异常时进行使用mock逻辑来处理数据的返回
///////////////////////////////////////////////////
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result;
// 从远程引用的url中看看有没有 mock 属性
String value = getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
// mock 属性值为空的话,相当于没有 mock 逻辑,则直接继续后续逻辑调用
if (ConfigUtils.isEmpty(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
// 如果 mock 属性值是以 force 开头的话
else if (value.startsWith("force")) {
// 那么就直接执行 mock 调用逻辑,
// 用事先准备好的模拟逻辑或者模拟数据返回
//force:direct mock
result = doMockInvoke(invocation, null);
}
// 还能来到这说明只是想在调用失败的时候尝试一下 mock 逻辑
else {
//fail-mock
try {
// 先正常执行业务逻辑调用
result = this.invoker.invoke(invocation);
//fix:#4585
// 当业务逻辑执行有异常时,并且这个异常类属于RpcException或RpcException子类时,
// 还有异常的原因如果是 Dubbo 框架层面的业务异常时,则不做任何处理。
// 如果不是业务异常的话,则会继续尝试执行 mock 业务逻辑
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
// 如果异常是 Dubbo 系统层面所认为的业务异常时,就不错任何处理
if(rpcException.isBiz()){
throw rpcException;
}else {
// 能来到这里说明是不是业务异常的话,那就执行模拟逻辑
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
// 业务异常直接往上拋
if (e.isBiz()) {
throw e;
}
// 不是 Dubbo 层面所和认为的异常信息时代,
// 直接
result = doMockInvoke(invocation, e);
}
}
return result;
}
照例按12字方针分析。
- 从代码流程看,整体上就三个 if 逻辑分支,no mock、force mock 和 fail mock,分别进行了不同逻辑的调用。
- 从方法的入参和返回值看,入参和返参和 MigrationInvoker 一样,都是重写了 invoke 方法,只是内部逻辑不一样,不同实现类做各自的事情。
- 再看看方法实现体的一些细节,mock 属性值(除了空值),当以 force 开头时,会直接执行 Mock 业务逻辑,其他情况还是先尝试正常后续调用,如果出现了非业务异常时,就尝试执行 Mock 业务逻辑。
5. 过滤器链
这里,看到 Cluster 这个关键字,想必你也想到了它是一个 SPI 接口,那在“发布流程”中我们也学过,远程导出和远程引用的时候,会用过滤器链把 invoker 层层包装起来。
那么我们就接着断点下去,发现确实进入 FutureFilter、MonitorFilter 等过滤器,这也证明了过滤器链包裹消费方 invoker 的存在。
6. FailoverClusterInvoker
断点一层层走完了所有的过滤器,接着又来到了 FailoverClusterInvoker 这个类,从名字上一看就是在“温故知新”中接触的故障转移策略,是不是有点好奇故障到底是怎么转移的?
我们不妨继续断点,进入它的 doInvoke 方法看看。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
// 故障转移策略的核心逻辑实现类
///////////////////////////////////////////////////
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
// 获取此次调用的方法名
String methodName = RpcUtils.getMethodName(invocation);
// 通过方法名计算获取重试次数
int len = calculateInvokeTimes(methodName);
// retry loop.
// 循环计算得到的 len 次数
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
// 从第2次循环开始,会有一段特殊的逻辑处理
if (i > 0) {
// 检测 invoker 是否被销毁了
checkWhetherDestroyed();
// 重新拿到调用接口的所有提供者列表集合,
// 粗俗理解,就是提供该接口服务的每个提供方节点就是一个 invoker 对象
copyInvokers = list(invocation);
// check again
// 再次检查所有拿到的 invokes 的一些可用状态
checkInvokers(copyInvokers, invocation);
}
// 选择其中一个,即采用了负载均衡策略从众多 invokers 集合中挑选出一个合适可用的
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
// 设置 RpcContext 上下文
RpcContext.getServiceContext().setInvokers((List) invoked);
boolean success = false;
try {
// 得到最终的 invoker 后也就明确了需要调用哪个提供方节点了
// 反正继续走后续调用流程就是了
Result result = invokeWithContext(invoker, invocation);
// 如果没有抛出异常的话,则认为正常拿到的返回数据
// 那么设置调用成功标识,然后直接返回 result 结果
success = true;
return result;
} catch (RpcException e) {
// 如果是 Dubbo 框架层面认为的业务异常,那么就直接抛出异常
if (e.isBiz()) { // biz exception.
throw e;
}
// 其他异常的话,则不继续抛出异常,那么就意味着还可以有机会再次循环调用
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
// 如果没有正常返回拿到结果的话,那么把调用异常的提供方地址信息记录起来
if (!success) {
providers.add(invoker.getUrl().getAddress());
}
}
}
// 如果 len 次循环仍然还没有正常拿到调用结果的话,
// 那么也不再继续尝试调用了,直接索性把一些需要开发人员关注的一些信息写到异常描述信息中,通过异常方式拋出去
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
代码流程分析也比较简单。
- 从代码流程看,主要就是一个大的 for 循环,循环体中进行了 select 操作,拿到了一个合适的 invoker,发起后续逻辑调用。
- 从方法的入参和返回值看,入参是 invocation、invokers、loadbalance 三个参数,猜测应该就是利用 loadbalance 负载均衡器,从 invokers 集合中,选择一个 invoker来发送 invocation 数据,发送完成后得到了返参的 Result 结果。
- 再看看方法实现体的一些细节,通过计算 retries 属性值得到重试次数并循环,每次循环都是利用负载均衡器选择一个进行调用,如果出现非业务异常,继续循环调用,直到所有次数循环完,还是没能拿到结果的话就会抛出 RpcException 异常了。
7. DubboInvoker
现在你应该知道“故障转移策略”的深层含义了吧,就是根据预先设置好的重试次数,当调用发生非业务异常时,按照负载均衡策略继续选择一个 invoker,再次发起调用。
了解完故障转移策略,我们继续 Debug,结果来到了 DubboInvoker 这个类。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
// 按照dubbo协议发起调用实现类
///////////////////////////////////////////////////
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
// 获取此次调用的方法名
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// 获取发送数据的客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 看看是单程发送不需要等待响应,还是发送完了后需要等待响应
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 获取超时时间,这个之前在“配置加载顺序”接触过这个方法
int timeout = calculateTimeout(invocation, methodName);
invocation.setAttachment(TIMEOUT_KEY, timeout);
// 单程发送不需要等待响应
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 发送完了之后需要等待响应
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 操作 currentClient 发送了一个 request 请求,
// 然后接收了一个 CompletableFuture 对象,说明这里存在异步操作
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
- 从代码流程看,拿到了一个交换数据的客户端类,然后走两个发送数据的分支,一条分支逻辑单程调用不需要响应,一条有响应。
- 从方法的入参和返回值来看,没有什么特别,根据请求数据想办法发起调用,拿到 Result 结果。
但是当前的类名是处理 Dubbo 协议的调用?有点好奇,我们看看这个 doInvoke 有哪些实现类,发现了有 InjvmInvoker、GrpcInvoker 等实现类,也证实了当前的 DubboInvoker 是专门处理 Dubbo 协议调用的类。
- 再看看方法实现体的一些细节,计算了超时时间值,单程发送,不需要响应好说,直接调用交互数据客户端类的 send 方法就行,需要有响应的分支逻辑还定义了线程池,调用的是 request 方法进行发送。
不过,两条分支最终都返回了一个异步结果对象,可以看出当前的 DubboInvoker 旨在屏蔽协议之间的差异,让上层调用方不感知协议间的不同。
8. ReferenceCountExchangeClient
因为我们没有单独设置调用不需要响应,就继续断点进入 currentClient 的 request 方法,看看到底是怎么发送的,来到了 ReferenceCountExchangeClient 类。
///////////////////////////////////////////////////
// org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient#request(java.lang.Object, int, java.util.concurrent.ExecutorService)
// 这里将构建好的 request 对象发送出去,然后拿到了一个 CompletableFuture 异步化的对象
///////////////////////////////////////////////////
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
// client为:HeaderExchangeClient
return client.request(request, timeout, executor);
}
- 从代码流程来看,这个类中的 request 方法也很简单,把入参数据直接全部交给了当前类的成员变量 client 处理。
- 从方法的入参和返回值看,入参是请求对象、超时时间、线程池对象,返参是 CompletableFuture 对象。从线程池和 Future 对象,充分说明了这个 request 方法进行了同步转异步的操作。
- 再看看方法实现体的一些细节,从断点展示的变量值细节看,只是发现了 client 的类型属于 HeaderExchangeClient,也就意味着 HeaderExchangeClient 完成了同步转异步的逻辑操作。
9. NettyClient
仍然没有看到数据是怎么发送的,我们继续深入 HeaderExchangeClient 的 request 方法中,Debug 几步后发现,进入了抽象类 AbstractPeer 的 send 方法。
///////////////////////////////////////////////////
// org.apache.dubbo.remoting.transport.AbstractPeer#send
// this:NettyClient
// NettyClient 负责和提供方服务建立连接、发送数据等操作
///////////////////////////////////////////////////
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
逻辑也挺简单。
- 从代码流程来看,没有过多的逻辑包装,直接把入参交给了另外一个重载的 send 方法。
- 从方法的入参和返回值来看,入参是请求对象,没有返参,并且方法上声明了 RemotingException 异常,说明这里离数据的发送应该很近了。至于没有返回值,主要是前面由同步转了异步,没有返回值也正常。
- 再看看方法实现体的一些细节,抽象类看不出什么,但是从堆栈变量的引用值可以发现,当前的抽象类是 NettyClient 的父类。
那我们从 NettyClient 的 send 方法细看,结果发现了最终调用了 NioSocketChannel 的 writeAndFlush 方法,这个不就是 Netty 网络通信框架的 API 么?
到这里,我们在代码层面解释了数据原来是通过 Netty 框架的 NioSocketChannel 发送出去的。
10. NettyCodecAdapter
一旦进入到 Netty 框架,再通过断点一步步跟踪数据就有点难了,因为 Netty 框架内部处处都是“线程+队列”的异步操作方式,这里我们走个捷径,进入 NettyClient 类,找找初始化 NettyClient 的相关 Netty 层面的配置。
你会找到一个 NettyCodecAdapter 类,对数据进行编解码,直接在类中的 encode 方法打个断点,等断点过来。
///////////////////////////////////////////////////
// org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder#encode
// 将对象按照一定的序列化方式发送出去
///////////////////////////////////////////////////
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
codec.encode(channel, buffer, msg);
}
}
可以看到,编码的方法简单干脆。
- 从代码流程看,调用了一个 codec 编码器的变量,对入参编码处理。
- 从方法的入参和返回值看,msg 是请求数据对象,out 变量是 ByteBuf 缓冲区,应该是将 msg 编码之后的数据流。
- 再看看方法实现体的一些细节,没什么特别之处,无非就是走后续编解码器的编码方法,编码完成后,再通过 Netty 框架把数据流发送出去。
调用流程的其他案例
经过漫长的十步断点,我们把消费方的调用流程大致串了起来,虽说过程有点曲折,但最终的艰辛是值得的,对于你理解框架调用的来龙去脉很重要,而且未来你会体会到,当调用环节发生了异常,你只要认真回忆一下异常在哪个环节,然后结合代码简单分析一下,很快就能分析出原因来。
消费方的调用流程就是这么一步步调试分析出来的,那还有哪些典型的流程,可以按这个思路一步步调试分析出调用流程呢?我就列举三个比较典型的,你课后可以练练手。
第一,Tomcat 容器接收请求流程,通过在 HttpServlet 的 service 方法打个断点,从调用堆栈的最下面开始,可以分析出 Tomcat 的整体架构。
第二,SpringMvc 处理请求的流程,通过在 Controller 的任意方法打个断点,就可以逐步分析出一个 SpringMvc 处理请求的大致流程框架。
第三,Spring 的 getBean 方法,通过这个方法的层层深入,可以分析出一个庞大的 Spring 对象生成体系,也能挖掘出非常多 Spring 各种操控对象的扩展机制。
总结
今天,从一个被 @DubboReference 标识的常规成员变量开始,我们抛出了两个问题,变量的值类型在运行时是否是见到的类型,在消费方发起调用时会经过怎样的调用流程体系。
带着这两个问题,我们用最普通的 Debug 调试方式,借鉴 12 字方针深入跟踪调用流程的源码,边分析、边绘图,步步为营,最终梳理出了消费方调用流程的大体框架。你还可以和“源码框架”中学过的分层模块联系到一起复习。
总结一下跟踪源码的 12 字方针。
- “不钻细节:只看流程。”代码虽然多,但我们不必研究每个细节,要先捡方法中的重要分支流程,一些提前返回的边边角角的流程一概忽略不看。
- “不看过程:只看结论。”每个方法的代码逻辑可长可短,我们可以重点研究这个方法需要什么入参,又能给出什么返参,以此推测方法在干什么,到底要完成一件什么样的事情,搞清楚并得出结论。
- “再看细节:再看过程。”当你按照前两点认真调试后,大概的调用流程体系就清楚了。在此基础之上,再来细看被遗忘的边角料代码,有助于你进一步丰富调用流程图,体会源码细节中那些缜密的思维逻辑。
思考题
留个作业,消费方的普通调用会经历哪些流程你已经十分熟悉了,可以研究消费方进行泛化调用时会经历哪些流程,以及泛化调用的底层是怎么实现的?动手练习一下。
期待看到你的回答,如果觉得今天的内容对你有帮助,也欢迎分享给身边的朋友一起讨论。我们下一讲见。
20 思考题参考
上一期留了个作业,研究下消费方接收 zookeeper 服务端事件变更的地方在哪里。
想要解答这个问题,我们得从添加监听的起源看起,只有知道添加监听干了些什么事情,预留了什么回调,我们才能进一步找到接收的地方。
我们直接进入 doSubscribe 方法中看看,到底添加了什么监听?监听对应的实现类是哪个?
///////////////////////////////////////////////////
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
// 订阅的核心逻辑,读取 zk 目录下的数据,然后通知刷新内存中的数据
///////////////////////////////////////////////////
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
// 因为这里用 * 号匹配,我们在真实的线上环境也不可能将服务接口配置为 * 号
// 因此这里的 * 号逻辑暂且跳过,直接看后面的具体接口的逻辑
if ("*".equals(url.getServiceInterface())) {
// 省略其他部分代码...
}
// 能来到这里,说明 ServiceInterface 不是 * 号
// url.getServiceInterface() = com.hmilyylimh.cloud.facade.demo.DemoFacade
else {
CountDownLatch latch = new CountDownLatch(1);
try {
List<URL> urls = new ArrayList<>();
// toCategoriesPath(url) 得出来的集合有以下几种:
// 1、/dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/providers
// 2、/dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/configurators
// 3、/dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/routers
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// 向 zk 创建持久化目录,一种容错方式,担心目录被谁意外的干掉了
zkClient.create(path, false);
// !!!!!!!!!!!!!!!!
// 这段逻辑很重要了,添加对 path 目录的监听,
// 添加监听完成后,还能拿到 path 路径下所有的信息
// 那就意味着监听一旦添加完成,那么就能立马获取到该 DemoFacade 接口到底有多少个提供方节点
List<String> children = zkClient.addChildListener(path, zkListener);
// 将返回的信息全部添加到 urls 集合中
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 从 zk 拿到了所有的信息后,然后调用 notify 方法
// url.get(0) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say®ister-mode=interface&release=3.0.7&side=provider
// url.get(1) = empty://192.168.100.183/com.hmilyylimh.cloud.facade.demo.DemoFacade?application=dubbo-20-subscribe-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=11560&qos.enable=false&release=3.0.7&side=consumer&sticky=false×tamp=1670846788876
// url.get(2) = empty://192.168.100.183/com.hmilyylimh.cloud.facade.demo.DemoFacade?application=dubbo-20-subscribe-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=11560&qos.enable=false&release=3.0.7&side=consumer&sticky=false×tamp=1670846788876
notify(url, listener, urls);
} finally {
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
再次回到了这段代码中,这次我们是有针对性地寻找监听器类了,眼尖的你,相信很快就看到了 RegistryChildListenerImpl 这个类,从名字上看,是子节点的监听实现类,既然能在这段实现全流程逻辑,那毫无疑问得进入 RegistryChildListenerImpl 这个类去看看。
///////////////////////////////////////////////////
// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.RegistryChildListenerImpl
// 子节点目录监听变更类
///////////////////////////////////////////////////
private class RegistryChildListenerImpl implements ChildListener {
private RegistryNotifier notifier;
private long lastExecuteTime;
private volatile CountDownLatch latch;
public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
this.latch = latch;
notifier = new RegistryNotifier(getUrl(), ZookeeperRegistry.this.getDelay()) {
@Override
public void notify(Object rawAddresses) {
// 获取延时时间
long delayTime = getDelayTime();
// 若延时时间小于或等于 0 的话,就直接执行 RegistryNotifier 内部类的 doNotify 方法
if (delayTime <= 0) {
this.doNotify(rawAddresses);
} else {
// 否则就计算【当前时间】减去【上一次执行时间】得到一个差值
// 若 delayTime 大于这个差值的话,那就继续睡眠下
// 否则又立马执行 RegistryNotifier 内部类的 doNotify 方法
long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
if (interval > 0) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// ignore
}
}
lastExecuteTime = System.currentTimeMillis();
this.doNotify(rawAddresses);
}
}
@Override
protected void doNotify(Object rawAddresses) {
ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
}
};
}
// 设置计数器概念的同步工具类
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
// 感知到节点有变更,该方法是别其他地方回调的
@Override
public void childChanged(String path, List<String> children) {
// path = /dubbo/com.hmilyylimh.cloud.facade.demo.DemoFacade/providers
// children.get(0) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=8552®ister-mode=interface&release=3.0.7&side=provider×tamp=1670859852464
// children.get(1) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=10568®ister-mode=interface&release=3.0.7&side=provider×tamp=1670859874187
// children.get(2) = dubbo://192.168.100.183:28200/com.hmilyylimh.cloud.facade.demo.DemoFacade?anyhost=true&application=dubbo-20-subscribe-consumer&background=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.hmilyylimh.cloud.facade.demo.DemoFacade&methods=sayHello,say&pid=12988®ister-mode=interface&release=3.0.7&side=provider×tamp=1670859893317
try {
latch.await();
} catch (InterruptedException e) {
logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
}
// 节点变更通知,直接调用的是上面 RegistryNotifier 内部类中的 notify 方法
notifier.notify(children);
}
}
通过这段代码我们看到了若 childChanged 方法发生了回调,那么最终会调用到 ZookeeperRegistry 的 notify 方法。虽然我们找到了 Dubbo 针对 zookeeper 目录监听回调的地方,但是到底是谁调用了这个 childChanged 方法呢?
这个时候,又得使出我们的 Debug 绝招了,你可以按步骤操作。
- 步骤一,启动提供方。
- 步骤二,启动消费方。
- 步骤三,在 RegistryChildListenerImpl 类中的 childChanged 方法打个断点。
- 步骤四,关闭提供方。
- 步骤五,启动提供方。
就能看到断点的到来了。
通过对断点调用栈的分析,真相也就清晰了,原来 dubbo 框架引用了 curator 插件来与 zookeeper 进行交互,curator 插件会实时感知 zookeeper 服务端发送的事件变更,然后 curator 再通过一定的回调处理,就调用到了 RegistryChildListenerImpl 类中的 childChanged 方法。
- Lum 👍(2) 💬(2)
读完后感觉心态有点炸,有点纠结这一步步的invoker是怎么组装起来的,dubbo太复杂了。
2023-03-02 - Nights 👍(1) 💬(2)
InvokerInvocationHandler》MigrationInvoker》MockClusterInvoker》FailoverClusterInvoker》DubboInvoker 这个 invoker 调用链路是怎么看出来的?
2023-02-06 - 张小凡 👍(0) 💬(1)
dubbo invoker如何理解?
2023-03-07 - Wallace Pang 👍(0) 💬(1)
"不钻细节,只看流程;不看过程,只看结论;再看细节,再看过程" 16字方针
2023-02-23