项目
博客
文档
归档
资源链接
关于我
项目
博客
文档
归档
资源链接
关于我
Dubbo 源码分析 —— 服务暴露(二)之远程暴露(Dubbo)
2020-11-10
·
芋道源码
·
转载
·
Dubbo
·
本文共 4,114个字,预计阅读需要 14分钟。
> `转载 `于【[芋道源码](http://svip.iocoder.cn/)】 > 本文基于 Dubbo 2.6.1 版本,望知悉。 ## 1. 概述 在 [《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》](http://svip.iocoder.cn/Dubbo/service-export-local/?self) 一文中,我们已经分享了**本地暴露服务**。在本文中,我们来分享**远程暴露服务**。在 Dubbo 中提供多种协议( Protocol ) 的实现,大体流程一致,本文以 [Dubbo Protocol](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/dubbo.html) 为例子,这也是 Dubbo 的**默认**协议。 如果不熟悉该协议的同学,可以先看看 [《Dubbo 使用指南 —— dubbo://》](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/dubbo.html) ,简单了解即可。 > **特性** > > 缺省协议,使用基于 mina `1.1.7` 和 hessian `3.2.1` 的 remoting 交互。 > > - 连接个数:单连接 > - 连接方式:长连接 > - 传输协议:TCP > - 传输方式:NIO 异步传输 > - 序列化:Hessian 二进制序列化 > - 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。 > - 适用场景:常规远程服务方法调用 相比**本地暴露**,**远程暴露**会多做如下几件事情: - 启动通信服务器,绑定服务端口,提供远程调用。 - 向注册中心注册服务提供者,提供服务消费者从注册中心发现服务。 ## 2. 远程暴露 远程暴露服务的顺序图如下: [![远程暴露顺序图](http://static2.iocoder.cn/images/Dubbo/2018_03_10/02.png)](http://static2.iocoder.cn/images/Dubbo/2018_03_10/02.png)远程暴露顺序图 在 [`#doExportUrlsFor1Protocol(protocolConfig, registryURLs)`](https://github.com/YunaiV/dubbo/blob/c635dd1990a1803643194048f408db310f06175b/dubbo-config/dubbo-config-api/src/main/java/com/alibaba/dubbo/config/ServiceConfig.java#L621-L648) 方法中,涉及**远程暴露服务**的代码如下: ``` 1: // 服务远程暴露 2: // export to remote if the config is not local (export to local only when config is local) 3: if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 4: if (logger.isInfoEnabled()) { 5: logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); 6: } 7: if (registryURLs != null && !registryURLs.isEmpty()) { 8: for (URL registryURL : registryURLs) { 9: // "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。 10: url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); 11: // 获得监控中心 URL 12: URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控 13: if (monitorUrl != null) { 14: url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 15: } 16: if (logger.isInfoEnabled()) { 17: logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); 18: } 19: // 使用 ProxyFactory 创建 Invoker 对象 20: Invoker> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 21: 22: // 创建 DelegateProviderMetaDataInvoker 对象 23: DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 24: 25: // 使用 Protocol 暴露 Invoker 对象 26: Exporter> exporter = protocol.export(wrapperInvoker); 27: // 添加到 `exporters` 28: exporters.add(exporter); 29: } 30: } else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html 。主要用于开发测试环境使用。 31: // 使用 ProxyFactory 创建 Invoker 对象 32: Invoker> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); 33: 34: // 创建 DelegateProviderMetaDataInvoker 对象 35: DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 36: 37: // 使用 Protocol 暴露 Invoker 对象 38: Exporter> exporter = protocol.export(wrapperInvoker); 39: // 添加到 `exporters` 40: exporters.add(exporter); 41: } 42: } ``` - 第 30 至 41 行:**大体和【第 7 至 29 行】逻辑相同**。差别在于,当配置注册中心为 `"N/A"` 时,表示即使远程暴露服务,也不向注册中心注册。这种方式用于被服务消费者直连服务提供者,参见 [《Dubbo 用户指南 —— 直连提供者》](http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html) 文档。 > 在**开发及测试环境下**,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。 - 第 8 行:循环祖册中心 URL 数组 `registryURLs` 。 - 第 10 行:`"dynamic"` 配置项,服务是否动态注册。如果设为 **false** ,注册后将显示后 **disable** 状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。 - 第 12 行:调用#loadMonitor(registryURL)方法,获得监控中心 URL 。 - 🙂 在 [「2.1 loadMonitor」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 小节,详细解析。 - 第 13 至 15 行:调用 [`URL#addParameterAndEncoded(key, value)`](https://github.com/YunaiV/dubbo/blob/c635dd1990a1803643194048f408db310f06175b/dubbo-common/src/main/java/com/alibaba/dubbo/common/URL.java#L891-L896) 方法,将监控中心的 URL 作为 `"monitor"` 参数添加到服务提供者的 URL 中,**并且需要编码**。通过这样的方式,服务提供者的 URL 中,**包含了监控中心的配置**。 - 第 20 行:调用 [`URL#addParameterAndEncoded(key, value)`](https://github.com/YunaiV/dubbo/blob/c635dd1990a1803643194048f408db310f06175b/dubbo-common/src/main/java/com/alibaba/dubbo/common/URL.java#L891-L896) 方法,将服务体用这的 URL 作为 `"export"` 参数添加到注册中心的 URL 中。通过这样的方式,注册中心的 URL 中,**包含了服务提供者的配置**。 - 第 20 行:调用ProxyFactory#getInvoker(proxy, type, url)方法,创建 Invoker 对象。该 Invoker 对象,执行#invoke(invocation)方法时,内部会调用 Service 对象(ref)对应的调用方法。 - 🙂 详细的实现,后面单独写文章分享。 - 😈 为什么传递的是**注册中心的 URL** 呢?下文会详细解析。 - 第 23 行:创建 [`com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker`](https://github.com/YunaiV/dubbo/blob/c635dd1990a1803643194048f408db310f06175b/dubbo-config/dubbo-config-api/src/main/java/com/alibaba/dubbo/config/invoker/DelegateProviderMetaDataInvoker.java) 对象。该对象在 Invoker 对象的基础上,增加了当前服务提供者 ServiceConfig 对象。 - 第 26 行:调用 `Protocol#export(invoker)` 方法,暴露服务。 - 此处 Dubbo SPI **自适应**的特性的**好处**就出来了,可以**自动**根据 URL 参数,获得对应的拓展实现。例如,`invoker` 传入后,根据 `invoker.url` 自动获得对应 Protocol 拓展实现为 DubboProtocol 。 - 实际上,Protocol 有两个 Wrapper 拓展实现类: ProtocolFilterWrapper、ProtocolListenerWrapper 。所以,`#export(...)` 方法的调用顺序是: - **Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol** - => - **Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol** - 也就是说, 这一条大的调用链,包含两条小的调用链 。原因是: - 首先,传入的是注册中心的 URL ,通过 Protocol$Adaptive 获取到的是 RegistryProtocol 对象。 - 其次,RegistryProtocol 会在其 `#export(...)` 方法中,使用服务提供者的 URL ( 即注册中心的 URL 的 `export` 参数值),再次调用 Protocol$Adaptive 获取到的是 DubboProtocol 对象,进行服务暴露。 - **为什么是这样的顺序**?通过这样的顺序,可以实现类似 **AOP** 的效果,在本地服务器启动完成后,再向注册中心注册。伪代码如下: ``` RegistryProtocol#export(...) { // 1. 启动本地服务器 DubboProtocol#export(...); // 2. 向注册中心注册。 } ``` - 这也是为什么上文提到的 “为什么传递的是**注册中心的 URL** 呢?” 的原因。 - 🙂 如果无法理解,在 [「3. Protocol」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 在解析代码,进一步理顺。 - 第 28 行:添加到 `exporters` 集合中。 ### 2.1 loadMonitor > 友情提示,监控中心不是本文的重点,简单分享下该方法的逻辑。 > 可直接跳过。 `#loadMonitor(registryURL)` 方法,加载监控中心 `com.alibaba.dubbo.common.URL` 数组。代码如下: ``` 1: /** 2: * 加载监控中心 URL 3: * 4: * @param registryURL 注册中心 URL 5: * @return 监控中心 URL 6: */ 7: protected URL loadMonitor(URL registryURL) { 8: // 从 属性配置 中加载配置到 MonitorConfig 对象。 9: if (monitor == null) { 10: String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address"); 11: String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol"); 12: if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) { 13: return null; 14: } 15: 16: monitor = new MonitorConfig(); 17: if (monitorAddress != null && monitorAddress.length() > 0) { 18: monitor.setAddress(monitorAddress); 19: } 20: if (monitorProtocol != null && monitorProtocol.length() > 0) { 21: monitor.setProtocol(monitorProtocol); 22: } 23: } 24: appendProperties(monitor); 25: // 添加 `interface` `dubbo` `timestamp` `pid` 到 `map` 集合中 26: Map
map = new HashMap
(); 27: map.put(Constants.INTERFACE_KEY, MonitorService.class.getName()); 28: map.put("dubbo", Version.getVersion()); 29: map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 30: if (ConfigUtils.getPid() > 0) { 31: map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 32: } 33: // 将 MonitorConfig ,添加到 `map` 集合中。 34: appendParameters(map, monitor); 35: // 获得地址 36: String address = monitor.getAddress(); 37: String sysaddress = System.getProperty("dubbo.monitor.address"); 38: if (sysaddress != null && sysaddress.length() > 0) { 39: address = sysaddress; 40: } 41: // 直连监控中心服务器地址 42: if (ConfigUtils.isNotEmpty(address)) { 43: // 若不存在 `protocol` 参数,默认 "dubbo" 添加到 `map` 集合中。 44: if (!map.containsKey(Constants.PROTOCOL_KEY)) { 45: if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) { 46: map.put(Constants.PROTOCOL_KEY, "logstat"); 47: } else { 48: map.put(Constants.PROTOCOL_KEY, "dubbo"); 49: } 50: } 51: // 解析地址,创建 Dubbo URL 对象。 52: return UrlUtils.parseURL(address, map); 53: // 从注册中心发现监控中心地址 54: } else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) { 55: return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)); 56: } 57: return null; 58: } ``` - 第 8 至 24 行:从**属性配置**中,加载配置到 MonitorConfig 对象。 - 第 25 至 32 行:添加 `interface` `dubbo` `timestamp` `pid` 到 `map` 集合中。 - 第 34 行:调用 `#appendParameters(map, config)` 方法,将 MonitorConfig ,添加到 `map` 集合中。 - 第 35 至 40 行:获得**监控中心**的地址。 - 第 42 行:当address非空时,直连监控中心服务器地址的情况。 - 第 43 至 50 行:若不存在protocol参数,缺省默认为 “dubbo” ,并添加到map集合中。 - 第 44 至 55 行:**可以忽略**。因为,`logstat` 这个拓展实现已经不存在。 - 第 52 行:调用`UrlUtils#parseURL(address, map)`方法,解析address,创建 Dubbo URL 对象。 - 🙂 已经添加了代码注释,胖友点击链接查看。 - 第 54 至 56 行:当protocol = registry时,并且注册中心 URL 非空时,从注册中心发现监控中心地址。以registryURL为基础,创建 URL : - `protocol = dubbo` - `parameters.protocol = registry` - `parameters.refer = map` - 第 57 行:**无注册中心**,返回空。 - ps :后续会有文章,详细分享。 ## 3. Protocol 本文涉及的 Protocol 类图如下: [![Protocol 类图](http://static2.iocoder.cn/images/Dubbo/2018_03_10/04.png)](http://static2.iocoder.cn/images/Dubbo/2018_03_10/04.png)Protocol 类图 ### 3.1 ProtocolFilterWrapper 接 [《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「 3.2 ProtocolFilterWrapper」](http://svip.iocoder.cn/Dubbo/service-export-local/?self) 小节。 `#export(invoker)` 方法,代码如下: ``` 1: public
Exporter
export(Invoker
invoker) throws RpcException { 2: // 注册中心 3: if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 4: return protocol.export(invoker); 5: } 6: // 建立带有 Filter 过滤链的 Invoker ,再暴露服务。 7: return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); 8: } ``` - 第 2 至 5 行:当 `invoker.url.protocl = registry` ,**注册中心的 URL** ,无需创建 Filter 过滤链。 - 第 7 行:调用 `#buildInvokerChain(invoker, key, group)` 方法,创建带有 Filter 过滤链的 Invoker 对象。 - 第 7 行:调用 `protocol#export(invoker)` 方法,继续暴露服务。 - 在 RegistryProtocol 中,会调用 `DubboProtocol#export(...)` 方法时,会走【**第 7 行**】的流程。 ### 3.2 RegistryProtocol [`com.alibaba.dubbo.registry.integration.RegistryProtocol`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/integration/RegistryProtocol.java) ,实现 Protocol 接口,注册中心协议实现类。 #### 3.2.1 属性 属性相关,代码如下: > 友情提示,仅包含本文涉及的属性。 ``` // ... 省略部分和本文无关的属性。 /** * 单例。在 Dubbo SPI 中,被初始化,有且仅有一次。 */ private static RegistryProtocol INSTANCE; /** * 绑定关系集合。 * * key:服务 Dubbo URL */ // To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. // 用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露 // providerurl <--> exporter private final Map
> bounds = new ConcurrentHashMap
>(); /** * Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。 */ private Protocol protocol; /** * RegistryFactory 自适应拓展实现类,通过 Dubbo SPI 自动注入。 */ private RegistryFactory registryFactory; public RegistryProtocol() { INSTANCE = this; } public static RegistryProtocol getRegistryProtocol() { if (INSTANCE == null) { ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load } return INSTANCE; } ``` - INSTANCE静态属性,单例。通过 Dubbo SPI 加载创建,有且仅有一次。 - `#getRegistryProtocol()` **静态**方法,获得单例。 - `bounds` 属性,绑定关系集合。其中,Key 为**服务提供者 URL** 。 - `protocol` 属性,Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。 - registryFactory属性,自适应拓展实现类,通过 Dubbo SPI 自动注入。 - 用于创建注册中心 Registry 对象。 #### 3.2.2 export 本文涉及的 `#export(invoker)` 方法,代码如下: ``` 1: public
Exporter
export(final Invoker
originInvoker) throws RpcException { 2: // 暴露服务 3: // export invoker 4: final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker); 5: 6: // 获得注册中心 URL 7: URL registryUrl = getRegistryUrl(originInvoker); 8: 9: // 获得注册中心对象 10: // registry provider 11: final Registry registry = getRegistry(originInvoker); 12: 13: // 获得服务提供者 URL 14: final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 15: 16: //to judge to delay publish whether or not 17: boolean register = registedProviderUrl.getParameter("register", true); 18: 19: // 向本地注册表,注册服务提供者 20: ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); 21: 22: // 向注册中心注册服务提供者(自己) 23: if (register) { 24: register(registryUrl, registedProviderUrl); 25: ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // 标记向本地注册表的注册服务提供者,已经注册 26: } 27: 28: // 使用 OverrideListener 对象,订阅配置规则 29: // Subscribe the override data 30: // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. 31: final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 32: final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 33: overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 34: registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 35: //Ensure that a new exporter instance is returned every time export 36: return new DestroyableExporter
(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); 37: } ``` - 第 4 行:调用 `#doLocalExport(invoker)` 方法,暴露服务。 - 第 7 行:调用 `#getRegistryUrl(originInvoker)` 方法,获得注册中心 URL 。代码如下: ``` /** * 获得注册中心 URL * * @param originInvoker 原始 Invoker * @return URL */ private URL getRegistryUrl(Invoker> originInvoker) { URL registryUrl = originInvoker.getUrl(); if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // protocol String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } return registryUrl; } ``` - 该过程是我们在 [《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「2.1 loadRegistries」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 的那张图的反向流程,即**红线部分** :[![getRegistryUrl](http://static2.iocoder.cn/images/Dubbo/2018_03_10/01.png)](http://static2.iocoder.cn/images/Dubbo/2018_03_10/01.png)getRegistryUrl - 第 11 行:获得注册中心对象。 - 第 14 行:调用 `#getRegistedProviderUrl(originInvoker)` 方法,获得服务提供者 URL 。代码如下: ``` private URL getRegistedProviderUrl(final Invoker> originInvoker) { // 从注册中心的 export 参数中,获得服务提供者的 URL URL providerUrl = getProviderUrl(originInvoker); //The address you see at the registry return providerUrl.removeParameters(getFilteredKeys(providerUrl)) // 移除 .hide 为前缀的参数 .removeParameter(Constants.MONITOR_KEY) // monitor .removeParameter(Constants.BIND_IP_KEY) // bind.ip .removeParameter(Constants.BIND_PORT_KEY) // bind.port .removeParameter(QOS_ENABLE) // qos.enable .removeParameter(QOS_PORT) // qos.port .removeParameter(ACCEPT_FOREIGN_IP); // qos.accept.foreign.ip } private URL getProviderUrl(final Invoker> origininvoker) { String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); // export if (export == null || export.length() == 0) { throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl()); } return URL.valueOf(export); } private static String[] getFilteredKeys(URL url) { Map
params = url.getParameters(); if (params != null && !params.isEmpty()) { List
filteredKeys = new ArrayList
(); for (Map.Entry
entry : params.entrySet()) { if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) { filteredKeys.add(entry.getKey()); } } return filteredKeys.toArray(new String[filteredKeys.size()]); } else { return new String[]{}; } } ``` - **重点**,从注册中心的 URL 中获得 `export` 参数对应的值,即服务提供者的 URL 。 - 移除**多余**的参数。因为,这些参数注册到注册中心没有实际的用途。 - 第 17 行:配置项 `register` ,服务提供者是否注册到配置中心。 - 第 20 行:调用ProviderConsumerRegTable#registerProvider(invoker, registryUrl)方法,向本地注册表,注册服务提供者。 - 在 [《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》「5. ProviderConsumerRegTable」 ](http://svip.iocoder.cn/Dubbo/registry-api/?self),有详细解析。 - 第 24 行:调用 `#register(registryUrl, registedProviderUrl)` 方法,向注册中心注册服务提供者(自己)。代码如下: ``` public void register(URL registryUrl, URL registedProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); } ``` - 调用 `RegistryService#register(url)` 方法,在 [《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》「3. RegistryService」 ](http://svip.iocoder.cn/Dubbo/registry-api/?self),有详细解析。 - 第 25 行:标记向本地注册表的注册服务提供者,已经注册。 - 在 [《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》「5. ProviderConsumerRegTable」 ](http://svip.iocoder.cn/Dubbo/registry-api/?self),有详细解析。 - 第 28 至 34 行:使用 OverrideListener 对象,订阅配置规则。详细解析,见 [《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》](http://svip.iocoder.cn/Dubbo/cluster-6-impl-configurator?self) 中。 - 第 36 行:创建 DestroyableExporter 对象。 #### 3.2.3 doLocalExport `#doLocalExport()` 方法,暴露服务。**此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思**。代码如下: ``` 1: /** 2: * 暴露服务。 3: * 4: * 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。 5: * 6: * @param originInvoker 原始 Invoker 7: * @param
泛型 8: * @return Exporter 对象 9: */ 10: @SuppressWarnings("unchecked") 11: private
ExporterChangeableWrapper
doLocalExport(final Invoker
originInvoker) { 12: // 获得在 `bounds` 中的缓存 Key 13: String key = getCacheKey(originInvoker); 14: // 从 `bounds` 获得,是不是已经暴露过服务 15: ExporterChangeableWrapper
exporter = (ExporterChangeableWrapper
) bounds.get(key); 16: if (exporter == null) { 17: synchronized (bounds) { 18: exporter = (ExporterChangeableWrapper
) bounds.get(key); 19: // 未暴露过,进行暴露服务 20: if (exporter == null) { 21: // 创建 Invoker Delegate 对象 22: final Invoker> invokerDelegete = new InvokerDelegete
(originInvoker, getProviderUrl(originInvoker)); 23: // 暴露服务,创建 Exporter 对象 24: // 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象 25: exporter = new ExporterChangeableWrapper
((Exporter
) protocol.export(invokerDelegete), originInvoker); 26: // 添加到 `bounds` 27: bounds.put(key, exporter); 28: } 29: } 30: } 31: return exporter; 32: } ``` - 第 13 行:调用 `#getCacheKey(originInvoker)` 方法,获得在 `bounds` 中的缓存 Key 。代码如下: ``` /** * Get the key cached in bounds by invoker * * 获 取invoker 在 bounds中 缓存的key * * @param originInvoker 原始 Invoker * @return url 字符串 */ private String getCacheKey(final Invoker> originInvoker) { URL providerUrl = getProviderUrl(originInvoker); return providerUrl.removeParameters("dynamic", "enabled").toFullString(); } ``` - 第 14 至 18 行:从 `bounds` 中,获得已经暴露过的 ExporterChangeableWrapper 对象。 - 第 20 至 28 行:未暴露过,进行暴露服务。 - 第 22 行:调用 `#getProviderUrl(originInvoker)` 方法,获得服务提供者的 URL 。 - 第 22 行:创建`com.alibaba.dubbo.registry.integration.RegistryProtocol.InvokerDelegete`对象。 - InvokerDelegete 实现 [`com.alibaba.dubbo.rpc.protocol.InvokerWrapper`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/InvokerWrapper.java) 类,主要增加了 `#getInvoker()` 方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。因为,可能会存在 `InvokerDelegete.invoker` 也是 InvokerDelegete 类型的情况。 - 第 25 行:调用DubboProtocol#export(invoker)方法,暴露服务,返回 Exporter 对象。 - 在 [「4.3 DubboProtocol」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 中,详细分享。 - 🙂 若此若是**其他**协议,若调用对应协议的 `XXXProtocol#export(invoker)` 方法。 - 第 25 行:使用【创建的 Exporter 对象】+【originInvoker】,创建 ExporterChangeableWrapper 对象。这样,originInvoker就和 Exporter 对象,形成了绑定的关系。 - 🙂 ExporterChangeableWrapper 在 [「4.1 ExporterChangeableWrapper」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 看详细代码。 - 第 27 行:添加到 `bounds` 。 ### 3.3 DubboProtocol [`com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboProtocol.java) ,实现 AbstractProtocol 抽象类,Dubbo 协议实现类。 ### 3.3.1 属性 属性相关,代码如下: > 友情提示,仅包含本文涉及的属性。 ``` // ... 省略部分和本文无关的属性。 /** * 通信服务器集合 * * key: 服务器地址。格式为:host:port */ private final Map
serverMap = new ConcurrentHashMap
(); //
``` - `serverMap` 属性,通信服务器集合。其中,Key 为**服务器地址**,格式为 `host:port`。 #### 3.3.2 export 本文涉及的 `#export(invoker)` 方法,代码如下: ``` 1: public
Exporter
export(Invoker
invoker) throws RpcException { 2: URL url = invoker.getUrl(); 3: 4: // 创建 DubboExporter 对象,并添加到 `exporterMap` 。 5: // export service. 6: String key = serviceKey(url); 7: DubboExporter
exporter = new DubboExporter
(invoker, key, exporterMap); 8: exporterMap.put(key, exporter); 9: 10: // TODO 【8033 参数回调】 11: //export an stub service for dispatching event 12: Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); 13: Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); 14: if (isStubSupportEvent && !isCallbackservice) { 15: String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); 16: if (stubServiceMethods == null || stubServiceMethods.length() == 0) { 17: if (logger.isWarnEnabled()) { 18: logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + 19: "], has set stubproxy support event ,but no stub methods founded.")); 20: } 21: } else { 22: stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); 23: } 24: } 25: 26: // 启动服务器 27: openServer(url); 28: 29: // 初始化序列化优化器 30: optimizeSerialization(url); 31: return exporter; 32: } ``` - 第 6 行:调用 [`#serviceKey(url)`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java#L45-L47) 方法,获得服务键。该方法从父类继承而来。 - 第 7 行:创建 DubboExporter 对象。 - 🙂 在 [「4.3 DubboExporter」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 详细解析。 - 第 8 行:添加到 [`exporterMap`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProtocol.java#L40) 中。该属性从父类继承而来。 - 第 10 至 24 行:TODO 【8033 参数回调】 - 第 27 行:调用 `#openServer(url)` 方法,启动服务器。 - 第 30 行:调用 `#optimizeSerialization(url)` 方法,初始化序列化优化器。在 [《精尽 Dubbo 源码分析 —— 序列化(一)之总体实现》](http://svip.iocoder.cn/Dubbo/serialize-1-all?self) 中,详细解析。 #### 3.3.3 openServer > 友情提示:本小节的内容,胖友先看过 [《精尽 Dubbo 源码分析 —— NIO 服务器》](http://svip.iocoder.cn/Dubbo/remoting-api-interface/?self) 所有的文章。 `#openServer(url)` 方法,启动通信服务器。代码如下: ``` /** * 通信客户端集合 * * key: 服务器地址。格式为:host:port */ private final Map
referenceClientMap = new ConcurrentHashMap
(); //
1: /** 2: * 启动服务器 3: * 4: * @param url URL 5: */ 6: private void openServer(URL url) { 7: // find server. 8: String key = url.getAddress(); 9: //client can export a service which's only for server to invoke 10: boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver 11: if (isServer) { 12: ExchangeServer server = serverMap.get(key); 13: if (server == null) { 14: serverMap.put(key, createServer(url)); 15: } else { 16: // server supports reset, use together with override 17: server.reset(url); 18: } 19: } 20: } ``` - 第 8 行:获得服务器地址。 - 第 10 行:配置项 `isserver` ,可以暴露一个仅当前 JVM 可调用的服务。目前该配置项已经不存在。 - 第 12 行:从 `serverMap` 获得对应服务器地址已存在的通信服务器。即,**不重复创建**。 - 第 13 至 14 行:通信服务器不存在,调用 `#createServer(url)` 方法,创建服务器。 - 第 15 至 18 行:通信服务器已存在,调用Server#reset(url)方法,重置服务器的属性。 - 为什么**会存在**呢?因为键是 `host:port` ,那么例如,多个 Service 共用同一个 Protocol ,服务器是同一个对象。 #### 3.3.4 createServer `#createServer(url)` 方法,创建并启动通信服务器。代码如下: ``` 1: private ExchangeServer createServer(URL url) { 2: // 默认开启 server 关闭时发送 READ_ONLY 事件 3: // send readonly event when server closes, it's enabled by default 4: url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); 5: // 默认开启 heartbeat 6: // enable heartbeat by default 7: url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 8: 9: // 校验 Server 的 Dubbo SPI 拓展是否存在 10: String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); 11: if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { 12: throw new RpcException("Unsupported server type: " + str + ", url: " + url); 13: } 14: 15: // 设置编解码器为 `"Dubbo"` 16: url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); 17: 18: // 启动服务器 19: ExchangeServer server; 20: try { 21: server = Exchangers.bind(url, requestHandler); 22: } catch (RemotingException e) { 23: throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); 24: } 25: 26: // 校验 Client 的 Dubbo SPI 拓展是否存在 27: str = url.getParameter(Constants.CLIENT_KEY); 28: if (str != null && str.length() > 0) { 29: Set
supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 30: if (!supportedTypes.contains(str)) { 31: throw new RpcException("Unsupported client type: " + str); 32: } 33: } 34: return server; 35: } ``` - 第 4 行:默认开启 Server 关闭时,发送 **READ_ONLY** 事件。 - 第 7 行:默认开启**心跳**功能。 - 第 9 至 13 行:校验配置的 Server 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。 - 第 16 行:设置编解码器为 `"Dubbo"` 协议,即 DubboCountCodec 。 - 第 18 至 24 行:调用 `Exchangers#bind(url, handler)` 方法,启动服务器。具体 `requestHanlder` 属性值的实现,我们放在 Dubbo 协议下的 RPC 的文章里分享。`requestHanlder` 属性的简化代码如下: ``` private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { // .... 省略具体实现代码 return invoker.invoke(inv); } } ``` - 第 26 至 33 行:校验配置的 Client 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。默认情况下,未配置,所以不会校验。 - 第 34 行:返回通信服务器。 ## 4. Exporter 本文涉及的 Exporter 类图如下: [![Exporter 类图](http://static2.iocoder.cn/images/Dubbo/2018_03_10/03.png)](http://static2.iocoder.cn/images/Dubbo/2018_03_10/03.png)Exporter 类图 ### 4.1 ExporterChangeableWrapper [`com.alibaba.dubbo.registry.integration.RegistryProtocol.ExporterChangeableWrapper`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/integration/RegistryProtocol.java#L422-L454) ,实现 Exporter 接口,Exporter 可变的包装器。 - 建立【返回的 Exporter】与【Protocol export 出的 Exporter】的对应关系。 - 在 override 时可以进行关系修改。 代码如下: ``` private class ExporterChangeableWrapper
implements Exporter
{ /** * 原 Invoker 对象 */ private final Invoker
originInvoker; /** * 暴露的 Exporter 对象 */ private Exporter
exporter; public ExporterChangeableWrapper(Exporter
exporter, Invoker
originInvoker) { this.exporter = exporter; this.originInvoker = originInvoker; } public Invoker
getOriginInvoker() { return originInvoker; } public Invoker
getInvoker() { return exporter.getInvoker(); } public void setExporter(Exporter
exporter) { this.exporter = exporter; } public void unexport() { String key = getCacheKey(this.originInvoker); // 移除出 `bounds` bounds.remove(key); // 取消暴露 exporter.unexport(); } } ``` ### 4.2 DestroyableExporter [`com.alibaba.dubbo.registry.integration.RegistryProtocol.DestroyableExporter`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/integration/RegistryProtocol.java#L456-L506) ,实现 Exporter 接口,**可销毁**的 Exporter 实现类。 ``` private class ExporterChangeableWrapper
implements Exporter
{ /** * 原 Invoker 对象 */ private final Invoker
originInvoker; /** * 暴露的 Exporter 对象 */ private Exporter
exporter; public ExporterChangeableWrapper(Exporter
exporter, Invoker
originInvoker) { this.exporter = exporter; this.originInvoker = originInvoker; } public Invoker
getOriginInvoker() { return originInvoker; } @Override public Invoker
getInvoker() { return exporter.getInvoker(); } // 可以重新设置 Exporter 对象 public void setExporter(Exporter
exporter) { this.exporter = exporter; } @Override public void unexport() { String key = getCacheKey(this.originInvoker); // 移除出 `bounds` bounds.remove(key); // 取消暴露 exporter.unexport(); } } ``` - Exporter **代理**,建立**原始的** Invoker 与 `Protocol#export(Invoker
invoker)` 方法返回的 Exporter 的**对应关系**。 - 在配置规则发生变化时,可调用 `#setExporter(Exporter
)` 方法,修改**对应关系**。详细解析,见 [《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》](http://svip.iocoder.cn/Dubbo/cluster-6-impl-configurator/?self) 。 ### 4.3 DubboExporter [`com.alibaba.dubbo.rpc.protocol.dubbo.DubboExporter`](https://github.com/YunaiV/dubbo/blob/8de6d56d06965a38712c46a0220f4e59213db72f/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboExporter.java) ,实现 AbstractExporter 抽象类,Dubbo Exporter 实现类。代码如下: ``` public class DubboExporter
extends AbstractExporter
{ /** * 服务键 */ private final String key; /** * Exporter 集合 * * key: 服务键 * * 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap} */ private final Map
> exporterMap; public DubboExporter(Invoker
invoker, String key, Map
> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; } @Override public void unexport() { // 取消暴露 super.unexport(); // 移除 exporterMap.remove(key); } } ``` - `key` 属性,服务键。 - #exporterMap属性,Exporter 集合。在上文DubboProtocol#export(invoker)方法中,我们可以看到,该属性就是AbstractProtocol.exporterMap属性。 - **构造方法**,**发起**暴露,将自己添加到 `exporterMap` 中。 - `#unexport()` 方法,**取消**暴露,将自己移除出 `exporterMap` 中。