13 简介
Dubbo 作为服务治理框架,比较核心的就是服务相关的一些概念
若按照完整服务启动与订阅的顺序,可以总结归纳为以下几点:
- 导出服务(提供者):服务提供方通过指定端口对外暴露服务
- 注册服务(提供者):提供方向注册中心注册自己的信息
- (服务发现)订阅服务(消费者):服务调用方通过注册中心订阅自己感兴趣的服务
- (服务发现)服务推送(消费者):注册中心向调用方推送地址列表
- 调用服务(消费者调用提供者):调用方选择一个地址发起 RPC 调用
- 监控服务:服务提供方、调用方的统计数据由监控模块收集展示
完整的服务启动订阅与调用流程不仅仅适用于 Dubbo 同样也适用于其他服务治理与发现的模型
一般服务发现与服务调用的思路就是这样的,先将以上内容扩展,暴漏服务可以使用 http、tcp、udp 等各种协议
注册服务可以注册到 Redis、Dns、Etcd、Zookeeper、Nacos 等注册中心中,订阅服务可以主动去注册中心查询服务列表,服务发现可以让注册中心将服务数据动态推送给消费者
Dubbo 其实就是基于这种简单的服务模型来扩展出各种功能的支持,来满足服务治理的各种场景。
回到主题,从以上的服务完整发布调用流程可以看到,所有的功能都是由导出服务(提供者)开始的,只有提供者先提供了服务才可以有真正的服务让消费者调用
之前的博客内容《9-DubboBootstrap 服务启动的生命周期》了解了 DefaultModuleDeployer 模块启动器的流程,其中在 start 模版方法中开始了导出服务的功能,这里来详细看下服务发布的全过程,入口代码 DefaultModuleDeployer#startSync 启动方法会执行暴露服务方法 exportServices
13.1 导出服务的入口
private void exportServices() {
// 从配置管理器缓存中查询服务配置然后逐个服务发布
for (ServiceConfigBase sc : configManager.getServices()) {
exportServiceInternal(sc);
}
}
主要流程是遍历初始化的服务列表列表,然后逐个开始导出服务 exportServiceInternal 方法源码如下:
private void exportServiceInternal(ServiceConfigBase sc) {
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
// 服务配置刷新,配置优先级覆盖
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
// 服务已经导出则直接返回
if (sc.isExported()) {
return;
}
// 是否异步方式导出,全局配置或者服务级其中一个配置了异步则进行异步处理
if (exportAsync || sc.shouldExportAsync()) {
// 异步其实就是使用 serviceExportExecutor 线程来导出服务
ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(
() -> {
try {
if (!sc.isExported()) {
sc.export();
exportedServices.add(sc);
}
} catch (Throwable t) {
logger.error(
CONFIG_FAILED_EXPORT_SERVICE,
"",
"",
"Failed to async export service config: " + getIdentifier() + " , catch error : "
+ t.getMessage(),
t);
}
},
executor);
asyncExportingFutures.add(future);
} else {
// 同步导出服务
if (!sc.isExported()) {
sc.export(RegisterTypeEnum.AUTO_REGISTER_BY_DEPLOYER);
exportedServices.add(sc);
}
}
if (serviceConfig.hasRegistrySpecified()) {
registryInteracted = true;
}
}
该方法逻辑里做了一些基本的操作,可以直接看注释然后调用 ServiceConfig#export 方法来导出服务,继续往后看服务配置的导出服务方法
13.2 服务配置 export 方法
核心的服务导出代码是在服务配置中来做的,通过 ServiceConfig#export 方法进行处理,其代码如下:
public void export(RegisterTypeEnum registerType) {
// 已经导出服务直接放行
if (this.exported) {
return;
}
// 模块生命周期管理器是否依赖外部管理
if (getScopeModel().isLifeCycleManagedExternally()) {
// prepare model for reference
getScopeModel().getDeployer().prepare();
} else {
// ensure start module, compatible with old api usage
// 确保模块启动, 兼容旧 api 使用
getScopeModel().getDeployer().start();
}
synchronized (this) {
// 双重校验
if (this.exported) {
return;
}
// 配置是否刷新
if (!this.isRefreshed()) {
this.refresh();
}
// 确保服务导出,配置为 false 则不导出
if (this.shouldExport()) {
// 初始化元数据对象
this.init();
// 是否延迟导出
if (shouldDelay()) {
// should register if delay export
// 配置了服务的延迟发布配置则走延迟发布逻辑
doDelayExport();
} else if (Integer.valueOf(-1).equals(getDelay())
&& Boolean.parseBoolean(ConfigurationUtils.getProperty(
getScopeModel(), CommonConstants.DubboProperty.DUBBO_MANUAL_REGISTER_KEY, "false"))) {
// should not register by default
doExport(RegisterTypeEnum.MANUAL_REGISTER);
} else {
// 导出服务
doExport(registerType);
}
}
}
}
在这里总结一下 ServiceConfig#export 方法执行的流程:
- 校验当前域模型的模块生命周期管理器是否依赖外部管理,比如 DubboSpringInitializer 会设置其为 true,完成应用初始化、模块发布器启动。不依赖于外部管理就提前将模块发布器启动好
- 双重校验服务提供者配置 exported 是否已暴露
- 校验当前配置是否已刷新,若未完成刷新,则调用 refresh 方法完成刷新准备工作
- 确保当前服务是否要导出,若配置 export 为 false 则不进行导出工作,需要导出时先进行元数据对象的初始化
- 若属性 delay 配置不为空并且大于 0,则通过定时调度线程池执行 doDelayExport 方法延迟导出服务
- 若属性 delay 配置为 -1,并且域模型配置
dubbo.application.manual-register
手动注册属性为 true,后面会在提供者 URL 参数重增加属性 register 为 false - 以上两个条件都不满足,则执行默认的导出服务工作
13.2.1 服务配置导出前 init 方法
当我们确定要导出服务时,执行 ServiceConfig#init 方法完成元数据对象的初始化工作
public void init() {
if (this.initialized.compareAndSet(false, true)) {
// load ServiceListeners from extension
// 加载服务监听器 ServiceListener 接口扩展实现,这里暂时没有服务监听器扩展
ExtensionLoader<ServiceListener> extensionLoader = this.getExtensionLoader(ServiceListener.class);
this.serviceListeners.addAll(extensionLoader.getSupportedExtensionInstances());
}
// 服务提供者配置传递给元数据对象,一个服务提供者配置会有一个元数据配置、服务配置
initServiceMetadata(provider);
// 元数据
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setTarget(getRef());
// 元数据 KEY 格式:group/interface:version
serviceMetadata.generateServiceKey();
}
13.3 服务配置 doExport 方法
ServiceConfig 执行导出服务核心逻辑
protected synchronized void doExport(RegisterTypeEnum registerType) {
// 取消导出,当 ServiceConfig 被销毁时,会调用 unexport 方法
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// 已经导出,则直接返回
if (exported) {
return;
}
// 服务路径为空则设置为接口名,例子:org.apache.dubbo.demo.DemoService
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// 导出 URL 列表
doExportUrls(registerType);
exported();
}
13.3.1 导出 URL 列表
ServiceConfig 导出 URL 核心逻辑
private void doExportUrls(RegisterTypeEnum registerType) {
// 模块服务存储库
ModuleServiceRepository repository = getScopeModel().getServiceRepository();
ServiceDescriptor serviceDescriptor;
// ref 为服务实现类型,这里对应的是 DemoServiceImpl
final boolean serverService = ref instanceof ServerService;
if (serverService) {
serviceDescriptor = ((ServerService) ref).getServiceDescriptor();
if (!this.provider.getUseJavaPackageAsPath()) {
// for stub service, path always interface name or IDL package name
this.path = serviceDescriptor.getInterfaceName();
}
repository.registerService(serviceDescriptor);
} else {
// 注册服务,这个注册不是向注册中心注册
// 为了解析服务接口将服务方法等描述信息,存放在 ModuleServiceRepository 类型对象的成员变量 services 中
serviceDescriptor = repository.registerService(getInterfaceClass());
}
// 提供者领域模型,其封装了一些提供者需要的基本属性同时内部解析封装方法信息,包括:ProviderMethodModel 列表、服务标识符【group/服务接:版本号】
providerModel = new ProviderModel(
serviceMetadata.getServiceKey(),
// 服务实现类 DemoServiceImpl
ref,
// 服务描述符,描述符里面包含了服务接口的方法信息
serviceDescriptor,
// 当前所处模型
getScopeModel(),
// 当前服务接口的元数据对象
serviceMetadata,
interfaceClassLoader);
// Compatible with dependencies on ServiceModel#getServiceConfig(), and will be removed in a future version
providerModel.setConfig(this);
// 提供者领域模型销毁执行器
providerModel.setDestroyRunner(getDestroyRunner());
// ModuleServiceRepository 存储提供者模型对象
repository.registerProvider(providerModel);
// 获取配置的注册中心列表,同时将注册中心配置转 URL(Dubbo 中 URL 就是配置信息的一种形式)
// 这里会获取到两个,由 dubbo.application.register-mode 双注册配置决定
// 1、service-discovery-registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=50032®istry=nacos&release=3.3.0×tamp=1749373618835
// 2、registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=50032®istry=nacos&release=3.3.0×tamp=1749373618835
// 参数 dubbo 是 dubbo 协议的版本而不是 Dubbo 版本,Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2
// 这里后面详细说下服务双注册:dubbo.application.register-mode
List<URL> registryURLs = !Boolean.FALSE.equals(isRegister())
? ConfigValidationUtils.loadRegistries(this, true)
: Collections.emptyList();
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(
getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// stub service will use generated service name
if (!serverService) {
// In case user specified path, register service one more time to map it to path.
// ModuleServiceRepository 存储服务接口信息
repository.registerService(pathKey, interfaceClass);
}
// 根据协议导出 URL 配置到注册中心
doExportUrlsFor1Protocol(protocolConfig, registryURLs, registerType);
}
providerModel.setServiceUrls(urls);
}
这里总结一下 doExportUrls 方法整体的执行逻辑,如下:
- 获取 ref 实现类型是否为 ServerService,在这边我们是 DemoService 接口类型,解析服务接口将服务方法等描述信息存放在模块服务存储库的 services 变量中
- 创建一个提供者领域模型 ProviderModel,填充服务标识符、服务元数据对象、服务接口描述符信息,最后构建销毁执行器
- 通过
ConfigValidationUtils#loadRegistries
方法加载注册中心列表,同时将注册中心配置转 URL - 遍历当前服务可支持的协议配置列表,获取当前协议配置的路径标识,组合协议配置上下文路径、group、version 值,并将其注册到模块服务存储库的 services 变量中
- 依次将协议配置调用
doExportUrlsFor1Protocol
方法导出 URL 配置到注册中心中,填充当前服务配置的 urls 属性,并将其填充到提供者领域模型中
13.3.2 获取应用级和接口级服务注册地址
在执行 doExportUrls 方法时会通过 ConfigValidationUtils#loadRegistries 方法来加载注册中心列表,在这里会涉及到服务的双注册配置
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
关于 loadRegistries 方法的详情我们就不看了,主要看 loadRegistries 方法中内部调用的 genCompatibleRegistries 方法,会将服务的配置转换为 URL 以此来适配对应的注册中心
/**
* @param scopeModel 域模型
* @param registryList 注册中心列表
* @param provider 是否是提供者,这里 DemoService 为 true
*/
private static List<URL> genCompatibleRegistries(ScopeModel scopeModel, List<URL> registryList, boolean provider) {
List<URL> result = new ArrayList<>(registryList.size());
// 遍历所有的注册中心,为每个注册中心增加兼容的注册中心地址配置
registryList.forEach(registryURL -> {
if (provider) {
// for registries enabled service discovery, automatically register interface compatible addresses.
String registerMode;
// 注册协议配置了 service-discovery-registry
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
registerMode = registryURL.getParameter(
REGISTER_MODE_KEY,
ConfigurationUtils.getCachedDynamicProperty(
scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_INSTANCE));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INSTANCE;
}
// 配置的是应用级配置,则先添加应用级地址,再根据条件判断是否添加接口级注册中心地址
result.add(registryURL);
if (DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)
&& registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
URL interfaceCompatibleRegistryURL = URLBuilder.from(registryURL)
.setProtocol(REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(interfaceCompatibleRegistryURL);
}
} else {
// 正常情况下配置会走这个逻辑
// 获取服务注册的注册模式配置:dubbo.application.register-mode,默认值为 all 既注册接口级又注册应用级信息
registerMode = registryURL.getParameter(
REGISTER_MODE_KEY,
ConfigurationUtils.getCachedDynamicProperty(
scopeModel, DUBBO_REGISTER_MODE_DEFAULT_KEY, DEFAULT_REGISTER_MODE_ALL));
if (!isValidRegisterMode(registerMode)) {
registerMode = DEFAULT_REGISTER_MODE_INTERFACE;
}
// 根据逻辑条件判断是否添加应用级注册中心地址
if ((DEFAULT_REGISTER_MODE_INSTANCE.equalsIgnoreCase(registerMode)
|| DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode))
&& registryNotExists(registryURL, registryList, SERVICE_REGISTRY_PROTOCOL)) {
URL serviceDiscoveryRegistryURL = URLBuilder.from(registryURL)
.setProtocol(SERVICE_REGISTRY_PROTOCOL)
.removeParameter(REGISTRY_TYPE_KEY)
.build();
result.add(serviceDiscoveryRegistryURL);
}
// 根据逻辑条件判断是否添加接口级注册中心地址
if (DEFAULT_REGISTER_MODE_INTERFACE.equalsIgnoreCase(registerMode)
|| DEFAULT_REGISTER_MODE_ALL.equalsIgnoreCase(registerMode)) {
result.add(registryURL);
}
}
FrameworkStatusReportService reportService = ScopeModelUtil.getApplicationModel(scopeModel)
.getBeanFactory()
.getBean(FrameworkStatusReportService.class);
reportService.reportRegistrationStatus(reportService.createRegistrationReport(registerMode));
} else {
result.add(registryURL);
}
});
return result;
}
该方法是根据服务注册模式来判断使用接口级注册地址还是应用级注册地址,按照配置属性:dubbo.application.register-mode 来判断,一共支持三种配置方式,如下:
interface:接口级注册
registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=50032®istry=nacos&release=3.3.0×tamp=1749373618835
instance:应用级注册
service-discovery-registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=50032®istry=nacos&release=3.3.0×tamp=1749373618835
all:接口级和应用级都注册
13.4 导出服务配置到本地、注册中心
在执行 doExportUrls 方法时最后一步会通过 doExportUrlsFor1Protocol 方法来将服务配置导出到本地和注册中心
// 根据协议导出 URL 配置到注册中心
doExportUrlsFor1Protocol(protocolConfig, registryURLs, registerType);
protocolConfig:dubbo 协议的配置 <dubbo:protocol preferSerialization=“hessian2,fastjson2” port="-1" name=“dubbo” />
registryURLs 目前有两个:应用级注册中心地址、接口级注册中心地址
13.4.1 导出服务配置 doExportUrlsFor1Protocol 方法
private void doExportUrlsFor1Protocol(
ProtocolConfig protocolConfig, List<URL> registryURLs, RegisterTypeEnum registerType) {
// 生成协议配置作为元数据配置中 attachments
Map<String, String> map = buildAttributes(protocolConfig);
// remove null key and null value 移除空值简化配置
map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));
// init serviceMetadata attachments 协议配置放到元数据对象中
serviceMetadata.getAttachments().putAll(map);
// 协议配置 + 默认协议配置转 URL 类型的配置存储
URL url = buildUrl(protocolConfig, map);
// 添加 service-executor 属性到 URL 中
processServiceExecutor(url);
if (CollectionUtils.isEmpty(registryURLs)) {
registerType = RegisterTypeEnum.NEVER_REGISTER;
}
// 导出url
exportUrl(url, registryURLs, registerType);
// 初始化服务的方法指标度量统计
initServiceMethodMetrics(url);
}
元数据配置中 attachments 可参考如下 Debug 出来的图
13.4.2 导出服务配置模版方法
继续看导出服务的模版方法 exportUrl,分为本地导出和注册中心导出,协议配置参数 URL 可参考如下:
dubbo://192.168.2.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.2.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&executor-management-mode=isolation&file-cache=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=50032&prefer.serialization=hessian2,fastjson2&release=3.3.0&side=provider×tamp=1749388995602
private void exportUrl(URL url, List<URL> registryURLs, RegisterTypeEnum registerType) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// 未明确指定远程导出则开启本地导出
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 未明确指定本地导出则开启远程导出
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// export to extra protocol is used in remote export
String extProtocol = url.getParameter(EXT_PROTOCOL, "");
List<String> protocols = new ArrayList<>();
if (StringUtils.isNotBlank(extProtocol)) {
// export original url
url = URLBuilder.from(url)
.addParameter(IS_PU_SERVER_KEY, Boolean.TRUE.toString())
.build();
}
// 远程导出
url = exportRemote(url, registryURLs, registerType);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
}
if (StringUtils.isNotBlank(extProtocol)) {
String[] extProtocols = extProtocol.split(",", -1);
protocols.addAll(Arrays.asList(extProtocols));
}
// export extra protocols
for (String protocol : protocols) {
if (StringUtils.isNotBlank(protocol)) {
URL localUrl = URLBuilder.from(url)
.setProtocol(protocol)
.addParameter(IS_EXTRA, Boolean.TRUE.toString())
.removeParameter(EXT_PROTOCOL)
.build();
localUrl = exportRemote(localUrl, registryURLs, registerType);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(
localUrl, providerModel.getServiceModel(), getApplicationModel());
}
this.urls.add(localUrl);
}
}
}
}
this.urls.add(url);
}
13.5 exportLocal 导出服务到本地
在导出服务本地时,本地调用使用了 injvm 协议,是一个伪协议,其不开启端口也不发起远程调用,只在 JVM 内直接关联,但是执行 Dubbo Filter 链,直接查看如下代码:
private void exportLocal(URL url) {
// 协议转为 injvm 代表本地导出 host 为 127.0.0.1
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
local = local.setScopeModel(getScopeModel()).setServiceModel(providerModel);
local = local.addParameter(EXPORTER_LISTENER_KEY, LOCAL_PROTOCOL);
doExportUrl(local, false, RegisterTypeEnum.AUTO_REGISTER);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
13.5.1 doExportUrl 方法
private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {
// 设置注册类型
if (!url.getParameter(REGISTER_KEY, true)) {
registerType = RegisterTypeEnum.MANUAL_REGISTER;
}
// 如果是手动注册或者自动注册则取消注册,则添加参数 register=false
if (registerType == RegisterTypeEnum.NEVER_REGISTER
|| registerType == RegisterTypeEnum.MANUAL_REGISTER
|| registerType == RegisterTypeEnum.AUTO_REGISTER_BY_DEPLOYER) {
url = url.addParameter(REGISTER_KEY, false);
}
// 这里是由 adaptor 扩展类型处理过的
// 直接看默认的类型 javassist 对应 JavassistProxyFactory 代理工厂来获取调用对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
// 导出远程服务才会执行这里
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
Exporter<?> exporter = protocolSPI.export(invoker);
exporters
.computeIfAbsent(registerType, k -> new CopyOnWriteArrayList<>())
.add(exporter);
}
13.5.2 JavassistProxyFactory#getInvoker 方法
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
try {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 创建实际服务提供者的代理类型,代理类型后缀为 DubboWrap 在这里类型为 link.elastic.dubbo.entity.DemoServiceImplDubboWrap0
final Wrapper wrapper =
Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建一个匿名内部类对象,继承自 AbstractProxyInvoker Invoker 对象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments)
throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
} catch (Throwable fromJavassist) {
// try fall back to JDK proxy factory
try {
Invoker<T> invoker = jdkProxyFactory.getInvoker(proxy, type, url);
logger.error(
PROXY_FAILED,
"",
"",
"Failed to generate invoker by Javassist failed. Fallback to use JDK proxy success. "
+ "Interfaces: " + type,
fromJavassist);
// log out error
return invoker;
} catch (Throwable fromJdk) {
logger.error(
PROXY_FAILED,
"",
"",
"Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. "
+ "Interfaces: " + type + " Javassist Error.",
fromJavassist);
logger.error(
PROXY_FAILED,
"",
"",
"Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. "
+ "Interfaces: " + type + " JDK Error.",
fromJdk);
throw fromJavassist;
}
}
}
13.5.3 export Invoker 对象
使用 JavassistProxyFactory#getInvoker 方法创建好 Invoker 以后,再通过协议导出 Invoker 对象
Exporter<?> exporter = protocolSPI.export(invoker);
这个 Protocol#export 方法使用了 Adaptor 扩展和 Wrapper 机制,所以 Debug 起来不太方便,在这里贴一下调用堆栈
下面我们来介绍几种不同的 Wrapper 实现:
1、协议序列化机制:ProtocolSerializationWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 将服务提供者 url 添加到服务存储仓库中
getFrameworkModel(invoker.getUrl().getScopeModel())
.getBeanFactory()
.getBean(PermittedSerializationKeeper.class)
.registerService(invoker.getUrl());
return protocol.export(invoker);
}
2、协议过滤器 Wrapper:ProtocolFilterWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心的协议导出直接执行
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 过滤器调用链 FilterChainBuilder 扩展对象查询
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
// 这里分为 2 步
// 查询 provider 类型的过滤器生成过滤器调用链、然后使用链表中的节点调用
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
过滤器调用链,调用 DefaultFilterChainBuilder#buildInvokerChain 方法生成
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
// originalInvoker 代表真正的服务调用器
Invoker<T> last = originalInvoker;
URL url = originalInvoker.getUrl();
List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);
List<Filter> filters;
if (moduleModels != null && moduleModels.size() == 1) {
// 类型 Filter key 为 service.filter 分组为 provider 所有提供者过滤器拉取
filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
} else if (moduleModels != null && moduleModels.size() > 1) {
filters = new ArrayList<>();
List<ExtensionDirector> directors = new ArrayList<>();
for (ModuleModel moduleModel : moduleModels) {
List<Filter> tempFilters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModel)
.getActivateExtension(url, key, group);
filters.addAll(tempFilters);
directors.add(moduleModel.getExtensionDirector());
}
filters = sortingAndDeduplication(filters, directors);
} else {
filters = ScopeModelUtil.getExtensionLoader(Filter.class, null).getActivateExtension(url, key, group);
}
// 倒序拼接,将过滤器的调用对象添加到链表中,最后倒序遍历之后 last 节点指向了调用链路链表头节点的对象
if (!CollectionUtils.isEmpty(filters)) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// 每个 invoker 对象中都有 originalInvoker 对象
last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
return new CallbackRegistrationInvoker<>(last, filters);
}
return last;
}
3、协议监听器 Wrapper:ProtocolListenerWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心地址则直接导出
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 先导出对象 再创建过滤器包装对象 执行监听器逻辑
List<ExporterListener> exporterListeners = ScopeModelUtil.getExtensionLoader(
ExporterListener.class, invoker.getUrl().getScopeModel())
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY);
// injvm 本地调用则添加本地监听器
if (LOCAL_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
exporterListeners.add(invoker.getUrl()
.getOrDefaultFrameworkModel()
.getBeanFactory()
.getBean(InjvmExporterListener.class));
}
return new ListenerExporterWrapper<>(protocol.export(invoker), Collections.unmodifiableList(exporterListeners));
}
4、QOS 协议 Wrapper:QosProtocolWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// start qos server 注册中心导出的时候开启 QOS 默认端口 22222
startQosServer(invoker.getUrl(), true);
return protocol.export(invoker);
}
5、InjvmProtocol 导出方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
13.6 exportRemote 导出服务到注册中心
在 16.4.2 导出服务配置模版方法中,看到了服务导出会导出到本地和远程,接下来来看下导出到远程方法 exportRemote 参数 URL
private URL exportRemote(URL url, List<URL> registryURLs, RegisterTypeEnum registerType)
dubbo://192.168.2.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.2.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&executor-management-mode=isolation&file-cache=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=50032&prefer.serialization=hessian2,fastjson2&release=3.3.0&side=provider×tamp=1749388995602
参数 registryURLs 目前有两个应用级注册地址和接口级注册地址:
interface:接口级注册
registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=50032®istry=nacos&release=3.3.0×tamp=1749373618835
instance:应用级注册
service-discovery-registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=50032®istry=nacos&release=3.3.0×tamp=1749373618835
private URL exportRemote(URL url, List<URL> registryURLs, RegisterTypeEnum registerType) {
if (CollectionUtils.isNotEmpty(registryURLs) && registerType != RegisterTypeEnum.NEVER_REGISTER) {
// 遍历所有注册地址与注册模式 逐个注册
for (URL registryURL : registryURLs) {
// 协议 URL,添加应用级注册 service-discovery-registry 参数 service-name-mapping 为 true
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
// if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// 协议 url,添加动态配置 dynamic
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
// 监控配置暂时为 null
if (monitorUrl != null) {
url = url.putAttribute(MONITOR_KEY, monitorUrl);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 开始注册服务了 打印个认知 提示下
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url
+ " to registry " + registryURL.getAddress());
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true, registerType);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
doExportUrl(url, true, registerType);
}
return url;
}
13.6.1 doExportUrl 方法
与 16.5.1 doExportUrl 执行导出服务到本地的方法是一样的逻辑
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
1、与本地导出 ProtocolFilterWrapper 的不同之处,服务发现 service-discovery-registry 导出会通过 UrlUtils#isRegistry 方法判断,其结果为 true 会走这个逻辑
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
2、与协议监听器 Wrapper ProtocolListenerWrapper 的不同之处
服务发现 service-discovery-registry 导出会通过 UrlUtils#isRegistry 方法判断,其结果为 true 会走这个逻辑
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
最后一个地方不同的是,exportRemote 调用链路会执行 RegistryProtocol#export 方法
13.6.2 注册协议导出服务与注册服务
RegistryProtocol 导出方法: 该方法非常重要同时也是服务注册的核心代码,先概括下包含了哪些步骤
- 覆盖配置
- 导出协议端口开启 TCP 服务
- 注册到注册中心
- 通知服务启动
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// service-discovery-registry://127.0.0.1:8848/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=default&application=dubbo-demo-api-provider&dubbo=2.0.2&executor-management-mode=isolation&file-cache=true&pid=66599®ister=false®istry=nacos&release=3.3.0×tamp=1749393270234
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
// dubbo://192.168.2.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.2.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&executor-management-mode=isolation&file-cache=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=66599&prefer.serialization=hessian2,fastjson2&release=3.3.0&service-name-mapping=true&side=provider×tamp=1749393273090
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// override 配置
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
Map<URL, Set<NotifyListener>> overrideListeners =
getProviderConfigurationListener(overrideSubscribeUrl).getOverrideListeners();
overrideListeners
.computeIfAbsent(overrideSubscribeUrl, k -> new ConcurrentHashSet<>())
.add(overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 通过 URL 获取 注册中心 Registry 操作对象
final Registry registry = getRegistry(registryUrl);
// 需要向注册中心注册地址转换
// dubbo://192.168.2.1:20880/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&prefer.serialization=hessian2,fastjson2&release=3.3.0&service-name-mapping=true&side=provider×tamp=1749402354267
final URL registeredProviderUrl = customizeURL(providerUrl, registryUrl);
// decide if we need to delay publish (provider itself and registry should both need to register)
boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
// 是否向注册中心注册
if (register) {
register(registry, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
exporter.setNotifyListener(overrideSubscribeListener);
exporter.setRegistered(register);
ApplicationModel applicationModel = getApplicationModel(providerUrl.getScopeModel());
if (applicationModel
.modelEnvironment()
.getConfiguration()
.convert(Boolean.class, ENABLE_26X_CONFIGURATION_LISTEN, true)) {
if (!registry.isServiceDiscovery()) {
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
}
// 内置监听器通知 这个不是通知消费者的
notifyExport(exporter);
// Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
13.7 doLocalExport 本地导出协议开启端口
直接看 doLocalExport 方法是如何执行的
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String providerUrlKey = getProviderUrlKey(originInvoker);
String registryUrlKey = getRegistryUrlKey(originInvoker);
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
ReferenceCountExporter<?> exporter =
exporterFactory.createExporter(providerUrlKey, () -> protocol.export(invokerDelegate));
// protocol 对象是 dubbo 自动生成的适配器对象
// protocol$Adaptive 适配器对象会根据当前协议的参数来查询具体的协议扩展对象
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(providerUrlKey, k -> new ConcurrentHashMap<>())
.computeIfAbsent(
registryUrlKey,
s -> new ExporterChangeableWrapper<>((ReferenceCountExporter<T>) exporter, originInvoker));
}
上面这个 protocol$Adaptive 协议的 export 导出方法与之前的一样也会经历下面几个过程,具体的细节可以参考 injvm 协议的导出:
- ProtocolSerializationWrapper
- ProtocolFilterWrapper
- ProtocolListenerWrapper
- QosProtocolWrapper
- 唯一不同的是这里对应的协议扩展类型为 DubboProtocol,接下来看 DubboProtocol 类的 export 方法是如何实现的
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
checkDestroyed();
// dubbo://192.168.2.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.2.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&executor-management-mode=isolation&file-cache=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=74593&prefer.serialization=hessian2,fastjson2&release=3.3.0&service-name-mapping=true&side=provider×tamp=1749403369884
URL url = invoker.getUrl();
// export service. 生成 key: org.apache.dubbo.demo.DemoService:20880
String key = serviceKey(url);
// 创建导出服务用的导出器 DubboExporter
DubboExporter<T> exporter = new DubboExporter<>(invoker, key, exporterMap);
// export a stub service for dispatching event
// stub 配置校验
boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackService) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(
PROTOCOL_UNSUPPORTED,
"",
"",
"consumer [" + url.getParameter(INTERFACE_KEY)
+ "], has set stub proxy support event ,but no stub methods founded.");
}
}
}
// 创建服务开启服务端口
openServer(url);
optimizeSerialization(url);
return exporter;
}
13.7.1 开启服务端口
此处就到了 RPC 协议的 TCP 通信模块了,对应 DubboProtocol#openServer 方法
private void openServer(URL url) {
checkDestroyed();
// find server. address -> 192.168.2.1:20880
String key = url.getAddress();
// client can export a service which only for server to invoke
// 默认提供者开启服务,消费者是不能开启服务的
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 协议服务器 下面一个双重校验锁检查,如果为空则创建服务
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
return;
}
}
}
// server supports reset, use together with override
server.reset(url);
}
}
为当前地址创建协议服务对应的方法 DubboProtocol#createServer,如下:
private ProtocolServer createServer(URL url) {
// 下面将 url 增加了 heartbeat 和 codec 参数,如下
// dubbo://192.168.2.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&background=false&bind.ip=192.168.2.1&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&executor-management-mode=isolation&file-cache=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=75071&prefer.serialization=hessian2,fastjson2&release=3.3.0&service-name-mapping=true&side=provider×tamp=1749403863500
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 服务端使用的网络库默认值 netty
String transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (StringUtils.isNotEmpty(transporter)
&& !url.getOrDefaultFrameworkModel()
.getExtensionLoader(Transporter.class)
.hasExtension(transporter)) {
throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);
}
// dubbo 交换器层对象创建
ExchangeServer server;
try {
// 这个方法会绑定端口,关于交换器与传输网络层到后面统一说
// 这里通过绑定url和请求处理器来创建交换器对象
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
transporter = url.getParameter(CLIENT_KEY);
if (StringUtils.isNotEmpty(transporter)
&& !url.getOrDefaultFrameworkModel()
.getExtensionLoader(Transporter.class)
.hasExtension(transporter)) {
throw new RpcException("Unsupported client type: " + transporter);
}
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
// 关闭等待时长默认为10秒
loadServerProperties(protocolServer);
return protocolServer;
}
13.8 register 注册服务
下篇博客我们重点分析 RegistryProtocol#register 注册服务方法是如何进行处理的