转载 

面试官:如何实现一个读写分离的中间件?

分类:    138人阅读    IT小君  2023-05-29 21:17

上方蓝色“程序员追风”,选择“设为星标”

回复“资料”获取整理好的面试资料


原文:jianshu.com/p/549d88222528


第1-100期:Java面试题及答案整理


公司 DBA 一直埋怨 Atlas 的难用,希望从客户端层出一个读写分离的方案。开源市场上在客户端做读写分离的开源软件基本上没有。业务方利用 Spring 自带的路由数据源能实现部分读写分离的功能,但是功能不够完善。部分参考 Sharing-JDBC 源码思想,利用部分业余时间,写了这个 Robustdb,总共只使用了十多个类,两千多行代码左右。

一、背景

随着业务量的增长,所有公司都不是直接选择分库分表设计方案的。很长一段时间内,会采用 库垂直拆分和分区表 来解决库表数据量比较大的问题,采用读写分离来解决访问压力比较大的问题。我们公司也是一样。目前绝大部分业务还是使用读写分离的方案。我相信很多公司和我们公司的架构一样,采用中间代理层做读写分离。结构如下:
图片

第一层是 VIP 曾。通过 VIP 做中间映射层,避免了应用绑定数据库的真实 IP,这样在数据库故障时,可以通过 VIP 飘移来将流量打到另一个库。但是 VIP 无法跨机房,为未来的异地多活设计埋下绕不过去的坎。
VIP 下面一层是读写分离代理,我们公司使用的是 360 的 Atlas。Atlas 通过将 SQL 解析为 DML(Data Modify Language)和 DQL(Data Query Language),DML 的请求全部发到主库,DQL 根据配置比例分发到读库(读库包括主库和从库)。
使用 Atlas 有以下不足:
  • Altas 不再维护更新,现存一些 bug,bug 网上很多描述;
  • Altas 中没有具体应用请求 IP 与具体数据库 IP 之间的映射数据,所以无法准确查到访问DB的请求是来自哪个应用;
  • Altas 控制的粒度是 SQL 语句,只能指定某条查询 SQL 语句走主库,不能根据场景指定;
  • DB 在自动关闭某个与 Altas 之间的连接时,Altas 不会刷新,它仍有可能把这个失效的连接给下次请求的应用使用;
  • 使用 Altas,对后期增加其他功能模会比较麻烦。
基于 Atlas 以上问题,以及我们需要将数据库账号和连接配置集中管控。我们设计了下面这套方案:
图片

通过在客户端做读写分离可以解决 Atlas 上面存在的不足。整个流程如下图所示:
图片

二、Robustdb 原理

1、读写分离设计核心点——路由
支持每条 SQL 按照 DML、DQL 类型的默认路由。
需求描述
目前公司采用读写分离的方案来增强数据库的性能,所有的 DML(insert、updata、delete)操作在主库,通过 MySQL 的 binlog 同步,将数据同步到多个读库。所有的 DQL(select) 操作主库或从库,从而增强数据的读能力。
支持方法级别的指定路由
需求描述
在 Service 中指定方法中所有 DB 操作方法操作同一个数据库(主要是主库),保证方法中的 DB 读写都操作主库,避免数据同步延迟导致读从库数据异常。从而保证整个方法的事务属性。
解决思路
我们将获取真实数据库(主库还是哪个从库)放到需要建立连接时的地方,为此我们创建了 BackendConnection(传统是先连接数据库,然后再创建连接)。
在获取数据库连接时,通过对请求的 SQL 进行解析和类型判别,识别为 DML 和 DQL。如果是 DML,则在线程的单 SQL 线程本地变量上设置为 master,DQL 则设置为 slave,为后续选择数据库提供选择参考。
如果要支持方法级别的事务(也就是整个方法的 SQL 请求都发送到主库),需要借助拦截器,我们采用的是 AspectJ 方式的拦截器。会拦截所有带有类型为 dataSourceType 的 annotation 的方法。在执行方法前,在线程的多 SQL 线程本地变量上设置 dataSourceType 的 name 值(name 值为 master 代表走主库,name 值为 slave 代表走从库)。线程的多 SQL 线程本地变量为后续选择数据库提供选择参考。在方法执行完后,清理本地线程变量。
多 SQL 线程本地变量的优先级高于单 SQL 线程本地变量的优先级。
图片

