码农翻身

XXL-CONF源码解读篇

- by MRyan, 2023-04-18


序. 介绍

XXL-CONF 是一款分布式配置中心,特性:轻量级、秒级动态推送、多环境、跨语言、跨机房、配置监听、权限控制、版本回滚

配置中心的作用相信大家都知道,不需要重新编译打包,不需要重启线上服务器,通过修改配置中心的配置项的数据,实时推送新的配置项数据到各个引用的项目中,实时更新。

XXL-CONF 项目文档完善,接入简单,部署容易,可高可用,高性能,使用文档如下,感兴趣的小伙伴可以看一遍,这里不在针对如何使用做阐述了。

官网链接:这是链接,点这

我们首先先思考一个问题:如何实现分布式配置中心呢

带着疑问阅读本文,本文主要剖析 XXL-CONF 1.6.2-SNAPSHOT 版本源码,通过学习优秀框架实现思路,沉淀技术积累。

1. 思考:如何实现配置中心

数据结构

首先在业务需求上,我们应该提前规划好配置中心支持哪些 “配置”,是普通的基础数据类型值,还是像 Json,或者 Yaml ,XML 这样的文件格式,也就是先确定数据结构

存储

元配置数据肯定是需要存储:所以我们需要一个可靠、高性能,可扩展的存储系统来存储和管理配置信息,例如分布式数据库、分布式文件系统。

数据隔离

支持多环境配置数据,各个环境之间需要相互隔离,如何隔离?

表中环境字段隔离,部署单个配置中心集群,定义多套环境隔离不同环境的配置数据可以同享配置中心资源。

还是部署多配置中心集群,集群 1 指定定义环境 product,集群 2 指定定义环境 test 等,避免多集群相互影响。

客户端和配置中心的通信选型

协议支持上,节点间的通信可以选则比较常用的 HTTP 协议,配置中心将配置数据暴露为 HTTP API,客户端使用 HTTP 访问该 API 以获取配置数据。

或者使用业界成熟的应用通信技术框架,例如 Dubbo,Thrift,gRPC 等。

再或者使用专为网络通信设计的基于 NIO 的高性能框架 Netty,更好的支持高并发,高吞吐的通信场景。

支持客户端的接入方式

使用 HTTP 协议实现的接口,天然支持多端,多语言,但可以为例如 Java 语言,Spring 框架提供接入组件。

实现配置变更通知机制

当配置数据发生变更时,需要及时通知各个实例。

可以基于轮询机制,客户端定期向配置中心发送请求,查询配置是否变更,若变更主动向配置中心拉取变更数据,优缺点显而易见,优点:简单,缺点:频繁发送请求,性能、网络资源消耗大。

或者基于长连接机制,客户端和配置中心建立长连接,配置数据更新时,配置中心通过长连接实时通知客户端,显而易见的缺点就是,维护长连接的成本对服务器的资源消耗较大。

在或者引入消息队列,配置中心将配置变更信息发布到消息队列,客户端从消息队列中订阅配置变更信息,缺点显而易见,引入消息队列后增加了系统的复杂度。

再或者可以选型 ZooKeeper 或者 Etcd 等作为分布式协调服务,管理配置,通过订阅配置节点上的变更来获取配置更新。

安全的考量

为保护配置数据的安全性,需要添加访问控制机制,例如在 API 接口中添加认证、授权等机制来限制访问。

WEB 端可视化配置管理

增删改查配置直接通过 WEB 界面完成。

跨机房,异地多活

配置中心集群关系对等特性,集群各节点提供幂等的配置服务。因此异地跨机房部署时,只需要请求本机房配置中心即可,避免跨机房请求可能遇到的网络疑难杂症。实现异地多活,也可以防止同机房服务全部宕机,访问其他机房可用,达到容灾的效果。

配置中心高可用

支持集群部署可提升系统可用性。设计多级存储可有助于降级容灾,

例如一级存储: DB 做元数据存储。

二级存储:配置中心磁盘文件作为配置中心集群的镜像文件。

三级存储:客户端镜像文件,配置中心故障降级使用。

四级存储:接入配置中心的客户端自己的内存 LocalCache 数据

提升性能的同时,降低对底层配置服务的压力。


以上就是笔者对如何实现配置中心这个问题的思考,各位小伙伴也可以思考下,下面让我们来看 XXL-CONF 是如何实现的吧。

2.项目架构

2.1 项目结构

clone 下来的源码项目结构如下图所示

image-20230415125245630

xxl-conf-admin 为配置中心 - 配置管理平台(包含 WEB 模块),提供可视化界面操作数据

有如下功能:环境管理、用户管理、项目管理、配置管理等

image-20230415125453902


xxl-conf-core 为客户端接入的组件,支持配置中心配置项变更动态监听功能,内存级 LocalCache 保证高性能。


xxl-conf-samples 提供了两个 demo 项目,其中 xxl-conf-sample-frameles 项目为无框架版本,可以直接 main 方法启动运行,xxl-conf-sample-springboot 则是支持 Springboot 版本。

2.2 多级存储

2.2.1 一级存储-DB存储

XXL-CONF 的 DB 存储使用的是 MySQL,我们先来看下数据库的表结构。

