扫一扫
关注公众号
注:公众号关于dubbo解读文章均基于apache-dubbo-incubating-2.7.1版本,发版于5月26号,此版本注册中心(多数是zookeeper)在某些特殊场景下会出现重复URL地址数据无法删除,导致消费方拿到的是失效地址,从而导致调用失败的问题。如果你也在使用此版本进行源码学习,在网络漂移(下班回家再调试源码)的情况下需要手动删除zookeeper的dubbo节点路径
服务引用示例
public class Application { /** * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before * launch the application */ public static void main(String[] args) { ReferenceConfig<DemoService> reference = new ReferenceConfig<>(); reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer")); reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); reference.setInterface(DemoService.class); DemoService service = reference.get(); String message = service.sayHello("dubbo"); System.out.println(message); } }
这里得到的DemoService是什么?将下来一层一层地掀开迷底
ReferenceConfig#get=>ReferenceConfig#init,在init方法中会执行ref = createProxy(map);
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { //TODO 本地引用inJvm if (shouldJvmRefer(map)) { invoker = refprotocol.refer(interfaceClass, url); } else { //TODO urls为服务引用的接口信息,URL是dubbo中的统一数据模型 if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); } if (registryURL != null) { invoker = cluster.join(new StaticDirectory(u, invokers)); } else { //TODO 直连 //TODO 适用于测试环境或者注册中心不可用时需要发布的情况 invoker = cluster.join(new StaticDirectory(invokers)); } } } //TODO 创建服务代理 return (T) proxyFactory.getProxy(invoker); }
直连中的StaticDirectory的作用是本文第一个重点关注对象,实际上集群目录服务实现父抽象类AbstractDirectory的doList模板方法,会返回经过路由过滤后的Invoker列表,路由过滤也就是服务路由,常用于设置分组调用、同机房调用优先、灰度分布、流量切换、读写分离等。另外还有RegistryDirectory、MockDirectory目录服务,可以不加思索地猜想RegistryDirectory会动态维护Invoker列表,StaticDirector则是直接返回
Protocol中的refer方法是被@Adaptive注解修饰的,说明它是一个自适应扩展点,自适应扩展点加在方法层面上,表示会动态生成一个自适应的适配器,比如这里的DubboProtocol$Adaptive,并且默认实现是”dubbo”,最终实现可以通过在URL指定
/** * Protocol. (API/SPI, Singleton, ThreadSafe) */ @SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; void destroy(); }
refprotocol#refer先后经过filter包装类ProtocolFilterWrapper、ProtocolListenerWrapper最后执行RegistryProtocol。这些包装类是在创建扩展器时,通过查找构造方法的参数类型获取Wrapper类,然后将自身注入,前者ProtocolFilterWrapper负责过滤器,Dubbo允许我们在provider端设置权限校验、缓存、限流等等一些Filter过滤器,也可以在consumer端设置一些Filter,这是一种责任链模式;后者ProtocolListenerWrapper负责监听器,Dubbo允许consumer端在调用之前、调用之后或出现异常时,触发oninvoke、onreturn、onthrow三个事件
private T createExtension(String name) { try { Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (CollectionUtils.isNotEmpty(wrapperClasses)) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } }
RegistryProtocol#refer=>doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { //TODO 对多个invoker进行封装 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); //TODO 注册 registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); //TODO 订阅,监听变化 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //TODO 根据容错模式和负载均衡算法获取invoker Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
简单回顾下之前提到的服务注册,执行链大概是这样的,Registry根据URL通过registryFactory获取自适应适配类,最后会执行FailbackRegistry中的模块方法doRegister,真正的实现类是ZookeeperRegistry,创建路径节点,将url信息写入zookeeper中
/***************分隔线***********************/
接下来说下订阅监听zookeeper变化
RegistryDirectory#subscribe
public void subscribe(URL url) { setConsumerUrl(url); consumerConfigurationListener.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); }
FailbackRegistry#subscribe
@Override public void subscribe(URL url, NotifyListener listener) { try { doSubscribe(url, listener); } catch (Exception e) { List<URL> urls = getCacheUrls(url); if (CollectionUtils.isNotEmpty(urls)) { notify(url, listener, urls); } addFailedSubscribed(url, listener); } } //TODO 模板方法 public abstract void doSubscribe(URL url, NotifyListener listener);
ZookeeperRegistry#doSubscribe
@Override public void doSubscribe(final URL url, final NotifyListener listener) { List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } //TODO 服务提供者dubbo/org.apache.dubbo.demo.DemoService/providers //TODO 服务配置dubbo/org.apache.dubbo.demo.DemoService/configurators /TODO 服务路由/dubbo/org.apache.dubbo.demo.DemoService/routers zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { //TODO 只有服务提供者才有children,也就是具体某个实例 urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); }
可以看到当有注册中心有服务列表更新的时候会执行通知,此接口接受三种类别的url,包括服务提供方provider、服务配置configurator、服务路由router。通知方法执行链大概是FailbackRegistry#notify=>AbstractRegistry#notify=>RegistryDirectory#notify
AbstractRegistry#notify
for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); //TODO 通知 listener.notify(categoryList); // We will update our cache file after each notification. // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL. //TODO 更新缓存 saveProperties(url); }
AbstractRegistry#saveProperties
File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { lockfile.createNewFile(); } try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); FileChannel channel = raf.getChannel()) { FileLock lock = channel.tryLock(); if (lock == null) { throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); } // Save try { if (!file.exists()) { file.createNewFile(); } try (FileOutputStream outputFile = new FileOutputStream(file)) { properties.store(outputFile, "Dubbo Registry Cache"); } } finally { lock.release(); } }
平时我们更习惯使用输入流和输出流操作文件,将输入流的数据写入到输出流中,但是利用FileChannel会更加高效,它能直连输入和输出流的文件通道
/***************分隔线*************/
回到notify方法中,执行链RegistryDirectory#notify=>RegistryDirectory#refreshOverrideAndInvoker=>RegistryDirectory#refreshInvoker
@Override public URL getUrl() { return this.overrideDirectoryUrl; } private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); refreshInvoker(urls); } private void refreshInvoker(List<URL> invokerUrls) { Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; } private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); String key = url.toFullString(); // The parameter urls are sorted invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); newUrlInvokerMap.put(key, invoker); return newUrlInvokerMap; }
在refreshOverrideAndInvoker中会更新routerChain,也会更新overrideDirectoryUrl等,invoker#getUrl实际上是取的overrideDirectoryUrl,而扩展点适配器选择具体实现是根据URL来的。注:URL在Dubbo中是统一的数据模式
protocol#refer又用到了Dubbo SPI机制,执行链是Protocol$Adaptive#refer=>ProtocolListenerWrapper#refer=>ProtocolFilterWrapper#refer=>DubboProtocol#refer,其中在 DubboProtocol#refer方法中会构建DubboInvoker对象
DubboProtocol#refer
@Override public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
refer方法参数中还有一个很重要的DubboProtocol#getClient方法,应该是和Netty相关的,下一篇再讲解
/**************分隔线*************/
回到doRefer中的Cluster.join(directory),Cluster是一个集群容错接口,同时也是一个@Adaptive自适应扩展点,默认实现类是FailoverCluster.NAME。Dubbo主要内置了如下几种策略:失败自动切换Failover、安全失败Failsafe、快速失败Failfast、失败自动恢复Failback、并行调用Forking、广播Broadcast
@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param <T> * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
前面我们说过ExtensionLoader在实例化对象时,会将自己(这里是FailOverCluster)注入到包装类中,所以这里实际上是调用MockClusterWrapper#join,所以在ReferenceConfig#createProxy中的invoker = refprotocol.refer(interfaceClass, urls.get(0))中得到的invoker是一个MockClusterWrapper包装类。使用这种机制可以把一些公共处理放在Wrapper包装类中
/************************题外话-开始*************************/
这里插一句题外话,Dubbo是如何实现Forking调用?
//TODO 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作
// 但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
//TODO 使用阻塞队列
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
//TODO 如果没有一个成功那么所有都失败才会返回
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
//TODO 阻塞等待结果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
可以看到它是通过阻塞队列完成的,只要有一个成功就返回,隐性的含义是如果没有成本则所有失败才返回,返回最后一个异常结果,通常适用于实时性要求高的操作,但是资源成本高,所以一般通过设置forks参数限定最大并行数,我们也可以使用CompletionService达到相同的效果
// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建 CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); // 用于保存 Future 对象 List<Future<Integer>> futures = new ArrayList<>(3); // 提交异步任务,并保存 future 到 futures futures.add( cs.submit(()->sayHello1())); futures.add( cs.submit(()->sayHello2())); futures.add( cs.submit(()->sayHeelo2())); // 获取最快返回的任务执行结果 Integer r = 0; try { // 只要有一个成功返回,则 break for (int i = 0; i < 3; ++i) { r = cs.take().get(); // 简单地通过判空来检查是否成功返回 if (r != null) { break; } } } finally { // 取消所有任务 for(Future<Integer> f : futures) f.cancel(true); } return r;
当需要批量提交异步任务的时候使用CompletionService。CompletionService将线程池 Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。嗯,作者就是这么照顾读者,看源码一定要学到东西,学到东西也会分享出来,所有快快关注“松花皮蛋的黑板报”一起涨见识吧!
/***********题外话-结束******************/
说回到消费端入口ReferenceConfig
private T createProxy(Map<String, String> map) { invoker = refprotocol.refer(interfaceClass, urls.get(0)); return (T) proxyFactory.getProxy(invoker); }
前面我们讲清楚了invoker是一个引用了FailOverClusterInvoker的MockClusterInvoker,接下来看下getProxy执行链是
ProxyFactory$Adaptive#getProxy=>StubProxyFactoryWrapper#getProxy
=>AbstractProxyFactory#getProxy=>JavassistProxyFactory#getProxy
先看下包装类的方法,StubProxyFactoryWrapper#getProxy
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public <T> T getProxy(Invoker<T> invoker) throws RpcException { //TODO 这里的invoker是MockClusterInvoker //TODO proxyFactory是JavassistProxyFactory T proxy = proxyFactory.getProxy(invoker); //TODO 泛化调用入口 if (GenericService.class != invoker.getInterface()) { Class<?> stubClass = ReflectUtils.forName(stub); Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType); proxy = (T) constructor.newInstance(new Object[]{proxy}); } } return proxy; }
泛化调用主要用于消费端没有API接口的情况;不需要引入接口JAR包,而是直接通过GenericService接口来发起服务调用,参数及返回值中的所有POJO均用Map表示。泛化调用对于服务端无需关注,按正常服务进行暴露即可。以下几种场景可以考虑使用泛化调用:服务测试平台、API服务网关
再来看下JavassistProxyFactory类
public class JavassistProxyFactory extends AbstractProxyFactory { //TODO DUbbo中的URL数据模型中指定有proxyFactory实现,默认是javassist @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
可以看到getProxy得到的是一个InvokerInvocationHandler,所以在服务端调用RPC接口方法会调用到InvokerInvocationHandler#invoker,其中toString\hashCode\equals方法不走RPC调用。此时开头提出的问题终于有了答案,reference#get得到的是InvokerInvocationHandler对象
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } //TODO 以下方法走本地调用 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } //TODO 调用MockClusterInvoker#invoker return invoker.invoke(createInvocation(method, args)).recreate(); }
MockClusterInvoker#invoker
@Override public Result invoke(Invocation invocation) throws RpcException { Result result = null; //TODO URL中的mock值,默认是不开启 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { //TODO 不走mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { //TODO 不管是否失败都直接走mock result = doMockInvoke(invocation, null); } else { //TODO 失败时调用mock try { result = this.invoker.invoke(invocation); } catch (RpcException e) { result = doMockInvoke(invocation, e); } } return result; }
有了mock,再也不用担心联调慢和上游服务全部宕机
然后执行链会走到AbstractClusterInvoker#invoke
@Override public Result invoke(final Invocation invocation) throws RpcException { //TODO 从directory获取到invokerList List<Invoker<T>> invokers = list(invocation); //TODO 通过SPI扩展实例化LoadBalance,默认是随机选取 LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); //TODO FailoverClusterInvoker return doInvoke(invocation, invokers, loadbalance); }
而list的执行链是RegistryDirectory#doList=>RouterChain#route,之前在分析完成注册中心后订阅变更事件,然后会刷新Directory中的RouterChain,这里也验证了开头描述的RegistryDirectory是动态维护的Invoker目录服务
而doInvoke会将loadbalance实例传到FailoverClusterInvoker#doInvoke
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { //TODO 获取invoker Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); //TODO 执行 Result result = invoker.invoke(invocation); }
然后执行链是FailoverClusterInvoker#doInvoke=>InvokerWrapper#invoke=>ListenerInvokerWrapper#invoke=>ProtocolFilterWrapper#invoke=>AbstractInvoker#invoke=>DubboInvoker,其中在ProtocolFilterWrapper$1中会执行定义的各种过滤链filter
DubboInvoker#doInvoke,异步调用、回调调用,同时这也是与高性能NIO通信框架Netty交互的入口
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; ExchangeClient currentClient;//TODO 之前创建的连接 if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { if (isOneway) {//TODO 单向 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // TODO 将RpcInvocation放到Netty处理流程中 currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) {//TODO 异步 //TODO 将RpcInvocation放到Netty处理流程中 ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) {//TODO 回调 result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { } catch (RemotingException e) { } }
大概引用流程就是这样,当然不同的配置执行链流程会有区别,但是大体方向是相似的,希望这篇文章能帮到你,欢迎关注