图片

注意点
本地线程变量要使用阿里包装的 Ttl,防止用户在方法内部启动线程池,导致普通的线程本地变量丢失,从而导致选库异常。
使用 Ttl 之后,需要在公司的 JVM 启动参数中增加
-javaagent:/{Path}/transmittable-thread-local-2.6.0-SNAPSHOT.jar
原理就是在 JVM 启动时,加载 transmittable-thread-local 中的类替换逻辑,将以后的 Runnable、Callable、ExecuteService 等线程池相关类替换成增强后的 TtlRunnable、TtlCallable、TtlExecuteService 等。
下面展示一下时序图中类的核心代码,仅供参考:
DataSoueceAspect
@Aspect@Componentpublic class DataSourceAspect{    @Around("execution(* *(..)) && @annotation(dataSourceType)")    public Object aroundMethod(ProceedingJoinPoint pjd, DataSourceType dataSourceType) throws Throwable {      DataSourceContextHolder.setMultiSqlDataSourceType(dataSourceType.name());        Object result = pjd.proceed();        DataSourceContextHolder.clearMultiSqlDataSourceType();        return result;    }}
BackendConnection
public final class BackendConnection extends AbstractConnectionAdapter {
private AbstractRoutingDataSource abstractRoutingDataSource; //用于缓存一条sql(可能对应多个statement)或者一次事务中的连接 private final Map<String, Connection> connectionMap = new HashMap<String, Connection>();
//构造函数 public BackendConnection(AbstractRoutingDataSource abstractRoutingDataSource) { this.abstractRoutingDataSource = abstractRoutingDataSource; }
@Override public PreparedStatement prepareStatement(String sql) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql); }
@Override public DatabaseMetaData getMetaData() throws SQLException { if(connectionMap == null || connectionMap.isEmpty()){ return abstractRoutingDataSource.getResolvedDefaultDataSource().getConnection().getMetaData(); } return fetchCachedConnection(connectionMap.keySet().iterator().next().toString()).get().getMetaData(); }
@Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql,resultSetType,resultSetConcurrency); }
@Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); }
@Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, autoGeneratedKeys); }
@Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, columnIndexes); }
@Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { return getConnectionInternal(sql).prepareStatement(sql, columnNames); }
@Override protected Collection<Connection> getConnections() { return connectionMap.values(); }
/** * 根据sql获取连接,对连接进行缓存 * @param sql * @return * @throws SQLException */ private Connection getConnectionInternal(final String sql) throws SQLException { //设置线程环境遍历 if (ExecutionEventUtil.isDML(sql)) { DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.MASTER); } else if (ExecutionEventUtil.isDQL(sql)) { DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.SLAVE); } //根据上面设置的环境变量,选择相应的数据源 Object dataSourceKey = abstractRoutingDataSource.determineCurrentLookupKey(); String dataSourceName = dataSourceKey.toString(); //看缓存中是否已经含有相应数据源的连接 Optional<Connection> connectionOptional = fetchCachedConnection(dataSourceName); if (connectionOptional.isPresent()) { return connectionOptional.get(); } //缓存中没有相应连接,建立相应连接,并放入缓存 Connection connection = abstractRoutingDataSource.getTargetDataSource(dataSourceKey).getConnection(); connection.setAutoCommit(super.getAutoCommit()); connection.setTransactionIsolation(super.getTransactionIsolation()); connectionMap.put(dataSourceKey.toString(), connection); return connection; }
/** * 从缓存中取数据源 * @param dataSourceName * @return */ private Optional<Connection> fetchCachedConnection(final String dataSourceName) { if (connectionMap.containsKey(dataSourceName)) { return Optional.of(connectionMap.get(dataSourceName)); } return Optional.absent(); } }
AbstractRoutingDataSource
/** *  * @Type AbstractRoutingDataSource * @Desc 数据源路由器(spring的AbstractRoutingDataSource将resolvedDataSources的注入放在bean初始化) * @Version V1.0 */public abstract class AbstractRoutingDataSource extends AbstractDataSource {        private boolean lenientFallback = true;        private Map<Object, Object> targetDataSources;
private Object defaultTargetDataSource; private Map<Object, DataSource> resolvedDataSources = new HashMap<Object, DataSource>(); private DataSource resolvedDefaultDataSource; private Logger logger = LoggerFactory.getLogger(AbstractRoutingDataSource.class);
public BackendConnection getConnection() throws SQLException { return new BackendConnection(this); }
public BackendConnection getConnection(String username, String password) throws SQLException { return new BackendConnection(this); } public void afterPropertiesSet() { if (this.targetDataSources == null) { throw new IllegalArgumentException("Property 'targetDataSources' is required"); } this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size()); for (Map.Entry entry : this.targetDataSources.entrySet()) { Object lookupKey = resolveSpecifiedLookupKey(entry.getKey()); DataSource dataSource = resolveSpecifiedDataSource(entry.getValue()); this.resolvedDataSources.put(lookupKey, dataSource); } if (this.defaultTargetDataSource != null) { this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource); } }
public void putNewDataSource(Object key, DataSource dataSource){ if(this.resolvedDataSources == null){ this.resolvedDataSources = new HashMap<Object, DataSource>(); } if(this.resolvedDataSources.containsKey(key)){ this.resolvedDataSources.remove(key); logger.info("remove old key:" + key); } logger.info("add key:" + key + ", value=" + dataSource); this.resolvedDataSources.put(key, dataSource); } /** * 数据源选择逻辑 */ public DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSourceContextHolder.clearSingleSqlDataSourceType(); int index = 0; for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) { logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString()); index++; } DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } logger.debug("myAbstractDS, hit DS is " + dataSource.toString()); return dataSource; } public DataSource getTargetDataSource(Object lookupKey) { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); if(lookupKey == null){ lookupKey = determineCurrentLookupKey(); } DataSourceContextHolder.clearSingleSqlDataSourceType(); int index = 0; for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) { logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString()); index++; } DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } logger.debug("myAbstractDS, hit DS is " + dataSource.toString()); return dataSource; } public abstract Object determineCurrentLookupKey(); public abstract Object getCurrentSlaveKey(); @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface)); }
@SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> iface) throws SQLException { if (iface.isInstance(this)){ return (T) this; } return determineTargetDataSource().unwrap(iface); } protected Object resolveSpecifiedLookupKey(Object lookupKey) { return lookupKey; } protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException { if (dataSource instanceof DataSource) { return (DataSource) dataSource; } else { throw new IllegalArgumentException( "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource); } } //get set方法省略}
DataSourceContextHolder
public class DataSourceContextHolder {        private static final TransmittableThreadLocal<String> singleSqlContextHolder = new TransmittableThreadLocal<String>();        private static final TransmittableThreadLocal<String> multiSqlContextHolder = new TransmittableThreadLocal<String>();        /**     * @Description: 设置单条sql数据源类型     * @param dataSourceType  数据库类型     * @return void     * @throws     */    public static void setSingleSqlDataSourceType(String dataSourceType) {        singleSqlContextHolder.set(dataSourceType);    }        /**     * @Description: 获取单条sql数据源类型     * @param      * @return String     * @throws     */    public static String getSingleSqlDataSourceType() {        return singleSqlContextHolder.get();    }        /**     * @Description: 清除单条sql数据源类型     * @param      * @return void     * @throws     */    public static void clearSingleSqlDataSourceType() {        singleSqlContextHolder.remove();    }        /**     * @Description: 设置多条sql数据源类型     * @param dataSourceType  数据库类型     * @return void     * @throws     */    public static void setMultiSqlDataSourceType(String dataSourceType) {        multiSqlContextHolder.set(dataSourceType);    }        /**     * @Description: 获取多条sql数据源类型     * @param      * @return String     * @throws     */    public static String getMultiSqlDataSourceType() {        return multiSqlContextHolder.get();    }        /**     * @Description: 清除多条sql数据源类型     * @param      * @return void     * @throws     */    public static void clearMultiSqlDataSourceType() {        multiSqlContextHolder.remove();    }
/** * 判断当前线程是否为使用从库为数据源. 最外层service有slave的aop标签 或者 service没有aop标签且单条sql为DQL * * @return */ public static boolean isSlave() { return "slave".equals(multiSqlContextHolder.get()) || (multiSqlContextHolder.get()==null && "slave".equals(singleSqlContextHolder.get())) ; } }
DynamicDataSource
public class DynamicDataSource extends AbstractRoutingDataSource implements InitializingBean{          private static final Logger logger = LoggerFactory.getLogger(DynamicDataSource.class);         private Integer slaveCount = 0;          // 轮询计数,初始为-1,AtomicInteger是线程安全的      private AtomicInteger counter = new AtomicInteger(-1);         // 记录读库的key      private List<Object> slaveDataSources = new ArrayList<Object>(0);         // slave库的权重    private  Map<Object,Integer>  slaveDataSourcesWeight;        private Object currentSlaveKey;        public DynamicDataSource() {        super();    }
/** * 构造函数 * @param defaultTargetDataSource * @param targetDataSources * @param slaveDataSourcesWeight */ public DynamicDataSource(Object defaultTargetDataSource, Map<Object,Object> targetDataSources, Map<Object,Integer> slaveDataSourcesWeight) { this.setResolvedDataSources(new HashMap<Object, DataSource>(targetDataSources.size())); for (Map.Entry<Object, Object> entry : targetDataSources.entrySet()) { DataSource dataSource = resolveSpecifiedDataSource(entry.getValue()); this.putNewDataSource(entry.getKey(), dataSource); } if (defaultTargetDataSource != null) { this.setResolvedDefaultDataSource(resolveSpecifiedDataSource(defaultTargetDataSource)); } this.setSlaveDataSourcesWeight(slaveDataSourcesWeight); this.afterPropertiesSet(); }
@Override public Object determineCurrentLookupKey() { // 使用DataSourceContextHolder保证线程安全,并且得到当前线程中的数据源key if (DataSourceContextHolder.isSlave()) { currentSlaveKey = getSlaveKey(); return currentSlaveKey; } //TODO Object key = "master"; return key; } @Override public void afterPropertiesSet() { try { super.afterPropertiesSet(); Map<Object, DataSource> resolvedDataSources = this.getResolvedDataSources(); //清空从库节点,重新生成 slaveDataSources.clear(); slaveCount = 0; for (Map.Entry<Object, DataSource> entry : resolvedDataSources.entrySet()) { if(slaveDataSourcesWeight.get(entry.getKey())==null){ continue; } for(int i=0; i<slaveDataSourcesWeight.get(entry.getKey());i++){ slaveDataSources.add(entry.getKey()); slaveCount++; } } } catch (Exception e) { logger.error("afterPropertiesSet error! ", e); } } /** * 轮询算法实现 * * @return */ public Object getSlaveKey() { if(slaveCount <= 0 || slaveDataSources == null || slaveDataSources.size() <= 0){ return null; } Integer index = counter.incrementAndGet() % slaveCount; if (counter.get() > 9999) { // 以免超出Integer范围 counter.set(-1); // 还原 } return slaveDataSources.get(index); }
public Map<Object, Integer> getSlaveDataSourcesWeight() { return slaveDataSourcesWeight; }
public void setSlaveDataSourcesWeight(Map<Object, Integer> slaveDataSourcesWeight) { this.slaveDataSourcesWeight = slaveDataSourcesWeight; }
public Object getCurrentSlaveKey() { return currentSlaveKey; }}
2、读库流量分配策略设计
我们所有的数据库连接都是管控起来的,包括每个库的流量配置都是支持动态分配的。
支持读库按不同比例承接读请求。通过配置页面动态调整应用的数据库连接以及比例,支持随机或者顺序的方式将流量分配到相应的读库中去。
这里我们使用的配置管理下发中心是我们公司自己开发的 gconfig,当然替换成开源的 diamond 或者 applo 也是可以的。
当接收到配管中心的调整指令,会动态更新应用数据源连接,然后更新 beanFactory 中的 datasource。核心函数如下:
/**     * 更新beanFactory     * @param properties     */    public void refreshDataSource(String properties) {        YamlDynamicDataSource dataSource;        try {            dataSource = new YamlDynamicDataSource(properties);        } catch (IOException e) {            throw new RuntimeException("convert datasource config failed!");        }
// 验证必须字段是否存在 if (dataSource == null && dataSource.getResolvedDataSources() == null || dataSource.getResolvedDefaultDataSource() == null || dataSource.getSlaveDataSourcesWeight() == null) { throw new RuntimeException("datasource config error!"); } ConcurrentHashMap<Object, DataSource> newDataSource = new ConcurrentHashMap<Object, DataSource>( dataSource.getResolvedDataSources());
//更新数据源的bean DynamicDataSource dynamicDataSource = (DynamicDataSource) ((DefaultListableBeanFactory) beanFactory) .getBean(dataSourceName); dynamicDataSource.setResolvedDefaultDataSource(dataSource.getResolvedDefaultDataSource()); dynamicDataSource.setResolvedDataSources(new HashMap<Object, DataSource>());//将数据源清空,重新添加 for (Entry<Object, DataSource> element : newDataSource.entrySet()) { dynamicDataSource.putNewDataSource(element.getKey(), element.getValue()); } dynamicDataSource.setSlaveDataSourcesWeight(dataSource.getSlaveDataSourcesWeight()); dynamicDataSource.afterPropertiesSet();
}


三、性能

我们经过性能测试,发现 Robustdb 的性能在一定层度上比 Atlas 性能更好。压测结果如下:
图片


图片 你在看吗

转载于:http://mp.weixin.qq.com/s?__biz=MzIwNjg4MzY4NA==&mid=2247519109&idx=2&sn=0984d2a947effebf20b7f29178c2d706&chksm=9718158ea06f9c987ec93ab1ced80568b86e7bb5d42f946e45f20120e428b683f932423b0a64#rd

点击广告,支持我们为你提供更好的服务

js+css3抽奖转盘旋转点餐代码

css鼠标跟随文字模糊特效

HTML5 Canvas竖直流动线条背景动画特效

html5图标下拉搜索框自动匹配代码

html5 svg夜空中星星流星动画场景特效

html5 canvas进度条圆环图表统计动画特效

响应式时尚单品在线商城网站模板

有机水果蔬菜HTML5网站模板

小众时尚单品在线电子商务网站模板

现代时尚家具公司网站模板

css+js实现的颜色渐变数字时钟动画特效

HTML5数字产品服务公司网站模板

html5 canvas彩色碎片组合球形旋转动画特效

HTML5现代家居装潢公司网站模板

jQuery右端悬浮带返回顶部特效

网页设计开发公司网站模板

响应式太阳能能源公司网站模板

响应式咖啡饮品宣传网站模板

canvas炫酷鼠标移动文字粒子特效

中小型创意设计服务公司网站模板

点击广告,支持我们为你提供更好的服务
 工具推荐 更多»
点击广告,支持我们为你提供更好的服务