XXL-CONF 配置数据是通过表字段隔离,通过环境和项目名称字段当做业务属性隔离,也就是说不同的环境不同项目之间的配置是隔离的。

xxl_conf_env 表为环境标识表,存放所有的环境标识。

CREATE TABLE `xxl_conf_env` (
  `env` varchar(100) NOT NULL COMMENT 'Env',
  `title` varchar(100) NOT NULL COMMENT '环境名称',
  `order` tinyint(4) NOT NULL DEFAULT '0' COMMENT '显示排序',
  PRIMARY KEY (`env`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

image-20230416195836110


xxl_conf_project 表为项目描述表,存放项目标识。

CREATE TABLE `xxl_conf_project` (
  `appname` varchar(100) NOT NULL COMMENT 'AppName',
  `title` varchar(100) NOT NULL COMMENT '项目名称',
  PRIMARY KEY (`appname`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

其中 appname 多提一嘴,每个项目拥有唯一的 appname,作为项目标识,同时作为该项目下配置的统一前缀。

image-20230416200106605


xxl_conf_user 表存放着用户数据,包括账号,密码,该用户拥有的权限数据等。

CREATE TABLE `xxl_conf_user` (
  `username` varchar(100) NOT NULL COMMENT '账号',
  `password` varchar(100) NOT NULL COMMENT '密码',
  `permission` tinyint(4) NOT NULL DEFAULT '0' COMMENT '权限:0-普通用户、1-管理员',
  `permission_data` varchar(1000) DEFAULT NULL COMMENT '权限配置数据',
  PRIMARY KEY (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


xxl_conf_node 表存放配置数据最新的版本,按照 env 环境和 appname 项目标识隔离。

CREATE TABLE `xxl_conf_node` (
  `env` varchar(100) NOT NULL COMMENT 'Env',
  `key` varchar(200) NOT NULL COMMENT '配置Key',
  `appname` varchar(100) NOT NULL COMMENT '所属项目AppName',
  `title` varchar(100) NOT NULL COMMENT '配置描述',
  `value` varchar(2000) DEFAULT NULL COMMENT '配置Value',
  PRIMARY KEY (`env`,`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

其中多提一嘴,配置的 key,创建时将会自动添加所属项目的 appname 所谓前缀,生成最终的 key。可通过客户端使用最终的 key 获取配置。


xxl_conf_node_log 表记录着配置项的所有变更操作日志。

CREATE TABLE `xxl_conf_node_log` (
  `env` varchar(255) NOT NULL COMMENT 'Env',
  `key` varchar(200) NOT NULL COMMENT '配置Key',
  `title` varchar(100) NOT NULL COMMENT '配置描述',
  `value` varchar(2000) DEFAULT NULL COMMENT '配置Value',
  `addtime` datetime NOT NULL COMMENT '操作时间',
  `optuser` varchar(100) NOT NULL COMMENT '操作人'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

xxl_conf_node_msg 表记录着配置项数据的变更通知,是配置中心实时通知客户端配置变更的核心。

CREATE TABLE `xxl_conf_node_msg` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `addtime` datetime NOT NULL,
  `env` varchar(100) NOT NULL COMMENT 'Env',
  `key` varchar(200) NOT NULL COMMENT '配置Key',
  `value` varchar(2000) DEFAULT NULL COMMENT '配置Value',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.2.2 多级存储

上文中我们提到了设计多级存储可有助于降级容灾

二级存储:配置中心磁盘文件作为配置中心集群的镜像文件。

三级存储:存储客户端磁盘的镜像文件,配置中心故障降级使用。

四级存储:接入配置中心的客户端自己的内存 LocalCache 数据

多级存储的存在提升性能的同时,降低对底层配置服务的压力。

实际上 XXL-CONF 就是这么设计的,我们直接来看 XXL-CONF 多级存储的具体实现。

二级存储

是存储在配置中心服务器磁盘上的镜像文件 Properties,文件存放的路径可以通过 admin 配置中心的 application.properties 中的 xxl.conf.confdata.filepath 属性指定。

admin 配置中心在给接入客户端提供的 API:/conf/find 查询配置数据就是直接从配置中心的磁盘文件 Properties 中寻找该配置项数据。

配置中心守护线程定时推送客户端通知告知配置变更,数据的对比就是通过 key 关联的 xxl_conf_node_msg 消息,与磁盘文件 Properties 的配置项数据比对。

考虑到 value 的值可能比较大,为了提高检索效率,每一个配置项用一个单独的文件来保存,文件名由环境+ 项目名 + key组成,这样做便于查询的时候能够快速精准定位。


文件记录当前配置的值

格式如下:value=XXX

image-20230417223142233

当配置被删除后文件记录着文件被删除

格式如下:value-deleted=true

image-20230417223611755

三级存储

是存储在客户端磁盘的镜像文件,文件存放的路径可以通过客户端的 application.properties 中 xxl.conf.mirrorfile 属性指定。

客户端在第一次启动时,先拉取服务端指定配置项数据,存储客户端磁盘镜像文件中。

同理每一个配置项用一个单独的文件来保存。

四级存储

客户端内存级别存储,客户端在第一次启动时,先拉取服务端指定配置项数据,存储客户端磁盘镜像文件中,在添加到 localCacheRepository 中。

    private static ConcurrentHashMap<String, CacheNode> localCacheRepository = null;

2.3 架构设计

2.3.1 CONF(admin)架构

我们可以首先看到 xxl-conf-admin 配置中心服务提供的配置管理的 CRUD 操作都强依赖 DB,DB 作为配置数据源存储着配置信息,配置的版本变更信息等。

其次维护一个本地磁盘文件 Properties,作为配置中心集群配置的镜像数据,每当新增,更新,删除配置数据的时候同步更新到磁盘文件中,并广播通知每个集群节点实时刷新节点磁盘配置数据,接入方客户端long-polling 接收到配置已变更通知,主动向配置中心拉取最新配置数据。

下图来源 XXL-CONF 官网

输入图片说明

2.3.2 客户端架构

客户端只需要使用组件封装好的 API,即可一行代码获取配置信息。

客户端内存级的 LocalCache 本地缓存,极大提升 API 层的性能,降低对配置中心集群的压力。首次加载配置、监听配置变更、底层异步周期性同步配置时,写入或更新缓存。

配置数据的本地快照文件,会周期性同步 LocalCache 中的配置数据写入到 Mirror-File 镜像文件中;当无法从配置中心获取配置,如配置中心宕机时,将会使用 Mirror-File 中的配置数据,提高系统的可用性。

因此接入方可以在高 QPS、高并发场景下使用 XXL-CONF 的客户端, 不必担心并发压力或配置中心宕机导致系统问题。

下图来源 XXL-CONF 官网

输入图片说明

3. API 相关

admin 为 WEB 配置项可视化管理提供的相关 REST API 如下(ps:环境管理,项目管理,用户管理非本文核心,不做描述):

API:/conf/pageList

区分环境,项目名分页读取数据库中 xxl_conf_node 表中的所有配置项,以及根据 key 指定查询一个配置项。

API:/conf/delete

物理删除 xxl_conf_node 表中指定的配置项,同时记录变更操作消息存入 xxl_conf_node_msg 表中,等待守护线程定时通过 key 关联的 xxl_conf_node_msg 消息,与磁盘文件 Properties 的配置项数据比对,推送客户端通知告知配置变更。

API:/conf/add

xxl_conf_node 表中新增配置项,同时记录变更操作消息存入 xxl_conf_node_msg 表中,等待守护线程定时推送客户端通知告知配置变更。

API:/conf/update

更新 xxl_conf_node 表中指定的配置项,同时记录变更操作消息存入 xxl_conf_node_msg 表中,等待守护线程定时推送客户端通知告知配置变更。


admin 为接入的客户端提供的 REST API 如下:

API:/conf/find 入参:keys,支持批量查询

主要作用是查询配置数据,比较特殊的是,该 API 是直接从上文我们提及过的配置中心的磁盘文件 Properties 中寻找该配置项数据,不会查询数据库。

API:/conf/monitor 入参:keys,支持批量查询

主要作用广播通知接入客户端指定配置项是否发生变更,利用了 SpringMVC DeferredResult 特性将异步操作的结果同步返回给接入客户端,对 DeferredResult 不了解的小伙伴可以参考这篇文章《DeferredResult 扫盲》

该 API 是直接从上文我们提及过的配置中心的磁盘文件 Properties 中寻找该配置项数据,若监测到数据变更则利用 DeferredResult 的 setResult 特性将变更结果返回给接入客户端,告知配置已变更,快来请求我(配置中心)获取最新的配置项数据吧。


针对 API 笔者做了些详细描述,下面让我们来走进 XXL-CONF 源码,一探究竟

4.源码分析-项目启动

4.1 admin 配置中心启动

admin 配置中心正常启动后会启动一个向线程池提交两个线程任务。

   @Override
    public void afterPropertiesSet() throws Exception {
        startThead();
    }
  public void startThead() throws Exception {

        /**
         * brocast conf-data msg, sync to file, for "add、update、delete"
         * 监控配置是否发生变更,线程会每隔1秒就查询一次xxl_conf_node_msg表,如果该表中有数据,就说明配置有变化,就立即更新本地快照,并广播给所有客户端,每隔30秒清空一次老数据
         */
        executorService.execute(() -> {
            while (!executorStoped) {
                try {
                    // new message, filter readed
                    List<XxlConfNodeMsg> messageList = xxlConfNodeMsgDao.findMsg(readedMessageIds);
                    if (messageList != null && messageList.size() > 0) {
                        for (XxlConfNodeMsg message : messageList) {
                            readedMessageIds.add(message.getId());


                            // sync file
                            // 配置发生变更,配置中心不会直接将变更的数据推送给客户端,而是告诉客户端数据有变化,需要客户端主动发起http请求调用配置中心的查询接口获取最新的配置
                            setFileConfData(message.getEnv(), message.getKey(), message.getValue());
                        }
                    }

                    // 清除老的消息
                    if ((System.currentTimeMillis() / 1000) % confBeatTime == 0) {
                        xxlConfNodeMsgDao.cleanMessage(confBeatTime);
                        readedMessageIds.clear();
                    }
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                     // 休眠 1 秒
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });

    }

简单说明下,首先利用 Spring 容器启动执行 Bean 的初始化时机,启动两个线程任务。

第一个线程:监控配置是否发生变更,每隔 1 秒就查询一次 xxl_conf_node_msg 表,如果该表中有数据,就说明配置有变化,就立即更新本地快照,并广播给所有客户端,每隔 30 秒清空一次老数据。

配置变化广播实现如下:

private String setFileConfData(String env, String key, String value) {

        // 获取文件名称
        String confFileName = parseConfDataFileName(env, key);

        // 通过文件名加载磁盘文件
        Properties existProp = PropUtil.loadFileProp(confFileName);
        // 如果存在文件,且证明没有被删除则返回文件路径
        if (existProp != null
                && value != null
                && value.equals(existProp.getProperty("value"))
        ) {
            return new File(confFileName).getPath();
        }

        // write
        Properties prop = new Properties();
        // 配置项数据为空说明已经被删除,则设置属性 value-deleted=true
        if (value == null) {
            prop.setProperty("value-deleted", "true");
        } else {
            // 否则设置设置 value 属性当前的配置项数据
            prop.setProperty("value", value);
        }
        // 写磁盘文件到指定目录下
        PropUtil.writeFileProp(prop, confFileName);
        logger.info(">>>>>>>>>>> xxl-conf, setFileConfData: confFileName={}, value={}", confFileName, value);

        // 从 confDeferredResultMap 中获取客户端请求,响应客户端数据更新
        // brocast monitor client
        List<DeferredResult> deferredResultList = confDeferredResultMap.get(confFileName);
        if (deferredResultList != null) {
            confDeferredResultMap.remove(confFileName);
            for (DeferredResult deferredResult : deferredResultList) {
                deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
            }
        }

        return new File(confFileName).getPath();
    }

上述代码都比较简单,有一处你可以能不太理解如何实现广播通知客户端呢?

其实是通过 Spring MVC 的 DeferredResult 特性,当客户端请求配置中心 /conf/monitor API 时,会将当次请求的 DeferredResult 存储在内存中,key 为文件名称,value 为 DeferredResult 集合,admin 配置中心启动后的线程任务会从 confDeferredResultMap 中获取对应的 DeferredResult,为其设置 result,同步响应客户端是否变更。

对 DeferredResult 不了解的小伙伴可以参考这篇文章《DeferredResult 扫盲》

    /**
     * 配置中心的广播机制利用了Spring MVC 的 DeferredResult 对象特性实现
     */
    private Map<String, List<DeferredResult>> confDeferredResultMap = new ConcurrentHashMap<>();
@Override
    public DeferredResult<ReturnT<String>> monitor(String accessToken, String env, List<String> keys) {

        // 默认的超时时间是 30 秒,如果配置没有发生变化,则等待超时返回
        DeferredResult deferredResult = new DeferredResult(confBeatTime * 1000L, new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor timeout, no key updated."));

        // valid
        if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessToken)) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL.getCode(), "AccessToken Invalid."));
            return deferredResult;
        }
        if (env == null || env.trim().length() == 0) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL.getCode(), "env Invalid."));
            return deferredResult;
        }
        if (keys == null || keys.size() == 0) {
            deferredResult.setResult(new ReturnT<>(ReturnT.FAIL.getCode(), "keys Invalid."));
            return deferredResult;
        }
      
        // monitor by client
        for (String key : keys) {
            // invalid key, pass
            if (key == null || key.trim().length() < 4 || key.trim().length() > 100
                    || !RegexUtil.matches(RegexUtil.abc_number_line_point_pattern, key)) {
                continue;
            }

            // monitor each key
            String fileName = parseConfDataFileName(env, key);

            List<DeferredResult> deferredResultList = confDeferredResultMap.get(fileName);
            if (deferredResultList == null) {
                deferredResultList = new ArrayList<>();
                confDeferredResultMap.put(fileName, deferredResultList);
            }

            deferredResultList.add(deferredResult);
        }

        return deferredResult;
    }

第二个线程任务,从数据库加载全量数据,考虑到数据量可能比较大,线程会间隔 30 秒查询一次xxl_conf_node 表,如果数据有变化,就立即更新本地快照并广播给所有客户端



        /**
         *  sync total conf-data, db + file      (1+N/30s)
         *
         *  clean deleted conf-data file
         *
         *  从数据库加载全量数据,考虑到数据量可能比较大,线程会间隔30秒查询一次xxl_conf_node表.如果数据有变化,就立即更新本地快照并广播给所有客户端
         */
        executorService.execute(() -> {
            while (!executorStoped) {

                // align to beattime
                try {
                    long sleepSecond = confBeatTime - (System.currentTimeMillis() / 1000) % confBeatTime;
                    if (sleepSecond > 0 && sleepSecond < confBeatTime) {
                        TimeUnit.SECONDS.sleep(sleepSecond);
                    }
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }

                try {

                    // sync registry-data, db + file
                    int offset = 0;
                    int pagesize = 1000;
                    List<String> confDataFileList = new ArrayList<>();

                    List<XxlConfNode> confNodeList = xxlConfNodeDao.pageList(offset, pagesize, null, null, null);
                    while (confNodeList != null && confNodeList.size() > 0) {

                        for (XxlConfNode confNoteItem : confNodeList) {

                            // 配置发生变更,配置中心不会直接将变更的数据推送给客户端,而是告诉客户端数据有变化,需要客户端主动发起http请求调用配置中心的查询接口获取最新的配置
                            String confDataFile = setFileConfData(confNoteItem.getEnv(), confNoteItem.getKey(), confNoteItem.getValue());

                            // collect confDataFile
                            confDataFileList.add(confDataFile);
                        }


                        offset += 1000;
                        confNodeList = xxlConfNodeDao.pageList(offset, pagesize, null, null, null);
                    }

                    // clean old registry-data file
                    cleanFileConfData(confDataFileList);

                    logger.debug(">>>>>>>>>>> xxl-conf, sync totel conf data success, sync conf count = {}", confDataFileList.size());
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(confBeatTime);
                } catch (Exception e) {
                    if (!executorStoped) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });

生成文件名字的方法如下:

 private String parseConfDataFileName(String env, String key) {
        // fileName
        String fileName = confDataFilePath
                .concat(File.separator).concat(env)
                .concat(File.separator).concat(key)
                .concat(".properties");
        return fileName;
    }

4.2 客户端启动

客户端正常启动,从 properties 中获取系统配置填充到 XxlConfConfig 配置类中。

@Configuration
public class XxlConfConfig {
    private Logger logger = LoggerFactory.getLogger(XxlConfConfig.class);


    @Value("${xxl.conf.admin.address}")
    private String adminAddress;

    @Value("${xxl.conf.env}")
    private String env;

    @Value("${xxl.conf.access.token}")
    private String accessToken;

    @Value("${xxl.conf.mirrorfile}")
    private String mirrorfile;


    @Bean
    public XxlConfFactory xxlConfFactory() {

        XxlConfFactory xxlConf = new XxlConfFactory();
        xxlConf.setAdminAddress(adminAddress);
        xxlConf.setEnv(env);
        xxlConf.setAccessToken(accessToken);
        xxlConf.setMirrorfile(mirrorfile);

        logger.info(">>>>>>>>>>> xxl-conf config init.");
        return xxlConf;
    }

}

利用 Spring 容器启动机制首先启动工厂,来看 XxlConfFactory 类初始化方法

XxlConfBaseFactory.init(adminAddress, env, accessToken, mirrorfile);
public class XxlConfBaseFactory {


    /**
     * init
     *
     * @param adminAddress
     * @param env
     */
    public static void init(String adminAddress, String env, String accessToken, String mirrorfile) {
            // init remote util
        XxlConfRemoteConf.init(adminAddress, env, accessToken);
    // init mirror util
        XxlConfMirrorConf.init(mirrorfile);            
    // init cache + thread, cycle refresh + monitor
        XxlConfLocalCacheConf.init();                
     // listener all key change
        XxlConfListenerFactory.addListener(null, new BeanRefreshXxlConfListener());   

    }

    /**
     * destory
     */
    public static void destroy() {
        XxlConfLocalCacheConf.destroy();    // destroy
    }

}

先来看 XxlConfRemoteConf.init(adminAddress, env, accessToken); 方法

简单来说就是填充字段属性,其中可以看到解析了配置中心 url,可以配置多个,用逗号分隔。

 private static String adminAddress;
    private static String env;
    private static String accessToken;

    private static List<String> adminAddressArr = null;

public static void init(String adminAddress, String env, String accessToken) {

        // valid
        if (adminAddress==null || adminAddress.trim().length()==0) {
            throw new XxlConfException("xxl-conf adminAddress can not be empty");
        }
        if (env==null || env.trim().length()==0) {
            throw new XxlConfException("xxl-conf env can not be empty");
        }


        XxlConfRemoteConf.adminAddress = adminAddress;
        XxlConfRemoteConf.env = env;
        XxlConfRemoteConf.accessToken = accessToken;


        // parse
        XxlConfRemoteConf.adminAddressArr = new ArrayList<>();
        if (adminAddress.contains(",")) {
            XxlConfRemoteConf.adminAddressArr.addAll(Arrays.asList(adminAddress.split(",")));
        } else {
            XxlConfRemoteConf.adminAddressArr.add(adminAddress);
        }

    }

接着执行 XxlConfMirrorConf.init(mirrorfile); 方法

设置 mirrorfile 属性,也就是客户端镜像文件(三级存储)的磁盘存放路径。

private static String mirrorfile = null;

    public static void init(String mirrorfileParam) {
        // valid
        if (mirrorfileParam == null || mirrorfileParam.trim().length() == 0) {
            throw new XxlConfException("xxl-conf mirrorfileParam can not be empty");
        }

        mirrorfile = mirrorfileParam;
    }

接着执行 XxlConfLocalCacheConf.init(); 方法

        // 客户端内存级缓存 四级存储
    private static ConcurrentHashMap<String, CacheNode> localCacheRepository = null;

    private static Thread refreshThread;
    private static boolean refreshThreadStop = false;

    public static void init() {

        // 创建一个本地缓存用于保存配置数据
        localCacheRepository = new ConcurrentHashMap<String, CacheNode>();

        // preload: mirror or remote
        Map<String, String> preConfData = new HashMap<>();
    
          // 读取客户端镜像文件配置
        Map<String, String> mirrorConfData = XxlConfMirrorConf.readConfMirror();

        Map<String, String> remoteConfData = null;
        if (mirrorConfData != null && mirrorConfData.size() > 0) {
            // 拉取镜像文件中配置集在远端的真实配置
            remoteConfData = XxlConfRemoteConf.find(mirrorConfData.keySet());
        }

        if (mirrorConfData != null && mirrorConfData.size() > 0) {
            preConfData.putAll(mirrorConfData);
        }
        // 如果远端加载成功,则会覆盖镜像配置
        if (remoteConfData != null && remoteConfData.size() > 0) {
            preConfData.putAll(remoteConfData);
        }
        if (preConfData != null && preConfData.size() > 0) {
            for (String preKey : preConfData.keySet()) {
                // PRELOAD
                set(preKey, preConfData.get(preKey), SET_TYPE.PRELOAD);
            }
        }

        // refresh thread
        // 启动一个守护线程,每隔3秒钟查看一下本地缓存是否有数据,线程会在此阻塞.
        // 客户会通过守护线程与服务端保持长连接,客户端会循环调用配置中心的监控接口,如果配置中心数据有变化,会立刻通知客户端,客户端接收到通知会立刻调用配置中心的查询接口获取数据,如果配置中心的数据没有变更,则默认30秒后再调用查询接口
        refreshThread = new Thread(() -> {
            while (!refreshThreadStop) {
                try {
                    refreshCacheAndMirror();
                } catch (Exception e) {
                    if (!refreshThreadStop && !(e instanceof InterruptedException)) {
                        logger.error(">>>>>>>>>> xxl-conf, refresh thread error.");
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>> xxl-conf, refresh thread stoped.");
        });
        refreshThread.setDaemon(true);
        refreshThread.start();

        logger.info(">>>>>>>>>> xxl-conf, XxlConfLocalCacheConf init success.");
    }

简单说明下上述代码的作用

首先读取客户端镜像文件配置,接着拉取客户端镜像文件中配置集在远端的真实配置:调用配置中心 /conf/find API,首先使用客户端镜像文件中的配置,但如果远端加载成功,则会覆盖镜像配置。

最后遍历每个配置,调用 set(preKey, preConfData.get(preKey), SET_TYPE.PRELOAD); 方法,

可以看到当前 optType 为 SET_TYPE.PRELOAD,所以直接将每个配置项,封装成 CacheNode 节点,存储在客户端内存级 localCacheRepository 缓存中。

 private static void set(String key, String value, SET_TYPE optType) {
        localCacheRepository.put(key, new CacheNode(value));
        logger.info(">>>>>>>>>> xxl-conf: {}: [{}={}]", optType, key, value);

        // value updated, invoke listener
        if (optType == SET_TYPE.RELOAD) {
            XxlConfListenerFactory.onChange(key, value);
        }

        // new conf, new monitor
        if (optType == SET_TYPE.SET) {
            refreshThread.interrupt();
        }
    }

    public static class CacheNode implements Serializable {
        private static final long serialVersionUID = 42L;

        private String value;

        public CacheNode() {
        }

        public CacheNode(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }
    }

接着会启动一个守护线程,调用 refreshCacheAndMirror(); 方法

每隔 3 秒钟查看一下本地缓存是否有数据,线程会在此阻塞

 private static void refreshCacheAndMirror() throws InterruptedException {
        // 每隔3秒钟查看一下本地缓存是否有数据 没数据就等待3秒
        if (localCacheRepository.size() == 0) {
            TimeUnit.SECONDS.sleep(3);
            return;
        }

        // 有数据,会向配置中心发送http请求查询自己需要的配置信息 查询是否配置变更
        boolean monitorRet = XxlConfRemoteConf.monitor(localCacheRepository.keySet());

        // 避免失败重试请求太快
        if (!monitorRet) {
            TimeUnit.SECONDS.sleep(10);
        }

        // refresh cache: remote > cache
        Set<String> keySet = localCacheRepository.keySet();
        if (keySet.size() > 0) {

            Map<String, String> remoteDataMap = XxlConfRemoteConf.find(keySet);
            // 先比对一下配置中心的配置项和本地缓存中配置项是否相同
            if (remoteDataMap != null && remoteDataMap.size() > 0) {
                for (String remoteKey : remoteDataMap.keySet()) {
                    String remoteData = remoteDataMap.get(remoteKey);

                    CacheNode existNode = localCacheRepository.get(remoteKey);
                    // 如果相同就直接忽略不处理
                    if (existNode != null && existNode.getValue() != null && existNode.getValue().equals(remoteData)) {
                        logger.debug(">>>>>>>>>> xxl-conf: RELOAD unchange-pass [{}].", remoteKey);
                    }
                    // 如果本地缓存仓库中没有该key,或该key的值为空,或该key的值有变化
                    else {
                        // 更新本地缓存
                        set(remoteKey, remoteData, SET_TYPE.RELOAD);
                    }

                }
            }

        }

        // refresh mirror: cache > mirror
        // 将缓存中的配置同步到镜像文件
        Map<String, String> mirrorConfData = new HashMap<>();
        for (String key : keySet) {
            CacheNode existNode = localCacheRepository.get(key);
            mirrorConfData.put(key, existNode.getValue() != null ? existNode.getValue() : "");
        }
        XxlConfMirrorConf.writeConfMirror(mirrorConfData);

        logger.debug(">>>>>>>>>> xxl-conf, refreshCacheAndMirror success.");
    }

当客户端本地缓存中有数据的时候,就会被守护线程扫描到,会向配置中心发送 http 请求查询自己需要的配置信息,查询到配置后,它会先比对一下配置中心的配置项和本地缓存中配置项是否相同,如果相同就直接忽略不处理,如果不相同,说明有变化,如果本地缓存仓库中没有该key,或该key的值为空,或该key的值有变化,则更新本地缓存,最后将缓存中的配置同步到镜像文件。

而客户会通过守护线程与服务端保持长连接,客户端会循环调用配置中心的监控接口,如果配置中心数据有变化会立刻通知客户端,客户端接收到通知会立刻调用配置中心的查询接口获取数据,如果配置中心的数据没有变更,则默认 30 秒后再调用查询接口。


这里需要注意的是:如果配置中心增加了新的配置,客户端是不会收到通知的,因为客户端每次请求接口只拉取自己所使用到的配置


接着回到 XxlConfBaseFactory#init() 方法,继续执行 XxlConfListenerFactory.addListener(null, new BeanRefreshXxlConfListener()); 方法

注册监听器,支持监听配置变更事件,例如可据此实现动态刷新JDBC连接池等高级功能场景,如下使用方式

XxlConfClient.addListener("default.key01", new XxlConfListener(){
    @Override
    public void onChange(String key, String value) throws Exception {
        logger.info("配置变更事件通知:{}={}", key, value);
    }
});

来看下源码实现

public class XxlConfListenerFactory {
    private static Logger logger = LoggerFactory.getLogger(XxlConfListenerFactory.class);

    /**
     * xxl conf listener repository
     */
    private static ConcurrentHashMap<String, List<XxlConfListener>> keyListenerRepository = new ConcurrentHashMap<>();
  
    private static List<XxlConfListener> noKeyConfListener = Collections.synchronizedList(new ArrayList<XxlConfListener>());

    /**
     * add listener and first invoke + watch
     *
     * @param key   empty will listener all key
     * @param xxlConfListener
     * @return
     */
    public static boolean addListener(String key, XxlConfListener xxlConfListener){
        if (xxlConfListener == null) {
            return false;
        }
        if (key==null || key.trim().length()==0) {
            // listene all key used
            noKeyConfListener.add(xxlConfListener);
            return true;
        } else {

            // first use, invoke and watch this key
            try {
                String value = XxlConfClient.get(key);
                xxlConfListener.onChange(key, value);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            // listene this key
            List<XxlConfListener> listeners = keyListenerRepository.get(key);
            if (listeners == null) {
                listeners = new ArrayList<>();
                keyListenerRepository.put(key, listeners);
            }
            listeners.add(xxlConfListener);
            return true;
        }
    }

    /**
     * invoke listener on xxl conf change
     *
     * @param key
     */
    public static void onChange(String key, String value){
        if (key==null || key.trim().length()==0) {
            return;
        }
        List<XxlConfListener> keyListeners = keyListenerRepository.get(key);
        if (keyListeners!=null && keyListeners.size()>0) {
            for (XxlConfListener listener : keyListeners) {
                try {
                    listener.onChange(key, value);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
        if (noKeyConfListener.size() > 0) {
            for (XxlConfListener confListener: noKeyConfListener) {
                try {
                    confListener.onChange(key, value);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

}

代码比较简单,BeanRefreshXxlConfListener 实现 onChange 方法

BeanRefreshXxlConfListener#onChange() 方法具体实现如下:

   @Override
    public void onChange(String key, String value) throws Exception {
        List<BeanField> beanFieldList = key2BeanField.get(key);
        if (beanFieldList!=null && beanFieldList.size()>0) {
            for (BeanField beanField: beanFieldList) {
                XxlConfFactory.refreshBeanField(beanField, value, null);
            }
        }
    }

key2BeanField 是什么呢?

  // key : object-field[]
    private static Map<String, List<BeanField>> key2BeanField = new ConcurrentHashMap<String, List<BeanField>>();

    public static void addBeanField(String key, BeanField beanField){
        List<BeanField> beanFieldList = key2BeanField.get(key);
        if (beanFieldList == null) {
            beanFieldList = new ArrayList<>();
            key2BeanField.put(key, beanFieldList);
        }
        for (BeanField item: beanFieldList) {
            if (item.getBeanName().equals(beanField.getBeanName()) && item.getProperty().equals(beanField.getProperty())) {
                return; // avoid repeat refresh
            }
        }
        beanFieldList.add(beanField);
    }

它实际上是在 Spring 生命周期,postProcessPropertyValues 时设置,用来支持 Spring XML 占位符的方式动态更新配置,具体实现如下:

@Override
    public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {

        // 2、XML('$XxlConf{...}'):resolves placeholders + watch
        if (!beanName.equals(this.beanName)) {

            PropertyValue[] pvArray = pvs.getPropertyValues();
            for (PropertyValue pv : pvArray) {
                if (pv.getValue() instanceof TypedStringValue) {
                    String propertyName = pv.getName();
                    String typeStringVal = ((TypedStringValue) pv.getValue()).getValue();
                    if (xmlKeyValid(typeStringVal)) {

                        // object + property
                        String confKey = xmlKeyParse(typeStringVal);
            // 向配置中心发送http请求查询数据,先将查询到的数据放入本地缓存,然后通过反射给这些字段赋值
                        String confValue = XxlConfClient.get(confKey, "");

                        // resolves placeholders
                        BeanRefreshXxlConfListener.BeanField beanField = new BeanRefreshXxlConfListener.BeanField(beanName, propertyName);
                        //refreshBeanField(beanField, confValue, bean);

                        Class propClass = String.class;
                        for (PropertyDescriptor item: pds) {
                            if (beanField.getProperty().equals(item.getName())) {
                                propClass = item.getPropertyType();
                            }
                        }
                        Object valueObj = FieldReflectionUtil.parseValue(propClass, confValue);
                        pv.setConvertedValue(valueObj);

                        // watch
                        BeanRefreshXxlConfListener.addBeanField(confKey, beanField);

                    }
                }
            }

        }

        return super.postProcessPropertyValues(pvs, pds, bean, beanName);
    }

如果扫描到 XML 配置格式满足 '$XxlConf{...}',通过 key 先从客户端四级存储 localCacheRepository 中获取,获取不到则向配置中心请求 /conf/find API 查询数据,先将查询到的数据放入本地缓存,然后通过反射给这些字段赋值。


XxlConfFactory.refreshBeanField(beanField, value, null); 实现如下

主要是支持 Spring XML 占位符的方式动态更新配置

// XxlConfFactory.java
public static void refreshBeanField(final BeanRefreshXxlConfListener.BeanField beanField, final String value, Object bean){
        if (bean == null) {
          // 已优化:启动时禁止实用,getBean 会导致Bean提前初始化,风险较大;
            bean = XxlConfFactory.beanFactory.getBean(beanField.getBeanName());    
        }
        if (bean == null) {
            return;
        }

        BeanWrapper beanWrapper = new BeanWrapperImpl(bean);

        // property descriptor
        PropertyDescriptor propertyDescriptor = null;
        PropertyDescriptor[] propertyDescriptors = beanWrapper.getPropertyDescriptors();
        if (propertyDescriptors!=null && propertyDescriptors.length>0) {
            for (PropertyDescriptor item: propertyDescriptors) {
                if (beanField.getProperty().equals(item.getName())) {
                    propertyDescriptor = item;
                }
            }
        }

        // refresh field: set or field
        if (propertyDescriptor!=null && propertyDescriptor.getWriteMethod() != null) {
            beanWrapper.setPropertyValue(beanField.getProperty(), value);    // support mult data types
            logger.info(">>>>>>>>>>> xxl-conf, refreshBeanField[set] success, {}#{}:{}",
                    beanField.getBeanName(), beanField.getProperty(), value);
        } else {

            final Object finalBean = bean;
            ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field fieldItem) throws IllegalArgumentException, IllegalAccessException {
                    if (beanField.getProperty().equals(fieldItem.getName())) {
                        try {
                            Object valueObj = FieldReflectionUtil.parseValue(fieldItem.getType(), value);

                            fieldItem.setAccessible(true);
                            fieldItem.set(finalBean, valueObj);        // support mult data types

                            logger.info(">>>>>>>>>>> xxl-conf, refreshBeanField[field] success, {}#{}:{}",
                                    beanField.getBeanName(), beanField.getProperty(), value);
                        } catch (IllegalAccessException e) {
                            throw new XxlConfException(e);
                        }
                    }
                }
            });

        }

    }

这里是不是很简单,就是通过反射来填充字段来设置配置项,没什么可说的。


了解 Spring 都知道 postProcessPropertyValues 是否执行,实际上是通过 postProcessAfterInstantiation 控制,当 postProcessAfterInstantiation 返回 true,postProcessPropertyValues 才会被执行,在 Bean 的实例化之后回调。

// XxlConfFactory.java
@Override
    public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {


        // 1、Annotation('@XxlConf'):resolves conf + watch
        if (!beanName.equals(this.beanName)) {

            ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    // 解析所有加了@XxlConf注解的字段或使用$XxlConf{}占位符配置,拿到key,
                    if (field.isAnnotationPresent(XxlConf.class)) {
                        String propertyName = field.getName();
                        XxlConf xxlConf = field.getAnnotation(XxlConf.class);

                        String confKey = xxlConf.value();
                        // 向配置中心发送http请求查询数据,先将查询到的数据放入本地缓存,然后通过反射给这些字段赋值
                        String confValue = XxlConfClient.get(confKey, xxlConf.defaultValue());


                        // resolves placeholders
                        BeanRefreshXxlConfListener.BeanField beanField = new BeanRefreshXxlConfListener.BeanField(beanName, propertyName);
                        refreshBeanField(beanField, confValue, bean);

                        // watch
                        if (xxlConf.callback()) {
                            BeanRefreshXxlConfListener.addBeanField(confKey, beanField);
                        }

                    }
                }
            });
        }

        return super.postProcessAfterInstantiation(bean, beanName);
    }

通过这个方法支持了通过 @XxlConf 注解的方式获取配置,动态刷新配置。

5. 总结

至此本文完。

作者:MRyan


本文采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
转载时请注明本文出处及文章链接。本文链接:https://www.wormholestack.com/archives/711/
2024 © MRyan 97 ms