3.2 Dubbo框架的原理分析
在工程师的世界里,概念、原理都是抽象的,唯有源码是具象的。阅读更多的优秀的源码,我们的想象力才会更加具象,理解才会更加“干净”。
在使用Dubbo的时候,如果希望能够深入掌握,那么了解其原理和关键代码是非常必要的,做到知其然且知其所以然,不仅能让我们了解内部运行机制,而且在使用的过程中遇到问题时,我们还可以自行维护和修改。
3.2.1 总体架构分析
在深入分析Dubbo源码之前,先看一下Dubbo源码的包结构及各部分的作用,如图3-3所示。
图3-3
具体说明如下。
● dubbo-admin:Dubbo自带的控制台管理,用于服务治理和服务监控。
● dubbo-cluster:集群模块,将多个服务提供方伪装为一个提供方,包括负载均衡、容错、路由等,集群的地址列表可以是静态配置的,也可以由注册中心下发。
● dubbo-common:公共逻辑模块,包括Util类和通用模型。
● dubbo-config:配置模块,是Dubbo对外的API,用户通过Config使用Dubbo,隐藏Dubbo所有细节。
● dubbo-container:容器模块,是一个Standalone的容器,以简单的Main加载Spring的启动,因为服务通常不需要Tomcat/JBoss等Web容器的特性,所以没必要用Web容器去加载服务。
● dubbo-filter:主要针对dubbo-rpc里面的Filter进行缓存和校验。
● dubbo-monitor:监控模块,统计服务的调用次数、调用时间等。
● dubbo-registry:注册中心模块,基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
● dubbo-remoting:远程通信模块,包括Netty、Mina等多种通信方式。
● dubbo-rpc:远程调用模块,抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
了解完包结构后,参考Dubbo官网提供的分层架构图可以看到,整个Dubbo体系共分为十层,如图3-4所示。
图3-4
各层主要的功能说明如下。
● Service层:这一层和业务实现相结合,根据具体业务设计服务提供者和消费者的实现类和接口类。
● Config层:配置信息层,由Spring解析服务提供者和消费者的配置信息,然后封装到ServiceConfig和ReferenceConfig中。
● Proxy层:服务代理层,这一层主要是结合SPI机制,动态选取不同的配置类。
● Registry层:服务注册层,主要负责注册与发现Dubbo服务,以及对Dubbo服务的监听。
● Cluster层:服务集群层,负责服务的路由、负载及失败重试策略。
● Protocol层:在这层会进行相关协议的转换与过滤。
● Exchange层:封装请求响应模式,同步转异步。
● Transport层:网络传输层,抽象Netty、Mina为统一接口,在这一层进行真正的数据传输。
● Serialize层:序列化层,根据不同的协议对数据进行序列化。
从Dubbo的整体分层架构图中可以看到,Dubbo项目还是比较复杂的,涉及非常多的知识点,包括Spring相关知识点、各种设计模式的组合使用、Java网络编程相关知识(Netty、Mina、NIO)、RPC机制,还包括序列化、SPI、ClassLoader等,下面将逐一介绍各知识点在Dubbo中的运用。
3.2.2 Dubbo Bean的加载
作为Dubbo的使用者,最先接触的就是Dubbo的配置文件,在配置文件中配置注册中心、服务提供者、服务消费者等,而我们所使用的Spring标签则是Dubbo自定义的标签,比如dubbo:service/等。Spring解析这些自定义标签并将信息封装在Dubbo的Config类中。
1. Spring自定义标签的使用
在Spring中完成一个自定义标签需要如下步骤:
● 设计配置属性和JavaBean;
● 编写XSD文件;
● 编写BeanDefinitionParser标签解析类;
● 编写调用标签解析类的NamespaceHandler类;
● 编写spring.handlers和spring.schemas以供Spring读取;
● 在Spring中使用。
下面结合一个小例子来说明自定义标签的全过程。
(1)设计配置属性和JavaBean。
建立一个与自定义标签相对应的配置JavaBean User对象:
public class User { private String userName; private String email; public String getUserName(){ return userName; } public void setUserName(String userName){ this.userName = userName; } public String getEmail(){ return email; } public void setEmail(String email){ this.email = email; } }
(2)编写XSD文件,用来检验我们自定义标签的有效性。
<? xml version="1.0" encoding="UTF-8"? > <schema xmlns="http://www.w3.org/2001/XMLSchema" targetNamespace="http://www. superpay.com/schema/user" xmlns:tns="http://www.superpay.com/schema/user" elementFormDefault="qualified"> <element name="user"> <complexType> <attribute name="id" type="string" /> <attribute name="userName" type="string" /> <attribute name="email" type="string" /> </complexType> </element> </schema>
(3)编写BeanDefinitionParser标签解析类。
public class UserDefinitionParser extends AbstractSingleBeanDefinitionParser { protected Class getBeanClass(Element element){ return User.class; } protected void doParse(Element element, BeanDefinitionBuilder bean){ String userName = element.getAttribute("userName"); String email = element.getAttribute("email"); if(StringUtils.hasText(userName)){ bean.addPropertyValue("userName", userName); } if(StringUtils.hasText(email)){ bean.addPropertyValue("email", email); } } }
(4)编写调用标签解析类的NamespaceHandler类。
NamespaceHandler会根据schema和节点名找到UserDefinitionParser,然后由UserDefinition-Parser完成具体的解析工作:
public class CustomNamespaceHandler extends NamespaceHandlerSupport { public void init(){ registerBeanDefinitionParser("user", new UserDefinitionParser()); } }
其中registerBeanDefinitionParser("user", new UserDefinitionParser())用来把节点名和解析类联系起来,在配置中引用User配置项时,就会用UserDefinitionParser来解析配置。
(5)编写spring.handlers和spring.schemas以供Spring读取。
要实现自定义的标签配置,就需要在META-INF下由两个默认Spring配置文件来提供支持。一个是spring.schemas,另一个是spring.handlers,前者是为了验证自定义的XML配置文件是否符合格式要求,后者是告诉Spring该如何解析自定义的配置文件。
spring.handlers: http\://[www.superpay.com/schema/user=com.superpay.config.CustomNamespaceHandl er](http://www.superpay.com/schema/user=com.superpay.config.CustomNamespaceHandler) spring.schemas: http\://www.superpay.com/schema/user.xsd=META-INF/user.xsd
(6)在Spring中使用。
前面五个步骤完成了自定义标签的简单配置,使用方法和定义普通Spring Bean一样,只不过需要在XML中引入自定义的Scheme,如下所示。
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:custorm="www.superpay.com/schema/user"xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.x sd www.superpay.com/schema/user http://www.superpay.com/schema/user.xsd"> <custorm:user id="user" name="张三" email="zs@test.com"/> </beans>
2. Dubbo解析配置文件及加载Bean
前面介绍了在Spring中如何自定义XML标签,下面介绍在Dubbo中如何解析自定义XML标签,以及如何加载这些配置信息。
参考图3-3, Dubbo在启动的时候会从dubbo-container/dubbo-container-spring包中的SpringContainer类开始,这个类主要负责启动Spring的上下文,加载解析Spring的配置文件,同时从META-INF中加载spring.handlers和spring.schemas这两个文件,先看一下spring.handlers文件的内容:
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.sche ma.DubboNamespaceHandler
Dubbo自定义标签的解析都是在DubboNamespaceHandler中定义的,源码如下所示。
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init(){ registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); registerBeanDefinitionParser("mockSystemUrl", new DubboBeanDefinitionParser(MockSystemUrlConfig.class, true)); } }
从这段源码中可以看到Dubbo的自定义标签共有10个,最后一个mockSystemUrl是对Dubbo新增的Mock测试系统的配置,这个功能后面会重点介绍。所有Dubbo的标签都统一用DubboBeanDefinitionParser进行解析,基于一对一属性映射,将XML标签解析为Bean对象。在DubboBeanDefinitionParser解析类中最关键的是parse方法,由于方法的核心代码较多,所以将有注释的方法代码放到了本书的官网上,感兴趣的读者可以下载相应资源文件查看。
3.2.3 Dubbo Extension机制
Dubbo的架构体系采用的是“微核+插件”,这样做使整个架构的扩展性更强,可以在不修改核心代码的情况下进行新增插件的添加,而这个体系中最核心的机制是采用了SPI,为接口寻找服务实现的机制,这个机制与Spring中的IoC思想有些类似,将程序中接口与实现的强关联关系变成可插拔关系。
1. Java SPI
SPI全称为Service Provider Interface,是JDK内置的一种服务提供发现功能,一种动态替换发现的机制。举个例子,要想在运行时动态地给一个接口添加实现,只需要添加一个实现即可。
下面通过一个完整例子来说明如何使用JDK的SPI实现服务发现和动态替换,图3-5展示了使用SPI时需要遵循的规范。
图3-5
例子的工程项目结构如图3-6所示。
图3-6
具体说明如下。
● 接口类是HelloInterface,两个实现类分别是ImageHello和TextHello。
● 在META-INF目录下建立扩展文件,以接口HelloInterface全路径名命名。
接口HelloInterface的代码如下所示。
public interface HelloInterface { public void sayHello(); }
两个实现类ImageHello和TextHello的代码如下所示。
public class ImageHello implements HelloInterface { @Override public void sayHello(){ System.out.println("Image hello! "); } } public class TextHello implements HelloInterface { @Override public void sayHello(){ System.out.println("Text hello! "); } }
META-INF/services/下的com.spiexample.HelloInterface文件内容如下:
com.spiexample.impl.ImageHello com.spiexample.impl.TextHello
最后通过SPIMain对象测试整个过程是否正确,代码如下:
public class SPIMain { public static void main(String[] args){ ServiceLoader<HelloInterface> loaders = ServiceLoader.load(HelloInterface.class); if(loaders ! = null){ for(HelloInterface helloInterface : loaders){ helloInterface.sayHello(); } } } }
执行结果打印内容如下:
Image hello! Text hello!
2. Dubbo在SPI上的具体实现
Dubbo的扩展机制和Java的SPI机制非常相似,但增加了如下功能:
● 可以方便地获取某一个想要的扩展实现;
● 对于扩展实现IoC依赖注入功能。
举例来说:
接口A,实现者A1、A2
接口B,实现者B1、B2
现在实现者A1含有setB()方法,会自动注入一个接口B的实现者,此时注入B1还是B2呢?都不是,而是注入一个动态生成的接口B的实现者B$Adpative,该实现者能够根据参数的不同,自动引用B1或B2来完成相应的功能。
通过Dubbo的Protocol接口的SPI实现,我们来分析完整的Dubbo扩展点加载过程。
1)扩展点配置
Protocol接口的代码如下:
@SPI("dubbo") public interface Protocol { /** * 获取默认端口,当用户没有配置端口时使用 * * @return默认端口 */ int getDefaultPort(); /** * 暴露远程服务:<br> * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress() * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别 * 3. export()传入的Invoker由框架实现并传入,协议不需要关心 * * @param <T> 服务的类型 * @param invoker服务的执行体 * @return exporter暴露服务的引用,用于取消暴露 * @throws RpcException当暴露服务出错时抛出,比如端口已占用 */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker)throws RpcException; /** * 引用远程服务 * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法 * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求 * 3. 当URL中设置check=false时,连接失败不能抛出异常,并内部自动恢复 * * @param <T> 服务的类型 * @param type服务的类型 * @param url远程服务的URL地址 * @return invoker服务的本地代理 * @throws RpcException当连接服务提供方失败时抛出 */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url)throws RpcException; /** * 释放协议 * 1. 取消该协议所有已经暴露和引用的服务 * 2. 释放协议占用的所有资源,比如连接和端口 * 3. 协议在释放后,依然能暴露和引用新的服务 */ void destroy(); }
在上述代码中有两个非常重要的注解,分别是@SPI和@Adaptive。
● @SPI
定义默认实现类,比如@SPI(“dubbo”)默认调用的是DubboProtocol类。
● @Adaptive
该注解一般使用在方法上,代表自动生成和编译一个动态的Adpative类,它主要用于SPI,因为SPI的类是不固定的、未知的扩展类,所以设计了动态$Adaptive类。如果该注解使用在类上,则代表实现一个装饰模式的类。例如,Protocol的SPI类有injvm、dubbo、registry、filter和listener等很多扩展未知类,它设计了Protocol$Adaptive的类,通过ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(SPI类)来提取对象。
Protocol的扩展点文件在dubbo-rpc子模块的dubbo-rpc-api包中,如图3-7所示。
图3-7
而实际Dubbo在启动加载的时候会依次从以下目录中读取配置文件:
META-INF/dubbo/internal/ //Dubbo内部实现的各种扩展都放在这个目录中 META-INF/dubbo/ META-INF/services/
2)扩展点加载
Dubbo的扩展点主要是通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptive-Extension()方法进行加载的,每个定义的SPI接口都会产生一个ExtensionLoader实例,保存在一个名为EXTENSION_LOADERS的ConcurrentMap中,下面通过ExtensionLoadder.getExtension-Loader()方法逐步展开整个加载的全过程。
(1)ExtensionLoader.getExtensionLoader(Protocol.class)方法。
方法代码如下:
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type){ if(type == null) throw new IllegalArgumentException("Extension type == null"); if(! type.isInterface()){ throw new IllegalArgumentException("Extension type(" +type + ")is not interface! "); } //只接受使用@SPI注解注释的接口类型 if(! withExtensionAnnotation(type)){ throw new IllegalArgumentException("Extension type(" + type + ")is not extension, because WITHOUT @" + SPI.class.getSimpleName()+ " Annotation! "); } //先从静态缓存中获取对应的ExtensionLoader实例 ExtensionLoader<T> loader =(ExtensionLoader<T>)EXTENSION_LOADERS.get(type); //如果从EXTENSION_LOADERS获取的实例为null,则直接产生一个新的实例并存放到 //EXTENSION_LOADERS中 if(loader == null){ EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader =(ExtensionLoader<T>)EXTENSION_LOADERS.get(type); } return loader; }
● EXTENSION_LOADERS实例是一个ConcurrentMap实例,key是方法传过来的SPI接口类,value是ExtensionLoader实例类。示例代码如下所示。
ConcurrentMap<Class<? >, ExtensionLoader<? >> EXTENSION_LOADERS = new ConcurrentHashMap<Class<? >, ExtensionLoader<? >>();
● 如果从EXTENSION_LOADERS获取的实例为null,则直接产生一个新的实例并存放到EXTENSION_LOADERS中。
● 从getExtensionLoader中返回的是ExtensionLoader实例。
(2)ExtensionLoader getAdaptiveExtension()方法。
Dubbo中的扩展点都有多个实现,而框架设计原则又让我们针对接口编程而不是实现,这就需要在运行期才能决定具体使用哪个扩展实现类。Dubbo提供的Adpative注解,让我们自行决定究竟是自己提供扩展的适配还是由Dubbo来帮我们生成动态适配。
方法代码如下:
public T getAdaptiveExtension(){ Object instance = cachedAdaptiveInstance.get(); //从Adaptive缓存中获取实例对象 if(instance == null){ if(createAdaptiveInstanceError == null){ //采用双重检查锁保证一致性 synchronized(cachedAdaptiveInstance){ instance = cachedAdaptiveInstance.get(); if(instance == null){ try { //如果获取的缓存对象为null,则通过createAdaptiveExtension //方法创建一个并加入缓存中 instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch(Throwable t){ createAdaptiveInstanceError = t; throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("fail to create adaptive instance:" + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return(T)instance; }
从cachedAdaptiveInstance缓存中获取实例对象,如果为null,则通过方法createAdaptive-Extension创建对象并加入缓存。
createAdaptiveExtension方法代码如下:
private T createAdaptiveExtension(){ try { return injectExtension((T)getAdaptiveExtensionClass().newInstance()); } catch(Exception e){ throw new IllegalStateException("Can not create adaptive extenstion "+ type + ", cause: " + e.getMessage(), e); } }
在这个方法中共有两个过程,getAdaptiveExtensionClass获取Adaptive自适应扩展,injectExtension是为扩展对象注入其他依赖的实现。
先来看第一个过程getAdaptiveExtensionClass()方法的内部实现,代码如下:
private Class<? > getAdaptiveExtensionClass(){ getExtensionClasses(); if(cachedAdaptiveClass ! = null){ return cachedAdaptiveClass; } //如果自适应扩展为null,则调用createAdaptiveExtensionClass()方法创建 return cachedAdaptiveClass = createAdaptiveExtensionClass(); } private Class<? > createAdaptiveExtensionClass(){ String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); //动态生成编译类 com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExt ension(); return compiler.compile(code, classLoader); }
Compiler类是SPI接口类,通过ExtensionLoader进行加载,文件目录如图3-8所示。
图3-8
com.alibaba.dubbo.common.compiler.Compiler的内容如下:
adaptive=com.alibaba.dubbo.common.compiler.support.AdaptiveCompiler jdk=com.alibaba.dubbo.common.compiler.support.JdkCompiler javassist=com.alibaba.dubbo.common.compiler.support.JavassistCompiler
类继承结构如图3-9所示。
图3-9
这三个Compiler使用JavassistCompiler作为当前激活的Compiler类,但在AdaptiveCompiler的类定义上面有一个@Adaptive注解,表示是一个装饰模式的类,于是整个过程是:AdaptiveCompiler→JavassistCompiler。AdaptiveCompiler起装饰作用,在里面获取当前激活的JavassistCompiler类,然后执行compile方法产生默认的自适应扩展类。
自适应扩展类并不是一个真正的Java类实现,而是利用Javassist动态地生成代码,也就是手动拼装的代码,这段代码里会根据SPI上配置的信息加入对应的功能实现类,Javassist生成的动态代码如下所示。
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public void destroy(){ throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy()of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method! "); } public int getDefaultPort(){ throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method! "); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1)throws java.lang.Class { if(arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; //这里会根据URL中的信息获取具体的实现类名 String extName =(url.getProtocol()== null ? "dubbo" : url.getProtocol()); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol)name from url(" + url.toString()+ ")use keys([protocol])"); //根据上面的实现类名,在运行时通过Dubbo的扩展机制加载具体实现类 com.alibaba.dubbo.rpc.Protocol extension =(com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).g etExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0)throws com.alibaba.dubbo.rpc.Invoker { if(arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument== null"); if(arg0.getUrl()== null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl()== null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); //这里会根据URL中的信息获取具体的实现类名 String extName =(url.getProtocol()== null ? "dubbo" : url.getProtocol()); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol)name from url(" + url.toString()+ ")use keys([protocol])"); //根据上面的实现类名,在运行时通过Dubbo的扩展机制加载具体实现类 com.alibaba.dubbo.rpc.Protocol extension =(com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).g etExtension(extName); return extension.export(arg0); } }
在前面扩展点加载的介绍中提到了这行代码:Protocol refprotocol = ExtensionLoader. getExtensionLoader(Protocol.class).getAdaptiveExtension()。在程序执行前我们并不知道Protocol接口要加载的是哪个实现类,Dubbo通过getAdaptiveExtension方法利用默认的JavassistCompiler生成了上述Protocol Adpative类,可以看到在Protocol接口类中所有被加了@Adaptive注解的方法都有了具体的实现,整个过程类似于Java的动态代理或Spring的AOP实现。当我们使用refprotocol对象调用方法时,其实是调用Protocol Adaptive类中对应的代理方法,根据URL的参数找到具体实现类名称,然后通过ExtensionLoader对象的getExtension方法找到具体实现类进行方法调用。
我们再回到createAdaptiveExtension方法中,在getAdaptiveExtensionClass方法返回Protocol$Adpative类后,调用injectExtension方法为扩展对象入注入其他依赖的实现,方法代码如下:
//参数instance就是上面说的Protocol$Adaptive实例 private T injectExtension(T instance){ try { //objectFactory是AdaptiveExtensionFactory if(objectFactory ! = null){ //遍历扩展实现类实例的方法 for(Method method : instance.getClass().getMethods()){ //只处理以set开头的public方法并且参数只能是一个 if(method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())){ //获取方法的参数类型 Class<? > pt = method.getParameterTypes()[0]; try { //通过截取set方法名获取属性名 String property = method.getName().length()> 3 ? method.getName().substring(3, 4).toLowerCase()+ method.getName().substring(4): ""; /** 根据参数类型和属性名称从ExtensionFactory中获取其他扩 展点的实现类 如果有,则调用set方法新注入一个自适应实现类;如果没有, 则返回Protocol$Adaptive **/ Object object = objectFactory.getExtension(pt, property); if(object ! = null){ //为set方法注入一个自适应的实现类 method.invoke(instance, object); } } catch(Exception e){ logger.error("fail to inject via method " + method.getName() + " of interface " + type.getName()+ ": " +e.getMessage(), e); } } } } } catch(Exception e){ logger.error(e.getMessage(), e); } return instance; }
我们在ExtensionLoader类中看到三个以Class结尾的属性类,分别是:
● Class<? > cachedAdaptiveClass——如果扩展类Class含有Adaptive注解,则将这个Class设置为Class<? > cachedAdaptiveClass。
● Set> cachedWrapperClasses——如果扩展类Class含有带参数的构造器,则说明这个Class是一个装饰类,需要存到Set> cachedWrapperClasses中。
● Reference>> cachedClasses——如果扩展类Class没有带参数的构造器,则获取Class上的Extension注解,将该注解定义的name作为key,存至Reference>> cachedClasses结构中。
3)ExtensionLoader获取扩展点的过程
以一个简单的扩展类加载代码为例:
ExtensionLoader<Protocol> protocolLoader = ExtensionLoader.getExtensionLoader(Protocol.class); Protocol registryProtocol = protocolLoader.getExtension("registry");
通过代码可以看到,我们实际上是想获得RegistryProtocol类,所以在getExtension中传入的name值是registry,但在实际过程中,会把RegistryProtocol放到一个调用链中,在它前面会有几个Wrapper类,比如ProtocolFilterWrapper类和ProtocolListenerWrapper类,代码如下:
private T createExtension(String name){ Class<? > clazz = getExtensionClasses().get(name); if(clazz == null){ throw findException(name); } try { T instance =(T)EXTENSION_INSTANCES.get(clazz); if(instance == null){ EXTENSION_INSTANCES.putIfAbsent(clazz,(T)clazz.newInstance()); instance =(T)EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); Set<Class<? >> wrapperClasses = cachedWrapperClasses; if(wrapperClasses ! = null && wrapperClasses.size()> 0){ //关键代码,将instance类通过构造方法注入Wrapper类,形成调用链 for(Class<? > wrapperClass : wrapperClasses){ instance = injectExtension((T)wrapperClass.getConstructor(type). newInstance(instance)); } } return instance; } catch(Throwable t){ throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
● 根据name获取对应的class。
这里获取了RegistryProtocol.class。
● 根据获取的class创建一个实例。
使用newInstance生成一个实例,并将实例加载到EXTENSION_INSTANCES缓存中。
● 对获取的实例进行依赖注入。
使用injectExtension方法对实例进行依赖注入。
● 对上述经过依赖注入的实例再次进行包装。
遍历Set> cachedWrapperClasses中每一个包装类,将上述获取的class实例以构造参数的方式注入,形成调用链。
3.2.4 Dubbo消费端
我们在调用远程服务时本身是无感知的,就像在本地调用方法一样,那么内部究竟是如何实现的呢?本节将继续从源码角度和大家一起探讨。
1.创建代理类
消费端的核心类自然是ReferenceBean,这个类是在Spring解析Dubbo的reference自定义标签时,在DubboNamespaceHandler类中进行加载的。Spring配置文件示例如下:
<dubbo:reference interface="com.tradecenter.facade.PayTradeFacade" timeout="2000"/>
ReferenceBean类的内容非常丰富,逻辑也较为复杂,但抽丝剥茧后,最主要的功能有三个,如图3-10所示,分别是配置初始化、服务订阅和创建代理对象。
图3-10
下面我们将以这三个大的功能为主线介绍ReferenceBean的实现原理,图3-11显示了ReferenceBean的类继承结构。
图3-11
从图3-11中看到,ReferenceBean继承了ReferenceConfig类,实现了FactoryBean、InitializingBean、DisposableBean和ApplicationContextAware接口。FactoryBean接口主要是通过getObject方法返回对远程服务调用的代理类实现的。InitializingBean接口为Bean提供了初始化方式,包括afterPropertiesSet方法,在初始化Bean的时候都会执行。DisposableBean接口提供了destroy方法,在Bean销毁的时候能够回调执行。而实现ApplicationContextAware接口就可以得到ApplicationContext中的所有Bean。
Dubbo核心类ReferenceConfig继承了AbstractReferenceConfig、AbstractInterfaceConfig、AbstractMethodConfig和AbstractConfig类,各类的说明如下。
● AbstractConfig:配置解析的工具方法和公共方法。
● AbstractMethodConfig:封装了配置文件标签中方法级别的相关属性。
● AbstractInterfaceConfig:封装了配置文件标签中接口级别的相关属性。
● AbstractReferenceConfig:封装了引用实例的默认配置,比如检查服务实例是否存在,是否使用泛化接口、版本号等。
● ReferenceConfig:封装了全局配置,包括接口名、方法配置、默认配置等。
1)配置初始化
从ReferenceConfig的afterPropertiesSet方法入手,先看如下源码:
//如果consumer未注册,则执行下面的内容 if(getConsumer()== null){ //根据ConsumerConfig.class类型从ApplicationContext中获取实例 Map<String, ConsumerConfig> consumerConfigMap = applicationContext ==null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,ConsumerConfig.class, false, false); if(consumerConfigMap ! = null && consumerConfigMap.size()> 0){ ConsumerConfig consumerConfig = null; //遍历ConsumerConfig for(ConsumerConfig config : consumerConfigMap.values()){ if(config.isDefault()== null || config.isDefault().booleanValue()){ //如果存在两个默认的ConsumerConfig,则报错 if(consumerConfig ! = null){ throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config); } consumerConfig = config; } } if(consumerConfig ! = null){ //设置默认的ConsumerConfig setConsumer(consumerConfig); } } }
这一步整体来说就是设置默认的consumer, consumer是默认配置,其实就是配置文件中的<dubbo:consumer/>,当reference某些属性没有配置的时候可以采用consumer的默认配置。后面依次设置Application、Module、Registries、Monitor等配置,这些均在Spring解析自定义标签时加载到Spring容器中,将容器的实例取出来设置到ReferenceBean中成为默认配置。
在方法afterPropertiesSet的最后有如下一段代码:
Boolean b = isInit(); if(b == null && getConsumer()! = null){ b = getConsumer().isInit(); } if(b ! = null && b.booleanValue()){ getObject(); }
调用FactoryBean中getObject方法,里面会继续调用ReferenceConfig的init方法进行数据组装,最终将数据组装到一个Map对象中,如图3-12所示。这些数据都非常关键,为以后创建的Dubbo URL,以及向ZooKeeper注册中心注册服务提供重要的依据。
图3-12
2)服务订阅
在分析createProxy方法之前,先了解如下几个概念,分别是Invoker、ProxyFactory和Protocol。
● Invoker
Invoker代表一个可执行的对象,可以是本地执行类的Invoker,比如provider端的服务实现类,通过反射实现最终的方法调用。也可以是一个远程通信执行类的Invoker, consumer端通过接口与provider端进行远程通信,provider端利用本地Invoker执行相应的方法并返回结果。还可以是聚合Invoker, consumer调用端可以将多个Invoker聚合成一个Invoker执行操作。
● Protocol
通信协议,默认的Protocol是DubboProtocol,通过Protocol创建Invoker对象,默认的也就是DubboInvoker,具体实现过程在后面会详细介绍。
● ProxyFactory
对于Consumer端来说是通过ProxyFactory创建调用接口的代理对象,对于Provider端来说主要是包装本地执行的Invoker类。ProxyFactory接口的实现类有JdkProxyFactory和JavassistProxyFactory,而默认是JavassistProxyFactory。JdkProxyFactory是利用JDK自带的Proxy来动态代理目标对象的远程通信Invoker类。JavassistProxyFactory是利用Javassit字节码技术来创建的远程通信Invoker类。
ReferenceConfig的createProxy方法内容如下:
private T createProxy(Map<String, String> map){ URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; if(isInjvm()== null){ if(url ! = null && url.length()> 0){ //指定URL的情况下,不做本地引用 isJvmRefer = false; } else if(InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)){ //默认情况下如果本地有服务暴露,则引用本地服务 isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } if(isJvmRefer){ URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0,interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if(logger.isInfoEnabled()){ logger.info("Using injvm service " + interfaceClass.getName()); } } else { if(url ! = null && url.length()> 0){ //用户指定URL,指定的URL可能是 //对点对直连地址,也可能是注册中心URL String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if(us ! = null && us.length > 0){ for(String u : us){ URL url = URL.valueOf(u); if(url.getPath()== null || url.getPath().length()== 0){ url = url.setPath(interfaceName); } if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())){ urls.add(url.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { //通过注册中心配置拼装URL List<URL> us = loadRegistries(false); if(us ! = null && us.size()> 0){ for(URL u : us){ URL monitorUrl = loadMonitor(u); if(monitorUrl ! = null){ map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,StringUtils.toQueryString(map))); } } if(urls == null || urls.size()== 0){ throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion()+ ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if(urls.size()== 1){ invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<? >> invokers = new ArrayList<Invoker<? >>(); URL registryURL = null; for(URL url : urls){ //Invokers存放的是所有可用的服务调用者 invokers.add(refprotocol.refer(interfaceClass, url)); if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())){ registryURL = url; //用了最后一个registry url } } if(registryURL ! = null){ //有注册中心协议的URL //对有注册中心的Cluster只用AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY,AvailableCluster.NAME); //加入集群,内部会做一些负载处理 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { //不是注册中心的URL invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if(c == null && consumer ! = null){ c = consumer.isCheck(); } if(c == null){ c = true; //default true } if(c && ! invoker.isAvailable()){ throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " +(group == null ?"" : group + "/")+ interfaceName +(version == null ? "" : ":" + version)+ " from the url " + invoker.getUrl()+ " to the consumer " + NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion()); } if(logger.isInfoEnabled()){ logger.info("Refer dubbo service " + interfaceClass.getName()+ " from url " + invoker.getUrl()); } //创建服务代理 return(T)proxyFactory.getProxy(invoker); }
这段代码主要表达了三个意思:
● 判断当前的服务是本地服务还是远程的;
● 根据SPI找到对应的Protocol类,生成对应的URL协议;
● 与注册中心进行交互,“watch”相应的节点。
下面分别对这三点进行深入分析。
(1)判断当前的服务是本地服务还是远程服务。
根据isJvmRefer参数判断当前调用的是否是本地服务,本地服务可以理解为Provider端。
(2)根据SPI找到对应的Protocol类,生成对应的URL协议。
在上述代码中根据loadRegistries(false)装入Registry URL协议,方法实现在AbstractInterfaceConfig类中,核心代码如下:
List<URL> urls = UrlUtils.parseURLs(address, map); for(URL url : urls){ url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); //将ZooKeeper协议更换为Registry URL协议 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); if((provider && url.getParameter(Constants.REGISTER_KEY, true)) ||(! provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))){ registryList.add(url); } }
这段代码是将ZooKeeper URL协议更换为Registry URL协议,URL的变换过程如下所示。
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? applicat ion=demo-consumer&dubbo=2.0.0&organization=dubbox&owner=programmer&pid=43836×t amp=1515981482847 registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? applicati on=demo-consumer&dubbo=2.0.0&organization=dubbox&owner=programmer&pid=43584®istr y=zookeeper×tamp=1515979629178
在上述ReferenceConfig类的createProxy方法中有如下代码,载入相关的Protocol协议类:
if(urls.size()== 1){ invoker = refprotocol.refer(interfaceClass, urls.get(0)); }
这段代码的执行过程是ProtocolFilterWrapper→ProtocolListenerWrapper→RegistryProtocol。
在ProtocolFilterWrapper的refer方法中有一个判断,代码如下所示。
public <T> Invoker<T> refer(Class<T> type, URL url)throws RpcException { if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())){ return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,Constants.CONSUMER); }
在方法中先判断当前是不是Registry URL协议,如果是,则直接调用RegistryProtocol执行;如果不是,则将Protocol对象加入调用链。
(3)与注册中心交互,“watch”相应的节点。
从官网上找到Dubbo与注册中心的结构图,如图3-13所示。
图3-13
从图3-13中可以看出,服务提供者Provider向服务注册中心Registry注册服务,而消费者Consumer从服务注册中心订阅所需要的服务,但不是所有服务。当有新的Provider出现,或者现有Provider宕机时,注册中心Registry都会尽早发现,并将新的Provider列表推送给对应的Consumer。有了这样的机制,Dubbo才能做到Failover,而Failover的时效性,由注册中心Registry的实现决定。
Dubbo线上支持三种注册中心:自带的SimpleRegistry、Redis和ZooKeeper,当然,最常用的还是ZooKeeper,因为太多分布式的中间件需要依赖ZooKeeper作为协作者。那么怎么才能让Dubbo知道我们使用哪个实现作为注册中心呢?我们只需要在Dubbo的XML配置文件中配置dubbo:registry节点即可:
<dubbo:registry id="registry"protocol="zookeeper"address="${dubbo.registry.address}"/>
在上面第二步中我们找到对应的RegistryProtocol类,通过这个类进行服务订阅等相关工作,在分析代码流程之前先介绍RegistryProtocol中涉及的几个关键类。
● ZooKeeperRegistry:负责与ZooKeeper进行交互。
● RegistryProtocol:从注册中心获取可用服务,或者将服务注册到ZooKeeper,然后提供服务或调用代理。
● RegistryDirectory:维护所有可用的远程Invoker或本地的Invoker。这个类实现了NotifyListner。
● NotifyListener:负责RegistryDirectory和ZooKeeperRegistry的通信。
● FailbackRegistry:继承自Registry,实现了失败重试机制。
类的继承和依赖关系如图3-14所示。
图3-14
在RegistryProtocol类的refer方法中主要通过getRegistry方法获取ZooKeeperRegistry实例,并将ZooKeeperRegistry实例以参数的方式传入doRefer方法,代码如下所示。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type,URL url){ //新生成RegistryDirectory实例 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); //将ZooKeeperRegistry实例注入RegistryDirectory,形成组合关系 directory.setRegistry(registry); //将RegistryProtocol实例注入RegistryDirectory,形成组合关系 directory.setProtocol(protocol); //生成consumer端URL协议 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(),0, type.getName(), directory.getUrl().getParameters()); if(! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)){ //调用registry实例进行消费者地址注册 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY,Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } //服务订阅 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + ", " + Constants.CONFIGURATORS_CATEGORY + ", " + Constants.ROUTERS_CATEGORY)); //默认的cluster是FailoverCluster //返回的是FailoverClusterInvoker return cluster.join(directory); }
● 消息者地址注册
通过FailbackRegistry实例的register方法调用ZooKeeperRegistry实例的doRegister方法实现消费者的地址注册。注册地址如下所示。
consumer://169.254.2.78/com.alibaba.dubbo.demo.bid.BidService? application= demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.ali baba.dubbo.demo.bid.BidService&methods=throwNPE, bid&organization=dubbox&ow ner=programmer&pid=49427&side=consumer×tamp=1516020015025
● 服务订阅
通过FailbackRegistry实例的subscribe方法调用ZooKeeperRegistry实例的doSubscribe方法实现消费者的地址注册。ZooKeeper的服务节点路径如下所示。
/dubbo/com.alibaba.dubbo.demo.bid.BidService/providers
完整的ZooKeeper的服务注册和订阅的节点路径如图3-15所示。
在ZooKeeper中,Dubbo的节点为根节点,第二层为接口层存放服务类的全路径,第三层是服务提供者和服务消费者集合,第四层为各自的注册地址。
3)返回默认的集群和容错Invoker实例
RegistryProtocol类doRefer方法的最后一行代码是:
return cluster.join(directory);
图3-15
这一行代码最终返回一个Invoker执行类,执行cluster.join方法会先进入MockCluster-Wrapper类,代码如下:
public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster){ this.cluster = cluster; } public <T> Invoker<T> join(Directory<T> directory)throws RpcException { //新生成MockClusterInvoker实例并返回 return new MockClusterInvoker<T>(directory, //默认是FailoverCluster实例 this.cluster.join(directory)); } }
在join方法中新生成一个MockClusterInvoker实例,并将FailOverCluster实例的join方法返回的Invoker对象作为构造参数传递给MockClusterInvoker对象。至于MockClusterWrapper实例为什么会在默认的FailOverCluster之前,请参考Dubbo SPI机制的内容。在FailOverCluster实例中返回的是FailoverClusterInvoker对象,这是Dubbo默认的集群容错策略,当服务出现失败时,重试其他服务器,但是重试会带来较长的延长时间。最终MockClusterInvoker实例作为创建代理对象的方法参数传入。
4)创建代理对象
在配置初始化和服务注册与订阅完成后,剩下的工作就是对服务接口类进行包装,产生代理对象并返回。
ReferenceConfig类的createProxy方法的最后一行代码是:
return(T)proxyFactory.getProxy(invoker);
Dubbo实现代理对象的方式有两种,一种是使用JDK动态代理,使用的是JDKProxyFactory;另一种是使用Javassit字节码来实现,使用JavassitProxyFactory来实现。Dubbo默认使用的是JavassitProxyFactory,代码如下:
public <T> T getProxy(Invoker<T> invoker, Class<? >[] interfaces){ return(T)Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
这段代码看似和JDK生成动态代理的代码一样,其实这里的Proxy类不是JDK自带的生成代理对象的Proxy类,而是Dubbo自己实现的,类的全路径是com.alibaba.dubbo.common. bytecode.Proxy,利用Javassit字节码技术生成代理。
我们直接来看Proxy类中的核心方法Proxy getProxy(ClassLoader cl, Class<? >... ics),先看第一部分代码:
//服务接口类长度不能大于65535
if(ics.length > 65535) throw new IllegalArgumentException("interface limit exceeded"); StringBuilder sb = new StringBuilder(); for(int i=0; i<ics.length; i++) { String itf = ics[i].getName(); //如果服务类不是接口则报错 if(! ics[i].isInterface()) throw new RuntimeException(itf + " is not a interface."); Class<? > tmp = null; try { //根据类的全路径名返回服务接口的Class tmp = Class.forName(itf, false, cl); } catch(ClassNotFoundException e) {} if(tmp ! = ics[i]) throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); sb.append(itf).append('; '); } // use interface class name list as key. //将接口全路径名以分号连接起来,拼成key字符串 String key = sb.toString(); //定义缓存对象 Map<String, Object> cache; synchronized(ProxyCacheMap) { cache = ProxyCacheMap.get(cl); if(cache == null) { //如果缓存对象为null,则创建一个HashMap cache = new HashMap<String, Object>(); //以Classloader为key,将cache对象缓存到ProxyCacheMap中 ProxyCacheMap.put(cl, cache); } } Proxy proxy = null; synchronized(cache) { do { Object value = cache.get(key); //从缓存中取实例,如果是Reference类型的则直接返回代理 if(value instanceof Reference<? >) { proxy =(Proxy)((Reference<? >)value).get(); if(proxy ! = null) return proxy; } //PendingGenerationMarker等于value,说明此时value是正在创建中的对象,使用 //wait进行等待,直到创建完成 if(value == PendingGenerationMarker) { try{ cache.wait(); }catch(InterruptedException e){} } //将key和PendingGenerationMarker缓存 else { cache.put(key, PendingGenerationMarker); break; } } while(true); }
这段代码主要是将服务接口全路径名以分号的方式连接起来,存放到cache对象中以便下次使用,下面的部分是Javassist的核心代码。
long id = PROXY_CLASS_COUNTER.getAndIncrement(); String pkg = null; //利用字节码生成对象实例工具 ClassGenerator ccp = null; ccm = null; try { ccp = ClassGenerator.newInstance(cl); Set<String> worked = new HashSet<String>(); List<Method> methods = new ArrayList<Method>(); for(int i=0; i<ics.length; i++) { if(! Modifier.isPublic(ics[i].getModifiers())) { String npkg = ics[i].getPackage().getName(); if(pkg == null) { pkg = npkg; } else { if(! pkg.equals(npkg)) throw new IllegalArgumentException("non-public interfaces from different packages"); } } ccp.addInterface(ics[i]); for(Method method : ics[i].getMethods()) { String desc = ReflectUtils.getDesc(method); if(worked.contains(desc)) continue; worked.add(desc); int ix = methods.size(); Class<? > rt = method.getReturnType(); Class<? >[] pts = method.getParameterTypes(); //生成代理方法体 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("]; "); for(int j=0; j<pts.length; j++) code.append(" args[").append(j).append("] =($w)$").append(j+1).append("; "); code.append(" Object ret = handler.invoke(this, methods[" + ix + "],args); "); if(! Void.TYPE.equals(rt)) code.append(" return ").append(asArgument(rt, "ret")).append("; "); methods.add(method); ccp.addMethod(method.getName(), method.getModifiers(), rt, pts,method.getExceptionTypes(), code.toString()); } } if(pkg == null) pkg = PACKAGE_NAME; //生成的代理实例对象 String pcn = pkg + ".proxy" + id; //设置代理实例对象的类名 ccp.setClassName(pcn); //添加静态Method属生 ccp.addField("public static java.lang.reflect.Method[] methods; "); //添加InvokerInvocationHandler属性 ccp.addField("private " + InvocationHandler.class.getName()+ " handler; "); //添加构造方法,参数是InvokerInvocationHandler对象 ccp.addConstructor(Modifier.PUBLIC, new Class<? >[]{ InvocationHandler.class },new Class<? >[0], "handler=$1; "); ccp.addDefaultConstructor(); //生成代理类Class Class<? > clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0])); //创建代理类对象 String fcn = Proxy.class.getName()+ id; ccm = ClassGenerator.newInstance(cl); ccm.setClassName(fcn); //添加默认构造方法 ccm.addDefaultConstructor(); //设置父类是抽象类Proxy ccm.setSuperClass(Proxy.class); //生成新的方法,实例化代理实例对象并返回 ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName()+ " h){ return new " + pcn + "($1); }"); Class<? > pc = ccm.toClass(); //实例化代理类对象 proxy =(Proxy)pc.newInstance(); } catch(RuntimeException e) { throw e; }
整段代码的逻辑就是自己注入代码生成代理类,将InvokerInvocationHandler实例对象传入代理类,最终实现代理的功能。
反编译由Javassist生成的代理类,部分源代码如下所示。
public class com.alibaba.dubbo.common.bytecode.Proxy0 extends Proxy { //将InvocationHandler实例类传入 public Object newInstance(java.lang.reflect.InvocationHandler h){ //实例化proxy0对象 return new com.alibaba.dubbo.common.bytecode.proxy0(h); } } public class com.alibaba.dubbo.common.bytecode.proxy0 { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public com.alibaba.dubbo.common.bytecode.proxy0(InvocationHandler handler){ this.handler = handler; } public com.alibaba.dubbo.demo.bid.BidResponse bid(){ Object[] args = new Object[1]; args[0] =($w)$1; //这里的方法调用其实是委托给nvocationHandler实例对象的 Object ret = handler.invoke(this, methods[0], args); return(com.alibaba.dubbo.demo.bid.BidResponse)ret; } }
到目前为止,ReferenceBean整个类的源码已经基本分析完了,最终会使用InvokerInvocationHandler将服务接口包装成一个代理类并返回。我们在调用服务接口的时候就会触发代理类,通过代理类实现服务路由、服务选取,以及与服务提供者Provider端的远程通信,这些过程服务调用者是无法感知的,就像在应用中调用本地方法一样简单。虽然使用简单,但是在性能上和调用本地方法却有很大的差别,我们不仅要考虑服务提供者Provider的性能,还要考虑网络环境的健康状况。服务调用方根据返回的不同状态信息使用不同的策略应对,而Dubbo已经为我们提供了多种策略,下面看一下InvokerInvocationHandler代理类的实现过程。
2.远程调用
前面我们介绍了ReferenceBean的整个流程,通过ReferenceBean将服务接口以代理的形式进行了包装。下面介绍如何通过代理对象进行远程方法的调用,从大的方面也可以分为三步,分别是代理调用、容错负载和远程通信,如图3-16所示。
图3-16
1)代理调用
接下来就要直接调用服务接口实现远程调用,调用服务接口的示例代码如下:
BidService bidService =(BidService)ctx.getBean("bidService"); BidRequest bidRequest = new BidRequest(); bidRequest.setId("1001"); BidResponse bidResponse = bidService.bid(bidRequest);
这段代码是Dubbo源码工程中dubbo-demo/dubbo-demo-consumer模块下的示例代码,通过Spring的getBean方法获取服务接口BidService,然后设置请求参数数据,调用服务接口的bid方法,但bid方法已经被代理类InvokerInvocationHandler包装拦截。InvokerInvocationHandler代理类的代码如下所示。
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<? > invoker; public InvokerInvocationHandler(Invoker<? > handler){ this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args)throws Throwable { String methodName = method.getName(); Class<? >[] parameterTypes = method.getParameterTypes(); if(method.getDeclaringClass()== Object.class){ return method.invoke(invoker, args); } //动态代理过滤toString方法 if("toString".equals(methodName)&& parameterTypes.length == 0){ return invoker.toString(); } //动态代理过滤hashCode方法 if("hashCode".equals(methodName)&& parameterTypes.length == 0){ return invoker.hashCode(); } //动态代理过滤equals方法 if("equals".equals(methodName)&& parameterTypes.length == 1){ return invoker.equals(args[0]); } //将方法和参数封装成RpcInvocation后调用,recreate方法的主要作用是在调用时如 //果发生异常则抛出异常,否则正常返回 return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
每一个动态代理类都必须实现InvocationHandler接口,并且每个代理类的实例都关联了一个handler,当我们通过代理对象调用一个方法时,这个方法的调用就会转为由InvocationHandler接口的invoke方法来调用,Invoker实例就是我们之前讲过的MockClusterInvoker。
public Result invoke(Invocation invocation)throws RpcException { Result result = null; //获取Mock状态值 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); //如果为false,则继续往下执行 if(value.length()== 0 || value.equalsIgnoreCase("false")){ //不进行“Mock”则直接调用后面的Invoker result = this.invoker.invoke(invocation); } //如果为true,则判断value字符串是否以force开头,如果是则强制执行 //doMockInvoker方法 else if(value.startsWith("force")){ if(logger.isWarnEnabled()){ logger.info("force-mock: " + invocation.getMethodName()+ "force-mock enabled , url : " + directory.getUrl()); } //如果值为force,表示强制“Mock”,即不访问远端方法,直接调用Mock数据 result = doMockInvoke(invocation, null); } else { //其他的值,则先调用后面的Invoker,如果失败且不是业务错误时使用Mock数 //据,非业务错误包含网络错误、超时错误、禁止访问错误、序列化错误及其他未 //知的错误,业务错误则是接口实现类中的方法抛出的错误 try { result = this.invoker.invoke(invocation); }catch(RpcException e){ if(e.isBiz()){ throw e; } else { if(logger.isWarnEnabled()){ logger.info("fail-mock: " + invocation.getMethodName()+ "fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }
这段代码首先要根据请求的URL获取Mock的value状态值,如果value值为false,则直接继续下一步;如果value值是以force开头的字符串,则强制执行doMockInvoke方法。这个方法不进行远程访问,可以自己定义本地Mock方法执行。如果value值是mock=fail:return null,则可以放行继续执行;如果返回错误,则可以根据doMockInvoke方法进行功能降级。也就是说,这个类一共包括三个功能,分别是Mock挡板、功能降级和正常执行,正常执行我们后面会继续介绍,下面分别对Mock挡板和功能降级进行简要的介绍。
(1)Mock挡板。
以一个例子说一下Dubbo中Mock的用法。
<dubbo:reference interface="com.alibaba.dubbo.demo.bid.BidService" mock="force" />
在reference标签上加一个mock="force"就可以将当前服务设置为Mock。但是设置完Mock属性后还没有结束,需要有一个Mock类对应服务接口类。
规则如下:
接口名 + Mock后缀,服务接口调用失败Mock实现类,该Mock类必须有一个无参构造函数。
如果对应到com.alibaba.dubbo.demo.bid.BidService,则创建BidServiceMock类。
public class BidServiceMock implements BidService { public String bid(BidRequest request){ //可以伪造容错数据,此方法只在出现RpcException时被执行 return "容错数据"; } }
如果对应到com.alibaba.dubbo.demo.bid.BidService,则创建BidServiceMock类。经过以上设置后,当调用BidService进行接口调用时,请求将直接到BarServiceMock实例中进行相关的数据模拟。
(2)功能降级。
降级一词最简单的解释就是“弃卒保帅”,而降级的目的就是停止一些非核心的系统以保证系统的核心功能能够正常使用。在Dubbo中,降级一词还有另一层含义,因网络、超时等异常长时间出现后,Dubbo通过正常的通信协议(比如Netty)无法正常工作,则可以考虑采用其他的通信方式,比如Hessian或HTTP的方式,一些非关键和实时的数据也可以调用本地缓存的数据返回。
2)容错负载
下面从整体设计架构上做详细介绍,然后通过源码来分析整个过程。
(1)整体架构介绍。
容错负载是Dubbo的重要组成模块,该模块实现了多种集群特性,还实现了目录服务、负载均衡、路由策略和服务治理配置等特性。整体架构设计如图3-17所示。
图3-17
各部分组件说明:
● Invoker是服务提供者(Provider)的抽象,Invoker封装了Provider地址及服务接口信息。
● Directory代表多个Invoker,可以把它看作List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。
● Cluster将Directory中的多个Invoker伪装成一个Invoker,伪装过程包含了容错逻辑,调用失败后,重试另一个。
● Router可以从多个Invoker中通过路由规则进行过滤和筛选。
● LoadBalance可以从多个Invoker中选出一个使用。
负载均衡的类结构如图3-18所示。
图3-18
● RoundRobinLoadBalance:权重轮询算法,按照公约后的权重设置轮询比例
原理:把来自用户的请求轮流分配给内部中的服务器。例如:从1开始,一直到N(其中,N是内部服务器的总数),然后重新开始循环。
● LeastActiveLoadBalance:最少活跃调用数均衡算法
原理:最少活跃调用数,活跃数指调用前后计数差,使慢的机器收到更少。
● ConsistentHashLoadBalance:一致性Hash算法
原理:一致性Hash,相同参数的请求总是发到同一个提供者。一致性Hash算法可以解决服务提供者的增加、移除及“挂掉”时的情况,也可以通过构建虚拟节点,尽可能避免分配失衡,具有很好的平衡性。
● RandomLoadBalance:随机均衡算法(Dubbo的默认负载均衡策略)
原理:按权重设置随机概率,如果每个提供者的权重都相同,那么根据列表长度直接随机选取一个,如果权重不同,则累加权重值。从0~累加的权重值中选取一个随机数,然后判断该随机数落在哪个提供者上。
集群策略类结构如图3-19所示。
图3-19
● FailoverCluster:失败转移
当出现失败时,重试其他服务器,通常用于读操作,但重试会带来更长延迟(默认集群策略)。
● FailfastCluster:快速失败
只发起一次调用,失败立即报错,通常用于非幂等性操作。
● FailbackCluster:失败自动恢复
对于Invoker调用失败,后台记录失败请求,任务定时重发,通常用于通知。
● BroadcastCluster:广播调用
遍历所有Invokers,如果调用其中某个invoker报错,则“catch”住异常,这样就不影响其他Invoker调用。
● AvailableCluster:获取可用的调用
遍历所有Invokers并判断Invoker.isAvalible,只要有一个为true就直接调用返回,不管成不成功。
● FailsafeCluster:失败安全
出现异常时,直接忽略,通常用于写入审计日志等操作。
● ForkingCluster:并行调用
只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多的服务资源。
● MergeableCluster:分组聚合
按组合并返回结果,比如某个服务接口有多种实现,可以用group区分,调用者调用多种实现并将得到的结果合并。
集群目录类结构如图3-20所示。
图3-20
● Directory:代表多个Invoker,可以看作List,它的值可能是动态变化的,比如注册中心推送变更。
● StaticDirectory:静态目录服务,它的所有Invoker通过构造函数传入,并且将所有Invoker返回。
● RegistryDirectory:注册目录服务,它的Invoker集合是从注册中心获取的,并且实现了NotifyListener接口的notify(List)方法。
● AbstractDirectory:所有目录服务实现的抽象类,它在获取所有的Invoker后,通过Router服务进行路由过滤。
路由类结构如图3-21所示。
图3-21
● ConditionRouter:基于条件表达式的路由规则,不足之处是在规则复杂且多分支的情况下,规则不容易描述。
● ScriptRouter:基于脚本引擎的路由规则,没有运行沙箱,脚本能力强大,可能成为后门。
(2)源码分析。
注:本节只对Dubbo默认的集群和负载策略做源码分析,其他相关策略还请读者自行研究。
在MockClusterInvoker实例中正常执行流程,代码“走”到了AbstractClusterInvoker类的invoke(final Invocation invocation)方法中,AbstractClusterInvoker类主要用于集群选择的抽象类,如下所示。
public Result invoke(final Invocation invocation)throws RpcException { //健康检测 checkWheatherDestoried(); //定义负载接口类 LoadBalance loadbalance; //获取所有可用的服务列表 List<Invoker<T>> invokers = list(invocation); if(invokers ! = null && invokers.size()> 0){ //获取默认的负载策略 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBAL ANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { //如果暂时没有地址信息,则使用默认的负载均衡策略策略(random) loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } //如果是异步则需要加入相应的信息 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); //根据地址及负载均衡策略发起调用 return doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation)throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers;
directory也就是RegistryDirectory实例,通过上层抽象类AbstractDirectory可以调用RegistryDirectory的doList(Invocation invocation)方法来获得invocation的所有Invoker。其中invocation只需要给出调用的方法名称即可,Invoker则负责发送调用请求和接收返回结果,里面封装了所有的通信、序列化细节。
RegistryDirectory是如何根据invocation参数来获取Invoker列表的呢?其实RegistryDirectory包含一个subscribe方法,用来向Registry请求所需要的服务调用地址,然后Registry会通过notify方法回调RegistryDirectory, notify方法就会把这些服务的地址进一步封装成Invoker,并且缓存起来。这样调用doList的时候直接根据invocation的方法名来找对应的Invoker就可以了。
RegistryDirectory的doList返回的是一个list列表,也就是可能会存在多个可用的服务实现,可以通过负载balance来决定使用哪个服务实现。
在上述invoke方法中通过list方法获取可用服务列表后,接着通过SPI的机制获取默认的负载均衡策略(RandomLoadBalance,随机均衡算法),然后将invocation、可用服务列表和默认负载策略以参数的方式传入默认的集群策略类FailoverClusterInvoker的doInvoker方法。
FailoverClusterInvoker类的doInvoker方法代码如下:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers,LoadBalance loadbalance)throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); //获取URL中retries关键字的值 //需要注意的是默认的重试次数为2(最多执行3次) int len = getUrl().getMethodParameter(invocation.getMethodName(),Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES)+ 1; if(len <= 0){ len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); //invoked invokers. Set<String> providers = new HashSet<String>(len); //发起指定次数的调用,只要有一次成功就返回 for(int i = 0; i < len; i++){ //重试时,进行重新选择,避免重试时Invoker列表已发生变化 //注意:如果列表发生了变化,那么invoked判断会失效,因为Invoker示例已经改变 if(i > 0){ checkWheatherDestoried(); copyinvokers = list(invocation); //重新检查一下 checkInvokers(copyinvokers, invocation); } //根据负载均衡算法得到一个地址 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); //记录发起过调用的地址,防止重试时调用了已经调用过的地址 invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { //通过之前选出的地址进行调用 Result result = invoker.invoke(invocation); //调用成功后,判断之前是否经过重试,如果重试过则记录警告信息 if(le ! = null && logger.isWarnEnabled()){ logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + "(" + providers.size()+ "/" + copyinvokers.size() + ")from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()+ ".Last error is: " + le.getMessage(), le); } return result; } catch(RpcException e){ //如果业务异常则直接抛出错误,其他(如超时等错误)则不重试 if(e.isBiz()){ // biz exception. throw e; } le = e; } catch(Throwable e){ le = new RpcException(e.getMessage(), e); } finally { //记录调用的地址 providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le ! = null ? le.getCode(): 0, "Failed to invoke the method " + invocation.getMethodName()+ " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + "(" + providers.size()+ "/" + copyinvokers.size() + ")from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion()+ ". Last error is: " +(le ! = null ? le.getMessage(): ""), le ! = null && le.getCause()! =null ? le.getCause(): le); }
FailoverClusterInvoker的重试次数默认是两次,最多执行三次,每一次重试都要重新获取可用服务列表,然后根据选定的负载均衡策略选择出一个可用服务进行调用,如果调用失败则要判断当前异常是否是业务异常,如果是则不重试直接抛出异常。
下面深入分析如何通过负载均衡策略选择一个可用的服务。在FailoverClusterInvoker的上层抽象类AbstractClusterInvoker中有一个select方法,代码如下:
/** * 使用loadbalance选择Invoker *@param availablecheck如果设置为true,则在选择的时候先选invoker.available==true * @param selected已选过的Invoker.注意:输入保证不重复 * */ protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected)throws RpcException { if(invokers == null || invokers.size()== 0) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); //如果sticky为true,则调用端在访问该接口上的所有方法时使用相同的provider boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { //ignore overloaded method //如果provider已经不存在了,则将其设置为null if(stickyInvoker ! = null && ! invokers.contains(stickyInvoker)){ stickyInvoker = null; } //ignore cucurrent problem //如果sticky为true,且之前有调用过的未失败的provider,则继续使用该provider if(sticky && stickyInvoker ! = null &&(selected == null || ! selected.contains(stickyInvoker))){ if(availablecheck && stickyInvoker.isAvailable()){ return stickyInvoker; } } } //选择Invoker Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected); if(sticky){ stickyInvoker = invoker; } return invoker; }
这个方法中一个比较重要的参数是sticky,如果得到的值是true,则表示调用端在使用这个服务接口上面的所有方法,都使用同一个provider;如果得到的值是false,则通过doselect方法进行服务选择,代码如下:
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected)throws RpcException { if(invokers == null || invokers.size()== 0) return null; //如果可用服务只有一个,就直接返回 if(invokers.size()== 1) return invokers.get(0); //如果只有两个Invoker,则退化成轮循 if(invokers.size()== 2 && selected ! = null && selected.size()> 0){ return selected.get(0)== invokers.get(0)? invokers.get(1): invokers.get(0); } //通过负载均衡算法得到一个Invoker Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //如果selected中包含(优先判断)或不可用&&availablecheck=true则重试 if((selected ! = null && selected.contains(invoker)) ||(! invoker.isAvailable()&& getUrl()! =null && availablecheck)){ try{ Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers,selected, availablecheck); if(rinvoker ! = null){ invoker = rinvoker; }else{ //看下第一次选的位置,如果不是最后,则选+1位置 int index = invokers.indexOf(invoker); try{ //避免碰撞 invoker = index <invokers.size()-1? invokers.get(index+1):invoker; }catch(Exception e){ logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.", e); } } }catch(Throwable t){ logger.error("clustor relselect fail reason is :"+t.getMessage()+" if can not slove , you can set cluster.availablecheck=false in url", t); } } return invoker; }
● 如果当前可用的服务只有一个,则直接返回。
● 如果当前可用的服务有两个,则采用轮询的方式返回。
● 如果当前的可用服务大于两个,则采用负载算法选择服务。
● 如果选择的服务之前有过不可用的记录,则检查availablecheck的值是否为true,如果为true则重试。
● 如果重试的时候,选择的可用的服务不为null,则直接返回。
● 如果选择的可用服务为null,则选择当前服务的下一个服务,如果当前的Invoker已经是最后一个了,则只能选择最后一个返回。
根据上面的代码,先看一下如何通过负载算法来选择可用服务,在接口LoadBalance中可以看到默认的负载算法是RandomLoadBalance,代码如下:
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url,Invocation invocation){ int length = invokers.size(); //总数 int totalWeight = 0; //总权重 boolean sameWeight = true; //权重是否都一样 for(int i = 0; i < length; i++){ int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; //累计总权重 if(sameWeight && i > 0 && weight ! = getWeight(invokers.get(i -1), invocation)){ sameWeight = false; //计算所有权重是否一样 } } if(totalWeight > 0 && ! sameWeight){ //如果权重不相同且权重大于0则按总权重数随机 int offset = random.nextInt(totalWeight); //并确定随机值落在哪个片断上 for(int i = 0; i < length; i++){ offset -= getWeight(invokers.get(i), invocation); if(offset < 0){ return invokers.get(i); } } } // 如果权重相同或权重为0则均等随机 return invokers.get(random.nextInt(length)); } }
随机调度算法又分两种情况:
● 当所有服务提供者权重相同或无权重时,则根据列表size得到一个值,再随机得出一个[0, size)的数值,根据这个数值获取对应位置的服务提供者。
● 计算所有服务提供者权重之和,例如有5个Invoker,总权重为25,则随机得出[0, 24]的一个值,根据各个Invoker的区间来取Invoker,如随机值为10,则选择第二个Invoker。
继续来看权重获取方法getWeight,代码如下:
protected int getWeight(Invoker<? > invoker, Invocation invocation){ //先获取provider配置的权重(默认为100) int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(),Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if(weight > 0){ long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L); if(timestamp > 0L){ int uptime =(int)(System.currentTimeMillis()- timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY,Constants.DEFAULT_WARMUP); //如果启动时长小于预热时间,则需要降权。权重计算方式为启动时长占预热时间的 //百分比乘以权重,如启动时长为20000ms,预热时间为60000ms,权重为120,则 //最终权重为120×(1/3)= 40,注意calculateWarmupWeight使用float进行 //计算,因此结果并不精确 if(uptime > 0 && uptime < warmup){ weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } static int calculateWarmupWeight(int uptime, int warmup, int weight){ int ww =(int)((float)uptime /((float)warmup /(float)weight)); return ww < 1 ? 1 :(ww > weight ? weight : ww); }
至此默认的随机负载算法已经介绍完毕,我们回到先前的FailoverClusterInvoker实例中,上面我们介绍了通过select方法如何选择服务,那么在选择出一个可用服务后,接下来就正式进入服务调用环节了,也就是Result result = invoker.invoke(invocation)。这一行代码会经过一系列的Filter通过配置好的通信协议,远程调用相应的Provider,执行并返回结果,返回结果和异常信息全部封装到Result对象中,最终实现一次完整的调用过程。关于Dubbo的通信机制我们会在后面进行深入介绍。
Dubbo Filter类列表如图3-22所示。
图3-22
图中的这些Filter,有consumer端的也有provider端的,都是在定义Filter的时候通过注解指定的。Filter是一种递归的链式调用,用来在远程调用真正执行的前后加入一些逻辑,跟AOP的拦截器Servlet中Filter概念一样,Filter接口的定义如下所示。
@SPI public interface Filter { Result invoke(Invoker<? > invoker, Invocation invocation)throws RpcException; }
在ProtocolFilterWrapper类中,通过服务的暴露与引用,根据Key是provider还是consumer来构建服务提供者与消费者的调用过滤器链。
在Filter的实现类需要加上@Activate注解,@Activate的group属性是一个string数组,我们可以通过这个属性来指定Filter是在consumer或provider还是两者都有的情况下激活,所谓激活就是能够被获取并组成Filter链。
3.2.5 Dubbo服务端
服务发布就是服务提供端向注册中心注册服务,这样调用端便能够从注册中心获取相应的服务。
与Dubbo消费端类似,服务端的核心类是ServiceBean,在Spring解析Dubbo的service标签的时候,在DubboNamespaceHandler类中进行加载。想要发布一个服务,只需要在Dubbo的XML文件中配置相应的服务即可,示例如下:
<dubbo:service interface="com.alibaba.dubbo.demo.bid.BidService" ref="bidService" protocol="dubbo" />
图3-23显示了ServiceBean的类继承结构。
图3-23
从图中可以看出整体继承结构与服务调用端的ReferenceBean非常相似,每个类的说明和作用在介绍ReferenceBean时都已经进行过讲解,本节不再重复。
通过ServiceBean的afterPropertiesSet方法查看配置初始化的代码,部分代码如下所示。
if(getProvider()== null){ //从ProviderConfig.class的ApplicationContext中获取实例 Map<String, ProviderConfig> providerConfigMap = applicationContext == null ?null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,ProviderConfig. class, false, false); if(providerConfigMap ! = null && providerConfigMap.size()> 0){ Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ?null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,ProtocolConfig.class, false, false); if((protocolConfigMap == null || protocolConfigMap.size()== 0) && providerConfigMap.size()> 1){ //兼容旧版本 List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>(); for(ProviderConfig config : providerConfigMap.values()){ if(config.isDefault()! = null && config.isDefault().booleanValue()){ providerConfigs.add(config); } } if(providerConfigs.size()> 0){ setProviders(providerConfigs); } } else { ProviderConfig providerConfig = null; for(ProviderConfig config : providerConfigMap.values()){ if(config.isDefault()== null || config.isDefault().booleanValue()){ if(providerConfig ! = null){ throw new IllegalStateException("Duplicate provider configs:" + providerConfig + " and " + config); } providerConfig = config; } } if(providerConfig ! = null){ setProvider(providerConfig); } } } }
这一步整体来说就是设置provider,当service某些属性没有配置的时候可以采用provider的默认配置。后面依次设置Application、Module、Registries、Monitor等配置,这些均在Spring解析自定义标签的时候加载到Spring容器中,将容器的实例取出来设置到ServiceBean中成为默认配置。整个初始化的过程与ReferenceBean非常相似,这里不再重复。
在ServiceBean中有两个重要的方法,一个是onApplicationEvent方法,代码如下:
public void onApplicationEvent(ApplicationEvent event){ if(ContextRefreshedEvent.class.getName().equals(event.getClass().getName())){ if(isDelay()&& ! isExported()&& ! isUnexported()){ if(logger.isInfoEnabled()){ logger.info("The service ready on spring started. service: " +getInterface()); } export(); } } }
ServiceBean实现了ApplicationListener和InitializingBean接口,onApplicationEvent方法是在Bean初始化或容器中所有Bean刷新完毕时被调用的。根据provider的延迟设置决定,如果设置了延迟(delay属性)则在Spring bean初始化结束之后再调用,否则在ServiceBean中直接被调用。默认delay是延迟的,也就是在所有Bean的刷新结束后被调用。
ServiceBean的另一个重要方法是export,这个方法在ServiceBean中两个地方出现,一个是上面说的onApplicationEvent,另一个是根据provider的延迟设置来调用,export方法实际上是ServiceBean的继承类ServiceConfig中的方法。
export方法内部初始化delay延迟时间,如果设置了延迟时间则启动一个Thread守护线程,线程的sleep时间是delay的int值。而后调用doExport方法初始化和校验Dubbo配置文件中定义的标签属性,再调用doExportUtils方法,代码如下:
private void doExportUrls(){ List<URL> registryURLs = loadRegistries(true); for(ProtocolConfig protocolConfig : protocols){ doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
loadRegistries方法是获取所有注册中心的地址,Dubbo可以配置多个注册中心,所以返回的是一个List列表,URL的示例内容如下所示。
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? applicat ion=demo-provider&dubbo=2.0.0&organization=dubbo&owner=programmer&pid=25286×ta mp=1516457002141
因为是注册中心地址封装,所以URL是以zookeeper开头的协议。
在Dubbo配置文件中配置注册中心地址的示例如下所示。
<dubbo:registry id="bidRegistry" address="zookeeper://127.0.0.1:2181"/>
如果是多注册中心配置,则通过id进行区分。
获得registryURLs注册地址后,遍历获取在Dubbo配置文件中配置的通信协议,示例如下:
<dubbo:protocol name="dubbo" serialization="hessian2"/>
通信协议也可以配置多个,所以用的也是List列表,遍历出每个协议后执行doExportUrlsFor1Protocol方法,主要包括设置服务端口、生成服务代理和服务注册三个过程,如图3-24所示。
图3-24
下面重点分析这三个过程。
1.设置服务端口
代码如下:
//获取通信协议,如果没有则用默认的Dubbo String name = protocolConfig.getName(); if(name == null || name.length()== 0){ name = "dubbo"; } //获取主机地址 String host = protocolConfig.getHost(); if(provider ! = null &&(host == null || host.length()== 0)){ host = provider.getHost(); } boolean anyhost = false; if(NetUtils.isInvalidLocalHost(host)){ anyhost = true; try { //获取主机IP地址 host = InetAddress.getLocalHost().getHostAddress(); } catch(UnknownHostException e){ logger.warn(e.getMessage(), e); } //判断地址是否有效 if(NetUtils.isInvalidLocalHost(host)){ if(registryURLs ! = null && registryURLs.size()> 0){ for(URL registryURL : registryURLs){ try { //创建Socket,连接到注册中心 Socket socket = new Socket(); try { SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); socket.connect(addr, 1000); //获取服务所在的IP host = socket.getLocalAddress().getHostAddress(); break; } finally { try { socket.close(); } catch(Throwable e){} } } catch(Exception e){ logger.warn(e.getMessage(), e); } } } if(NetUtils.isInvalidLocalHost(host)){ host = NetUtils.getLocalHost(); } } } //获取协议端口号 Integer port = protocolConfig.getPort(); if(provider ! = null &&(port == null || port == 0)){ //如果port是null,则用provider默认的端口号 port = provider.getPort(); } //name默认是Dubbo,从Dubbo中获取默认端口号 final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort(); if(port == null || port == 0){ port = defaultPort; } // if(port == null || port <= 0){ port = getRandomPort(name); if(port == null || port < 0){ port = NetUtils.getAvailablePort(defaultPort); putRandomPort(name, port); } logger.warn("Use random available port(" + port + ")for protocol " + name); }
这个过程主要是获得服务的IP地址和端口号,接下来就是获取application、module、provider、protocol、exporter、registries、monitor所有属性并封装到Map对象中,根据Map对象的值生成默认是Dubbo协议的URL, URL生成示例如下:
dubbo://192.168.2.1:20880/com.alibaba.dubbo.demo.bid.BidService? anyhost=true&a pplication=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.bid.BidService&methods=throwNPE, bid&organization=dubbo&owner=programmer&pid=27251&serialization=hessian2&side=provider×tamp=1516583267908
2.生成代理对象
代码如下:
String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 if(! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)){ //配置不是remote的情况下做本地暴露(配置为remote,则表示只暴露远程服务) if(! Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)){ exportLocal(url); } //如果配置不是local则暴露为远程服务(配置为local,则表示只暴露远程服务) if(! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)){ if(logger.isInfoEnabled()){ logger.info("Export dubbo service " + interfaceClass.getName()+ " to url " + url); } if(registryURLs ! = null && registryURLs.size()> 0 && url.getParameter("register", true)){ for(URL registryURL : registryURLs){ url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if(monitorUrl ! = null){ url = url.addParameterAndEncoded(Constants.MONITOR_KEY,monitorUrl.toFullString()); } if(logger.isInfoEnabled()){ logger.info("Register dubbo service " + interfaceClass.getName()+ " url " + url + " to registry " + registryURL); } //获取Invoker Invoker<? > invoker = proxyFactory.getInvoker(ref,(Class)interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY,url.toFullString())); //protocol为默认的RegistryProtocol,通过export方法实现服务的注册 //根据协议将Invoker暴露成exporter,具体过程是创建一个ExchangeServer, //它会绑定一个ServerSocket到配置端口 Exporter<? > exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<? > invoker = proxyFactory.getInvoker(ref,(Class)interfaceClass,url); Exporter<? > exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url);
变量scope属性值主要用来判断暴露服务的方式,如果scope属性值不为none并且也不是remote,则服务是本地暴露服务,生成一个本地服务代理对象,同时生成一个新的URL协议,协议以injvm://开头,代表的是本地服务并且生成一个InjvmExporter实例,这时本地调用Dubbo接口时直接调用本地代理而不“走”网络请求。
先看一下exportLocal方法如何生成本地服务代理对象,代码如下:
private void exportLocal(URL url){ if(! Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())){ URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(NetUtils.LOCALHOST) .setPort(0); // modified by lishen ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); Exporter<? > exporter = protocol.export( proxyFactory.getInvoker(ref,(Class)interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName()+" to local registry"); } }
这个方法首先会判断当前的协议是什么,如果当前协议不是injvm则重新封装URL,示例如下所示。
injvm://127.0.0.1/com.alibaba.dubbo.demo.bid.BidService? anyhost=true&applicati on=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.bid.Bid Service&methods=throwNPE, bid&organization=dubbo &owner=programmer&pid=28314&serialization=hessian2&side=provider×tamp=1516587908074
通过proxyFactory.getInvoker(ref,(Class)interfaceClass, local)方法生成本地代理Invoker,这里的参数ref就是在dubbo:service中配置的ref属性,指定服务的具体实现类。Invoker的invoke方法被调用时,最终会调用ref指定的服务实现,在本例中是BidServieImpl, interfaceClass就是服务接口名,在本例中就是BidService接口。之后请求直接到JavassitProxyFactory的getInvoker方法中,以匿名内部类的方式生成AbstractProxyInvoker抽象代理类,由该类完成对本地服务的代理封装。然后将AbstractProxyInvoker实例以参数的方式传入InjvmProtocol协议类的export方法,生成InjvmExporter实例。
我们再接着看protocol.export()方法,这个方法主要是暴露本地服务,根据Wrapper扩展点加载机制加载ProtocolListenerWrapper和ListenerExporterWrapper两个Wrapper,然后依次调用ProtocolListenerWrapper→ListenerExporterWrapper→InjvmProtocol的export方法,最终返回的是包装了InjvmExporter实例的ListenerExporterWrapper实例,而ListenerExporterWrapper又实现了Exporter接口,如图3-25所示。
图3-25
exportLocal方法执行完后,返回到doExportUrlsFor1Protocol方法,继续判断scope值是否为local,如果不是则通过Invoker<? > invoker = proxyFactory.getInvoker()生成远程代理对象。与生成本地代理对象不同的是,AbstractProxyInvoker实例的URL内容不同,本地代理对象是以“dubbo://”协议开头的,而远程代理对象的URL是以“registry://”协议开头的,代表注册中心地址,URL示例如下所示。
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? applicati on=demo-provider&dubbo=2.0.0&export=dubbo%3A%2F%2F192.168.2.1%3A20880%2Fcom.alibaba.dubbo.demo.bid.BidService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3 D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.bid.BidService%26meth ods%3DthrowNPE%2Cbid%26organization%3Ddubbo%26owner%3Dprogrammer%26pid%3D28623%26se rialization%3Dhessian2%26side%3Dprovider%26timestamp%3D1516590241783&organization=d ubbo&owner=programmer&pid=28623®istry=zookeeper×tamp=1516590241747
生成Invoker实例后,通过Exporter<? > exporter = protocol.export(invoker)语句将Invoker以参数的方式传入ProtocolListenerWrapper类的export方法,代码如下所示。
public <T> Exporter<T> export(Invoker<T> invoker)throws RpcException { //如果invoker的URL协议是registry则直接调用RegistryProtocol.export方法 if(Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())){ return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY,Constants.PROVIDER)); }
在方法中判断URL协议类型,如果invoker.getUrl().getProtocol()的值是registry,那么就直接调用RegistryProtocol的export方法。
3.服务注册
服务注册主要是通过RegistryProtocol类的export方法来完成的:
public <T> Exporter<T> export(final Invoker<T> originInvoker)throws RpcException { //export Invoker //生成DubboExporter实例,并初始化和打开Dubbo协议连接 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider //获取ZookeeperRegistry注册实例 final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //使用ZookeeperRegistry向ZK注册数据提供者地址 registry.register(registedProviderUrl); //订阅override数据 //FIXME提供者订阅时,会影响同一JVM,即暴露服务,又引用同一服务的的场景,因为 //subscribed以服务名为缓存的key,导致订阅信息覆盖 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); //注册中心订阅overrideSubscribeUrl,当节点数据发生变化时会触发overrideSubscribeListener //的notify方法重新暴露服务 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>(){ public Invoker<T> getInvoker(){ return exporter.getInvoker(); } public void unexport(){ try { exporter.unexport(); } catch(Throwable t){ logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch(Throwable t){ logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch(Throwable t){ logger.warn(t.getMessage(), t); } } }; }
这段代码主要分为几步:
● 通过doLocalExport方法生成DubboExporter实例,初始化并且打开Dubbo协议服务连接。
● 获取ZooKeeperRegistry注册实例。
● 向ZooKeeper注册服务地址。
● 注册中心订阅overrideSubscribeUrl,当节点数据发生变化时会触发overrideSubscribeListener的notify方法重新暴露服务。
● 返回Exporter实例。
1)doLocalExport方法执行逻辑
代码如下所示:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T>originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter =(ExporterChangeableWrapper<T>)bounds.get(key); if(exporter == null){ synchronized(bounds){ //先从缓存bounds中获取 exporter =(ExporterChangeableWrapper<T>)bounds.get(key); //如果没有则创建exporter,并放入缓存 if(exporter == null){ final Invoker<? > invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //通过DubboProtocol.export方法返回DubboExporter实例并强转为 //ExporterChangeableWrapper exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return(ExporterChangeableWrapper<T>)exporter; }
这个方法主要是返回ExporterChangeableWrapper对象,如果没有则通过DubboProtocol.export方法创建。
DubboProtocol.export方法的代码如下所示。
public <T> Exporter<T> export(Invoker<T> invoker)throws RpcException { URL url = invoker.getUrl(); // export service. //key是服务的全路径+端口号,比如com.alibaba.dubbo.demo.bid.BidService:20880 //客户端发起远程调用时,服务端通过key来决定调用哪个Exporter,也就是执行的 //Invoker String key = serviceKey(url); //创建DubboExporter对象,Invoker实际上就是真正的本地服务实现类实例 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //将key和exporter存入Map exporterMap.put(key, exporter); //export an stub service for dispaching event //是否支持本地存根 //服务提供者想在调用者上也执行部分逻辑,则设置此参数 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); //获取是否支持回调服务参数值,默认是false Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE,false); //判断是否支持存根事件,并且isCallbackservice不是回调服务 if(isStubSupportEvent && ! isCallbackservice){ //判断URL中是否有dubbo.stub.event.methods参数,如果有则将存根事件方法存 //入stubServiceMethodsMap String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if(stubServiceMethods == null || stubServiceMethods.length()== 0){ if(logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY)+ "], has set stubproxy support event , but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据URL绑定IP与端口,建立NIO框架的Server openServer(url); // modified by lishen optimizeSerialization(url); return exporter; }
方法的前半部分用来判断是否支持本地存根,调用端在调用服务端的时候往往通过接口来进行调用,但是有时候服务端也想让调用端执行一些逻辑操作,这时候就需要用到本地存根,通过获取dubbo.stub.event参数判断当前是否支持存根,同时通过dubbo.stub.event.methods参数获取存根方法,如果有则将其存放到stubServiceMethodsMap中等待后续回调给客户端。
方法的后半部分主要是根据URL绑定IP与端口号,建立Netty的Server端,关于通信部分我们在后面章节中有详细介绍。最后执行完方法后将export对象返回到上层RegistryProtocol.export方法。
2)获取ZooKeeperRegistry注册实例
在RegistryProtocol.export中获取注册实例代码:final Registry registry = getRegistry(originInvoker)。其中getRegistry方法调用的是AbstractRegistryFactory中的getRegistry方法,代码如下:
public Registry getRegistry(URL url){ url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); //锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { //从缓存中获取注册实例 Registry registry = REGISTRIES.get(key); if(registry ! = null){ return registry; } //如果缓存中没有注册实例,则创建一个 registry = createRegistry(url); if(registry == null){ throw new IllegalStateException("Can not create registry " + url); } //将新生成的注册实例加入缓存 REGISTRIES.put(key, registry); return registry; } finally { //释放锁 LOCK.unlock(); } }
createRegistry方法创建了ZooKeeperRegistry实例,在实例的构造方法中初始化了与ZooKeeper的连接,并将创建好的ZooKeeperRegistry实例缓存到REGISTRIES中,key就是服务的全路径名+Dubbo端口号,例子如下:
com.alibaba.dubbo.demo.bid.BidService:20880
3)向ZooKeeper注册服务地址
通过registry.register(registedProviderUrl)方法实现服务的注册,这个方法实际上调用了FailbackRegistry的register方法,关于FailbackRegistry的作用在前面已经介绍过了,register方法代码如下:
@Override public void register(URL url){ super.register(url); //从失败注册列表中删除注册URL failedRegistered.remove(url); //从失败取消请求列表中删除注册的URL failedUnregistered.remove(url); try { //向服务器端发送注册请求 doRegister(url); } catch(Exception e){ Throwable t = e; //如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if(check || skipFailback){ if(skipFailback){ t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress()+ ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause:" + t.getMessage(), t); } //将失败的注册请求记录到失败列表,定时重试 failedRegistered.add(url); } }
首先从注册失败列表和失败取消请求列表中删除注册的URL,然后执行doRegister方法向ZooKeeper注册中心注册服务。服务地址示例如下所示。
/dubbo/com.alibaba.dubbo.demo.bid.BidService/providers
服务注册需处理契约:
(1)当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。
(2)当URL设置了dynamic=false时,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。
(3)当URL设置了category=routers时,表示分类存储,默认类别为providers,可按分类部分通知数据。
(4)当注册中心重启、网络抖动时,不能丢失数据,包括断线自动删除数据。
(5)允许URI相同但参数不同的URL并存,不能覆盖。
4.注册中心订阅overrideSubscribeUrl
通过registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)方法订阅刚刚注册的provider服务,overrideSubscribeUrl示例如下所示。
provider://192.168.1.105:20880/com.alibaba.dubbo.demo.bid.BidService? anyhost=t rue&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generi c=false&interface=com.alibaba.dubbo.demo.bid.BidService&methods=throwNPE, bid&organi zation=dubbo&owner=programmer&pid=9489&serialization=hessian2&side=provider×ta mp=1518508037863
subscribe方法的代码如下所示。
@Override public void subscribe(URL url, NotifyListener listener){ super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { //向服务器端发送订阅请求 doSubscribe(url, listener); } catch(Exception e){ Throwable t = e; List<URL> urls = getCacheUrls(url); if(urls ! = null && urls.size()> 0){ notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " +urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home")+ "/dubbo-registry-" + url.getHost()+ ".cache")+ ", cause: " +t.getMessage(), t); } else { //如果开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if(check || skipFailback){ if(skipFailback){ t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ",cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry,cause: " + t.getMessage(), t); } } //将失败的订阅请求记录到失败列表,定时重试 addFailedSubscribed(url, listener); } }
订阅符合条件的已注册数据,当有注册数据变更时自动推送,并且会触发overrideSubscribeListener的notify方法重新暴露服务。订阅需处理契约:
(1)当URL设置了check=false时,订阅失败后不报错,在后台定时重试。
(2)当URL设置了category=routers时,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。
(3)允许以interface, group, version, classifier作为条件查询,如interface=com.alibaba.foo. BarService&version=1.0.0。
(4)查询条件允许星号通配,订阅有分组和版本的接口。
(5)当注册中心重启、网络抖动时,需要自动恢复订阅请求。
(6)允许URI相同但参数不同的URL并存,不能覆盖。
(7)阻塞订阅过程,等第一次通知完后再返回。
订阅条件不允许为空,如consumer://10.20.153.10/com.alibaba.foo.BarService? version=1.0.0&application=demo-provider。
Provider服务注册过程如图3-26所示。
图3-26
3.2.6 Dubbo的通信机制
Dubbo的remoting模块是远程通信模块,是Dubbo项目处理底层网络通信的层。图3-27显示了这一层中的类结构。
图3-27
可以看到在remoting包中还分为buffer、exchange、telnet和transport包。
● buffer:主要是针对NIO的Buffer做了一些封装。
● exchange:信息交换层,这也是整个通信过程的核心层,后面我们会详细介绍。
● telnet:主要是针对telnet提供编解码转换。
● transport:网络传输层(Transport),抽象Mina和Netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec等。在Dubbo中具体的传输功能实现都继承自Transporter接口,此接口只包含bind和connect两个方法接口。通过SPI的adaptive注解方式进行注解,默认为Netty。
1. Dubbo整体架构介绍
图3-28为Dubbo的整体通信流程。
图3-28
2. Transport网络传输层
Transport网络传输层主要包括两大部分,一个是基于Codec2的数据编码和解码,还有一个是基于Transport的数据传输封装。
图3-29展示了数据编码和解码的类结构。
● 从图中可以看出,AbstractCodec、ThriftCodec、CodecAdapter、DubboCountCodec和ThriftNativeCodec都实现了Codec2接口,而TransportCodec、TelnetCodec、ExchangeCodec和DubboCodec都继承了AbstractCodec。
● CodecAdapter是Codec2的适配器模式,通过内部的SPI机制加载指定的Codec2实现类。而后将CodecAdapter实例返回给AbstractClient构造方法,AbstractClient的实现类包括NettyClient、MinaClient和GrizzlyClient。
● DubboCountCodec:Dubbo的默认编码和解码实现类。
● TransportCodec:比较通用并且没有具体的协议编码类。
● ExchangeCodec:对request请求的编码和解码,对response响应的编码和解码。
● DubboCodec:对Dubbo的远程调用请求对象DecodeableRpcInvocation和请求返回结果对象DecodeableRpcResult进行编码/解码。
图3-29
图3-30显示的是Transporter数据传输封装对象,通过实现Transporter接口可以产生不同协议的实现类。
图3-30
Transporter接口的代码如下所示。
@SPI("netty") public interface Transporter { /** * Bind a server. * * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, Receiver, ChannelHandler) * @param url server url * @param handler * @return server * @throws RemotingException */ @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler)throws RemotingException; /** * Connect to a server. * * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, Receiver,ChannelListener) * @param url server url * @param handler * @return client * @throws RemotingException */ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler)throws RemotingException; }
通过代码可以看出接口使用了SPI,默认的实现类是NettyTransporter。bind方法是返回一个NettyServer对象,connect方法是返回一个NettyCient对象。
3. Exchange信息交换层的类图结构
类图结构如图3-31所示。
图3-31
● ReferenceCountExchangeClient:将请求交给HeaderExchangeClient处理。
● HeaderExchangeClient:提供心跳检查功能;将send、request、close等事件转由HeaderExchangeChannel处理。
● HeaderExchangeChannel:主要是完成同步转异步。在request(Object request, int timeout)方法中,将请求转换成Request对象,构建DefaultFuture对象,调用NIO框架对应的Client对象(默认选择NettyClient)的send方法将请求消息发送出去,返回DefultFuture对象。
● NettyClient:负责连接服务和完成消息的发送。
● HeaderExchangeServer:提供心跳检查功能;启动心跳监测线程池,该线程池初始化了一个线程,在线程中调用线程类HeartBeatTask进行心跳检查。
HeaderExchangeServer的类图结构如图3.32所示。
图3-32
HeartBeatTask处理心跳的规则:
(1)若通道最新的写入时间或最新的读取时间与当前时间相比,已经超过了心跳间隔时间,则发送心跳请求。
(2)如果通道最新的读取时间与当前时间相比,已经超过了心跳的超时时间,对于客户端来说则重连;对于服务端来说则关闭通道。
前面我们对信息交换层的类结构进行了简单的介绍,下面我们从DubboProtocol开始对整个过程做一个介绍。
1)provider服务端
通过export方法找到openServer方法,然后进入createServer方法,代码如下所示。
private ExchangeServer createServer(URL url){ //默认开启server,关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if(str ! = null && str.length()> 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion()?COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch(RemotingException e){ throw new RpcException("Fail to start server(url: " + url + ")" +e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if(str ! = null && str.length()> 0){ Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if(! supportedTypes.contains(str)){ throw new RpcException("Unsupported client type: " + str); } } return server; }
代码中首先通过url.addParameter方法为URL添加了channel.readonly.sent、heartbeat和codec参数,添加完的示例URL如下所示。
dubbo://192.168.0.70:20880/com.alibaba.dubbo.demo.bid.BidService? anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generi c=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.bid.BidService&methods=thr owNPE, bid&organization=dubbo&owner=programmer&pid=46252&serialization=hessian2&side=provider×tamp=1519381572341
接着进入Exchangers.bind(url, requestHandler)方法中,代码如下所示。
public static ExchangeServer bind(URL url, ExchangeHandler handler)throws RemotingException { if(url == null){ throw new IllegalArgumentException("url == null"); } if(handler == null){ throw new IllegalArgumentException("handler == null"); } //如果url中没有codec参数,则添加codec参数,参数值是exchange url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); }
getExchanger(url)方法中解析URL获取exchanger参数,如果没有获取则使用默认的header参数,然后通过SPI机制获取扩展的HeaderExchanger实例,再调用HeaderExchanger的bind方法,代码如下所示。
public ExchangeServer bind(URL url, ExchangeHandler handler)throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
这段代码连续使用了多个装饰模式,先是实例化一个HeaderExchangeHandler对象,然后将其包装到DecodeHandler对象中,再把DecodeHandler对象以参数的方式传递到Transporters.bind方法,bind方法会通过SPI机制调用NettyTransporter类的bind方法,并返回NettyServer对象。NettyTransporter.bind方法的代码如下所示。
public Server bind(URL url, ChannelHandler listener)throws RemotingException { return new NettyServer(url, listener); }
在NettyServer构造方法中会调用doOpen方法启动Netty服务,代码如下:
protected void doOpen()throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss,worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory(){ public ChannelPipeline getPipeline(){ //构造NettyCodecAdapter适配器,作用主要是初始化Dubbo传输信息的编码和解码 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),NettyServer.this); //构造Netty通信管道 ChannelPipeline pipeline = Channels.pipeline(); //设置Netty的解码自定义类,自定义类从NettyCodecAdapter中获取 pipeline.addLast("decoder", adapter.getDecoder()); //设置Netty的编码码自定义类,自定义类从NettyCodecAdapter中获取 pipeline.addLast("encoder", adapter.getEncoder()); //设置事件处理自定义类,它会处理Netty的一系列事件 pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
这段代码主要是启动Netty服务来接收客户端传来的数据。我们重点来看如何为Channels. pipeline设置Handler处理链。
● 添加decoder为解码器,使用的是Netty中的SimpleChannelUpstreamHandler,也就是服务提供端收到消费端的请求的时候需要解码。
● encoder是编码器,使用的是Netty中的OneToOneEncoder,这个类实现了Channel-DownstreamHandler,从服务提供端发送给服务消费端的时候需要编码。
● handler:nettyHandler实现了ChannelUpstreamHandler、ChannelDownstreamHandler两个接口,上下的时候都需要处理。
执行顺序是:
● 收到服务消费者请求的时候会先执行decoder,然后执行nettyHandler。
● 发送给消费者的时候会先执行nettyHandler,然后执行encoder。
Dubbo服务端通信服务启动流程如图3-33所示。
图3-33
2)consumer消费端
在DubboProtocol中通过refer方法找到getClients(URL url)方法,代码如下:
private ExchangeClient[] getClients(URL url){ //是否共享连接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,则共享连接 if(connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for(int i = 0; i < clients.length; i++){ //如果设置为共享方式,则执行getSharedClient方法,否则执行initClient方法 if(service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
根据URL获取ExchangeClient对象,根据service_share_connect属性判断是否是共享模式,如果是则判断是否存在,如果存在则直接返回,不存在则创建新对象。不是共享模式就直接创建,service_share_connect默认是直接创建,initClient方法代码如下所示。
private ExchangeClient initClient(URL url){ // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible =(version ! = null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion()&&compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); //BIO存在严重性能问题,暂时不允许使用 if(str ! = null && str.length()> 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)){ throw new RpcException("Unsupported client type: " + str + ", " + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //设置连接应该是lazy的 //如果lazy属性没有配置为true(我们没有配置,默认为false),则ExchangeClient会 //马上和服务端建立连接 if(url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url , requestHandler); } else { client = Exchangers.connect(url , requestHandler); } } catch(RemotingException e){ throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
代码的前半部分主要是获取关键参数,比如采用的通信协议是什么、Dubbo的版本号是什么等,同时为URL添加默认的编码和解码方式,以及设置heartbeat心跳时间参数。后半部分是和服务端建立通信连接,我们重点看Exchangers.connect(url , requestHandler)这个方法,代码如下:
public static ExchangeClient connect(URL url, ExchangeHandler handler)throws RemotingException { if(url == null){ throw new IllegalArgumentException("url == null"); } if(handler == null){ throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler); } public static Exchanger getExchanger(URL url){ String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type){ return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
根据SPI机制,最终返回的是HeaderExchanger对象,connect方法的代码如下:
//HeaderExchangeHandler包装 //DecodeHandler包装 //Transporters.connect包装 //最后返回一个HeaderExchangerClient,这里封装了client、channel、启动心跳的定时器等 public ExchangeClient connect(URL url, ExchangeHandler handler)throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
这个方法采用装饰者模式经过一系列封装,先将handler封装到HeaderExchangeHandler中,然后封装到DecodeHandler中,再通过Transporters.connect方法包装成Client对象,最后被统一包装成HeaderExchangeClient返回。Transporters.connect方法是根据SPI机制扩展获取的,这里默认使用NettyTransporter.connect,在NettyTransporter的connect方法中直接返回一个NettyClient对象。
在NettyClient的构造方法中主要有两个方法,分别是doOpen方法和connect方法,客户端和前面分析的服务端的doOpen方法内容很相似,这里不再介绍,我们来看connect方法中doConnect方法的代码。
protected void doConnect()throws Throwable { long start = System.currentTimeMillis(); //消费者端开始连接服务端 ChannelFuture future = bootstrap.connect(getConnectAddress()); try{ //等待连接请求结果 boolean ret = future.awaitUninterruptibly(getConnectTimeout(),TimeUnit.MILLISECONDS); //如果连接成功则获取通道Channel if(ret && future.isSuccess()){ Channel newChannel = future.getChannel(); newChannel.setInterestOps(Channel.OP_READ_WRITE); try { //关闭旧的连接 Channel oldChannel = NettyClient.this.channel; // copy reference if(oldChannel ! = null){ try { if(logger.isInfoEnabled()){ logger.info("Close old netty channel " + oldChannel + "on create new netty channel " + newChannel); } oldChannel.close(); } finally { NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { if(NettyClient.this.isClosed()){ try { if(logger.isInfoEnabled()){ logger.info("Close new netty channel " + newChannel + ",because the client closed."); } newChannel.close(); } finally { NettyClient.this.channel = null; NettyChannel.removeChannelIfDisconnected(newChannel); } } else { NettyClient.this.channel = newChannel; } } } //如果没有连接成功,则获取错误原因并抛出异常 else if(future.getCause()! = null){ throw new RemotingException(this, "client(url: " + getUrl()+")failed to connect to server " + getRemoteAddress()+ ", error message is:" + future.getCause().getMessage(), future.getCause()); } else { throw new RemotingException(this, "client(url: " + getUrl()+")failed to connect to server " + getRemoteAddress()+ " client-side timeout " + getConnectTimeout()+ "ms(elapsed: " +(System.currentTimeMillis()- start)+ "ms)from netty client " + NetUtils.getLocalHost()+ " using dubbo version " +Version.getVersion()); } }finally{ if(! isConnected()){ future.cancel(); } } }
这段代码尝试连接服务端,如果返回成功则正常返回,否则获取错误原因并抛出异常。
Dubbo消费端通信服务启动流程如图3-34所示。
图3-34
3)同步调用的实现
我们在使用Netty进行消息通信的时候,ChannelHandler的send方法只负责不断地发送消息,而received方法只负责不断地接收消息,整个过程是异步的。我们在实际使用Dubbo进行通信的时候感受到的往往是同步的过程,客户端发送消息然后得到返回结果,这个过程是如何实现的呢?
我们来看Dubbo的执行类DubboInvoker.doInvoker方法,代码如下:
protected Result doInvoke(final Invocation invocation)throws Throwable { RpcInvocation inv =(RpcInvocation)invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; //判断clients客户端数量,如果长度为1,则直接返回第一个,否则自增值与client数量 //做Hash选取 if(clients.length == 1){ currentClient = clients[0]; } else { //自增值与client数量做Hash选取 currentClient = clients[index.getAndIncrement()% clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); //不需要返回值则直接发送 if(isOneway){ boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } //如果isAsync是异步的 else if(isAsync){ ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } //如果isAsync是同步的 else { RpcContext.getContext().setFuture(null); return(Result)currentClient.request(inv, timeout).get(); } } catch(TimeoutException e){ throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName()+ ", provider: " + getUrl()+", cause: " + e.getMessage(), e); } catch(RemotingException e){ throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName()+ ", provider: " + getUrl()+ ", cause:" + e.getMessage(), e); } }
如果不需要返回值则直接使用send方法将信息发送出去。
如果需要异步通信(isAsync为true),则使用request方法构建一个ResponseFuture,然后将ResponseFuture封装成FutureAdapter,再绑定RpcContext中。
如果需要同步通信(isAsync为false),则使用request方法构建一个ResponseFuture,阻塞等待请求完成。
在异步通信的情况下,代码中一个关键的类是ResponseFuture,将其和当前线程绑定在RpcContext对象中,如果我们要获取异步结果,则需要通过RpcContext来获取当前线程绑定的RpcContext,然后就可以获取Future对象了。
我们重点来看同步通信的过程,currentClient.request方法最终请求的是HeaderExchange-Channel.request方法,代码如下所示。
public ResponseFuture request(Object request, int timeout)throws RemotingException { if(closed){ throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed! "); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch(RemotingException e){ future.cancel(); throw e; } return future; }
这个过程首先创建一个Request请求对象,将请求消息作为Data值并创建了唯一标识ID,然后创建DefaultFuture对象并且在初始化的过程中,将自身this对象及channel对象存入全局变量DefaultFuture,最终通过get方法阻塞等待DefaultFuture得到服务端返回值,而get阻塞返回的条件又是由received方法触发的。下面我们来看一下DefaultFuture中的两个核心方法,分别是get和received方法。
public Object get(int timeout)throws RemotingException { if(timeout <= 0){ timeout = Constants.DEFAULT_TIMEOUT; } //如果当前没有返回值则进入 if(! isDone()){ long start = System.currentTimeMillis(); lock.lock(); try { //循环等待isDone的状态 while(! isDone()){ //阻塞等待,如果达到timout毫秒则继续执行 done.await(timeout, TimeUnit.MILLISECONDS); //判断当前状态是否得到返回值,或者判断阻塞时间是否超时,如果符合条件则 //退出 if(isDone()|| System.currentTimeMillis()- start > timeout){ break; } } } catch(InterruptedException e){ throw new RuntimeException(e); } finally { lock.unlock(); } if(! isDone()){ throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
循环判断isDone方法的状态,如果得到返回值则返回true,否则返回false,当返回false的时候则阻塞等待timeout毫秒时间,timeout默认是1秒,然后继续判断isDone方法状态是否得到返回值和阻塞时间是否超时。
有两种情况会唤醒该get方法:
● 收到响应消息并调用received方法,根据响应消息中返回的ID在ConcurrentHashMap里面使用get(ID)方法获取DefaultFuture对象,然后更新该对象的Response变量的值。
● RemotingInvocationTimeoutScan线程定时扫描响应是否超时,如果超时,则从ConcurrentHashMap对象中删掉缓存的Future对象并将Response变量设置为超时信息。
public static void received(Channel channel, Response response){ try { //从缓存中根据ID获取DefaultFuture对象,并且从FUTURES中删除 DefaultFuture future = FUTURES.remove(response.getId()); if(future ! = null){ //执行该方法为response对象赋值,触发isDone方法状态变更 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " +(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res){ lock.lock(); try { //为response对象赋值,触发isDone方法状态 response = res; if(done ! = null){ done.signal(); } } finally { lock.unlock(); } if(callback ! = null){ invokeCallback(callback); } }
通过HeaderExchangeHandler对象的handleResponse方法收到响应请求后,调用DefaultFuture.received方法,从缓存中根据ID获取DefaultFuture对象,然后从FUTURES中删除,再将响应结果赋值给response对象从而触发isDone方法的状态变更。
图3-35展示了Dubbo同步和异步的调用过程。
图3-35
Invoker<? > getInvoker(Channel channel, Invocation inv)throws RemotingException{ boolean isCallBackServiceInvoke = false; boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); //如果是客户端的回调服务 isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY)); if(isStubServiceInvoke){ //获取客户端的端口 port = channel.getRemoteAddress().getPort(); } //callback isCallBackServiceInvoke = isClientSide(channel)&& ! isStubServiceInvoke; if(isCallBackServiceInvoke){ path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY); inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE,Boolean.TRUE.toString()); } //根据客户端的端口、服务名、版本号及所属组生成serviceKey String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); //根据serviceyKey获取缓存在exporterMap中的DubboExporter DubboExporter<? > exporter =(DubboExporter<? >)exporterMap.get(serviceKey); if(exporter == null) throw new RemotingException(channel, "Not found exported service: " +serviceKey + " in " + exporterMap.keySet()+ ", may be version or group mismatch " +", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " +channel.getLocalAddress()+ ", message:" + inv); return exporter.getInvoker(); }