This post walks through what ShardingContext does, from a source-code angle. Hopefully you'll get something out of it.

So what does ShardingContext actually do? Two main things:
Data source sharding metadata
Built by taking each data source's connection, reading its URL, and parsing the URL parameters into data source sharding metadata. It is mainly used later by SQL routing for DCL statements (e.g. granting privileges, creating users).
Table sharding metadata
Built by loading the metadata of the actual tables behind each data node. It is mainly used later when SQL parse results are filled in.
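To make the first part concrete: a MySQL JDBC URL already carries the host, port, and schema that DCL routing needs. Here is a minimal sketch of that kind of URL parsing (UrlMetaDataSketch is a hypothetical helper for illustration, not ShardingSphere's actual parser):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: extract host/port/schema from a MySQL JDBC URL,
// the same kind of information ShardingDataSourceMetaData keeps per data source.
public final class UrlMetaDataSketch {

    private static final Pattern MYSQL_URL = Pattern.compile("jdbc:mysql://([^:/?]+)(?::(\\d+))?/([^?]+)");

    public static String[] parse(final String url) {
        Matcher matcher = MYSQL_URL.matcher(url);
        if (!matcher.find()) {
            throw new IllegalArgumentException("unrecognized url: " + url);
        }
        String host = matcher.group(1);
        // Fall back to MySQL's default port when the URL omits it
        String port = null == matcher.group(2) ? "3306" : matcher.group(2);
        String schema = matcher.group(3);
        return new String[]{host, port, schema};
    }

    public static void main(final String[] args) {
        String[] meta = parse("jdbc:mysql://127.0.0.1:3306/ds_0?useSSL=false");
        assert "127.0.0.1".equals(meta[0]);
        assert "3306".equals(meta[1]);
        assert "ds_0".equals(meta[2]);
        System.out.println(String.join("/", meta));
    }
}
```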
Source code analysis
1. ShardingContext construction, focusing on ShardingTableMetaData
public ShardingContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException {
    this.shardingRule = shardingRule;
    // Cache the raw database metadata of the data sources
    this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
    // Data source type
    this.databaseType = databaseType;
    // Sharding configuration properties,
    // e.g. SQL logging, thread pool sizing
    shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    // Thread pool size for Statement/PreparedStatement execution.
    // Each sharding data source uses its own pool; pools are not shared
    // across data sources, even within the same JVM.
    // Unlimited by default.
    int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
    // Execute engine
    executeEngine = new ShardingExecuteEngine(executorSize);
    // Data source sharding metadata.
    // Taking MySQL as an example: open a connection, read the MySQL url,
    // and wrap the parsed url parameters into ShardingDataSourceMetaData
    ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType);
    // Table sharding metadata.
    // Taking MySQL as an example: open a connection and load the table
    // metadata (columns, column types, indexes)
    ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule));
    // Wrap the data source sharding metadata and table sharding metadata together
    metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData);
    // Parse result cache
    parsingResultCache = new ParsingResultCache();
}

private TableMetaDataInitializer getTableMetaDataInitializer(final Map<String, DataSource> dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) {
    return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap),
            shardingProperties.getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY),
            shardingProperties.getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED));
}
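About the executorSize comment above: the constructor notes the pool is "unlimited by default". One common way to implement that convention is to fall back to an unbounded cached pool when the configured size is zero or negative. A sketch with a plain ExecutorService (buildExecutor is a hypothetical helper; the real ShardingExecuteEngine may size its pool differently):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the "0 means unlimited" sizing convention, not ShardingSphere API.
public final class ExecutorSizing {

    public static ExecutorService buildExecutor(final int executorSize) {
        // size <= 0: an unbounded cached pool; otherwise a fixed pool of that size
        return executorSize <= 0 ? Executors.newCachedThreadPool() : Executors.newFixedThreadPool(executorSize);
    }

    public static void main(final String[] args) throws Exception {
        ExecutorService pool = buildExecutor(0);
        int answer = pool.submit(() -> 21 * 2).get();
        assert answer == 42;
        pool.shutdown();
        System.out.println(answer);
    }
}
```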
2. Loading: TableMetaDataInitializer#load
public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine,
final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {
//Data source sharding metadata
this.shardingDataSourceMetaData = shardingDataSourceMetaData;
//Data source connection manager
this.connectionManager = connectionManager;
//Table metadata loader
tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData);
}
/**
* Load table meta data.
*
* @param logicTableName logic table name
* @param shardingRule sharding rule
* @return table meta data
*/
@SneakyThrows
public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) {
return tableMetaDataLoader.load(logicTableName, shardingRule);
}
/**
* Load all table meta data.
*
* @param shardingRule sharding rule
* @return all table meta data
*/
@SneakyThrows
public Map<String, TableMetaData> load(final ShardingRule shardingRule) {
Map<String, TableMetaData> result = new HashMap<>();
//Load sharding tables
result.putAll(loadShardingTables(shardingRule));
//Load non-sharding (default data source) tables
result.putAll(loadDefaultTables(shardingRule));
return result;
}
private Map<String, TableMetaData> loadShardingTables(final ShardingRule shardingRule) throws SQLException {
Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1);
for (TableRule each : shardingRule.getTableRules()) {
//Load the metadata of the actual tables behind each logic table
//key: logic table, value: table metadata
result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule));
}
return result;
}
private Map<String, TableMetaData> loadDefaultTables(final ShardingRule shardingRule) throws SQLException {
Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1);
//Look up the default data source; fall back to the master data source if none is configured
Optional<String> actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName();
if (actualDefaultDataSourceName.isPresent()) {
//Load metadata for every table in it
//key: actual table, value: table metadata
for (String each : getAllTableNames(actualDefaultDataSourceName.get())) {
result.put(each, tableMetaDataLoader.load(each, shardingRule));
}
}
return result;
}
private Collection<String> getAllTableNames(final String dataSourceName) throws SQLException {
Collection<String> result = new LinkedHashSet<>();
DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
try (Connection connection = connectionManager.getConnection(dataSourceName);
ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (!tableName.contains("$") && !tableName.contains("/")) {
result.add(tableName);
}
}
}
return result;
}
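The loop in getAllTableNames above skips names containing `$` or `/`, which typically denote internal or synthetic database objects (e.g. Oracle recycle-bin tables), and the LinkedHashSet keeps JDBC's ordering while de-duplicating. The filtering logic can be tried standalone:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Standalone sketch of the name filter used by getAllTableNames:
// names containing '$' or '/' are skipped, order is preserved, duplicates dropped.
public final class TableNameFilter {

    public static Collection<String> filter(final List<String> rawNames) {
        Collection<String> result = new LinkedHashSet<>();
        for (String each : rawNames) {
            if (!each.contains("$") && !each.contains("/")) {
                result.add(each);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        Collection<String> names = filter(Arrays.asList("t_order_0", "BIN$abc", "t_order_0", "t_order_1"));
        assert names.size() == 2;
        System.out.println(names);
    }
}
```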
private String getCurrentSchemaName(final Connection connection) throws SQLException {
try {
return connection.getSchema();
} catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) {
return null;
}
}
3. Loading table metadata: TableMetaDataLoader#load
/**
* Load table meta data.
*
* @param logicTableName logic table name
* @param shardingRule sharding rule
* @return table meta data
* @throws SQLException SQL exception
*/
public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException {
//Load the metadata of the actual tables
List<TableMetaData> actualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames());
//Check that every loaded actual table has the same structure
checkUniformed(logicTableName, actualTableMetaDataList);
return actualTableMetaDataList.iterator().next();
}
private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
//Submit the wrapped data node groups to the execute engine
return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {
@Override
public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
String dataSourceName = dataNodes.iterator().next().getDataSourceName();
DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes);
}
});
}
private Collection<TableMetaData> load(final String dataSourceName, final String catalog, final Collection<DataNode> dataNodes) throws SQLException {
Collection<TableMetaData> result = new LinkedList<>();
try (Connection connection = connectionManager.getConnection(dataSourceName)) {
for (DataNode each : dataNodes) {
//Load each actual table's metadata
result.add(createTableMetaData(connection, catalog, each.getTableName()));
}
}
return result;
}
private Map<String, List<DataNode>> getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) {
//Get the data source -> actual data nodes mapping of the logic table, e.g.:
//ds_0 -> [ds_0.t_order_0, ds_0.t_order_1]
//ds_1 -> [ds_1.t_order_0, ds_1.t_order_1]
Map<String, List<DataNode>> result = shardingRule.getTableRule(logicTableName).getDataNodeGroups();
//Defaults to false; when set to true, every data node's actual table is processed
if (isCheckingMetaData) {
return result;
}
//Otherwise returning a single data node is enough
String firstKey = result.keySet().iterator().next();
return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0)));
}
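So with isCheckingMetaData at its default of false, only one actual table is inspected per logic table. The shortcut can be sketched with plain collections (String stands in for DataNode here):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of getDataNodeGroups' shortcut: when metadata checking is off,
// keep only the first data node of the first data source.
public final class DataNodeGroupShortcut {

    public static Map<String, List<String>> pick(final Map<String, List<String>> groups, final boolean checkingMetaData) {
        if (checkingMetaData) {
            return groups;
        }
        String firstKey = groups.keySet().iterator().next();
        return Collections.singletonMap(firstKey, Collections.singletonList(groups.get(firstKey).get(0)));
    }

    public static void main(final String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("ds_0", Arrays.asList("t_order_0", "t_order_1"));
        groups.put("ds_1", Arrays.asList("t_order_0", "t_order_1"));
        Map<String, List<String>> picked = pick(groups, false);
        assert picked.size() == 1;
        assert picked.get("ds_0").size() == 1;
        System.out.println(picked);
    }
}
```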
/**
* Wrap data node groups into sharding execute groups.
*
* @param dataNodeGroups data node groups, e.g.
*
* ds_0 -> [ds_0.t_order_0, ds_0.t_order_1]
*
* @return sharding execute groups
*/
private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) {
Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
//Iterate over the data nodes of each data source
for (Entry<String, List<DataNode>> entry : dataNodeGroups.entrySet()) {
//Wrap them into ShardingExecuteGroup instances
result.addAll(getDataNodeGroups(entry.getValue()));
}
return result;
}
private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final List<DataNode> dataNodes) {
Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
//maxConnectionsSizePerQuery defaults to 1
//Partition dataNodes into consecutive chunks of size max(dataNodes.size() / maxConnectionsSizePerQuery, 1)
for (List<DataNode> each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) {
result.add(new ShardingExecuteGroup<>(each));
}
return result;
}
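Guava's Lists.partition splits a list into consecutive chunks of the given size, so with the default maxConnectionsSizePerQuery of 1 each data source ends up with a single group, i.e. one connection loads all of its tables. A stdlib-only equivalent of the partition math:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stdlib-only equivalent of Lists.partition(dataNodes, Math.max(size / maxConnectionsSizePerQuery, 1)).
public final class NodePartition {

    public static <T> List<List<T>> partition(final List<T> nodes, final int maxConnectionsSizePerQuery) {
        int chunkSize = Math.max(nodes.size() / maxConnectionsSizePerQuery, 1);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < nodes.size(); i += chunkSize) {
            result.add(nodes.subList(i, Math.min(i + chunkSize, nodes.size())));
        }
        return result;
    }

    public static void main(final String[] args) {
        // Default (1 connection per query): one group holding every node of the data source
        assert partition(Arrays.asList("t_0", "t_1", "t_2", "t_3"), 1).size() == 1;
        // With 2 connections per query: chunks of size 2, i.e. two groups
        assert partition(Arrays.asList("t_0", "t_1", "t_2", "t_3"), 2).size() == 2;
        System.out.println(partition(Arrays.asList("t_0", "t_1", "t_2", "t_3"), 2));
    }
}
```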
private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
//Check whether the table exists
if (isTableExist(connection, catalog, actualTableName)) {
//Wrap the table metadata
return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName));
}
return new TableMetaData(Collections.emptyList(), Collections.emptySet());
}
private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) {
return resultSet.next();
}
}
/**
* Load the column metadata of a table.
*
* @param connection connection
* @param catalog catalog (schema)
* @param actualTableName actual table name
* @return column metadata list
* @throws SQLException SQL exception
*/
private List<ColumnMetaData> getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
List<ColumnMetaData> result = new LinkedList<>();
Collection<String> primaryKeys = getPrimaryKeys(connection, catalog, actualTableName);
try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) {
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
String columnType = resultSet.getString("TYPE_NAME");
result.add(new ColumnMetaData(columnName, columnType, primaryKeys.contains(columnName)));
}
}
return result;
}
/**
* Load the primary key columns of a table.
*/
private Collection<String> getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
Collection<String> result = new HashSet<>();
try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) {
while (resultSet.next()) {
result.add(resultSet.getString("COLUMN_NAME"));
}
}
return result;
}
/**
* Load the logic indexes of a table.
*/
private Collection<String> getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
Collection<String> result = new HashSet<>();
try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) {
while (resultSet.next()) {
Optional<String> logicIndex = getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName);
if (logicIndex.isPresent()) {
result.add(logicIndex.get());
}
}
}
return result;
}
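getLogicIndex below recovers a logic index name by stripping the `_actualTableName` suffix. A standalone version of that string logic (using plain null where the source uses Guava's Optional.absent()):

```java
// Standalone version of getLogicIndex: actual index names are expected to
// carry a "_" + actualTableName suffix; stripping it yields the logic index name.
public final class LogicIndexName {

    public static String logicIndex(final String actualIndexName, final String actualTableName) {
        String suffix = "_" + actualTableName;
        // null stands in for Guava's Optional.absent()
        return actualIndexName.contains(suffix) ? actualIndexName.replace(suffix, "") : null;
    }

    public static void main(final String[] args) {
        assert "order_id_index".equals(logicIndex("order_id_index_t_order_0", "t_order_0"));
        assert null == logicIndex("pk_index", "t_order_0");
        System.out.println(logicIndex("order_id_index_t_order_0", "t_order_0"));
    }
}
```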
private Optional<String> getLogicIndex(final String actualIndexName, final String actualTableName) {
//Index names are expected to end with `_actualTableName`, e.g.:
//idx_t_order
String indexNameSuffix = "_" + actualTableName;
if (actualIndexName.contains(indexNameSuffix)) {
return Optional.of(actualIndexName.replace(indexNameSuffix, ""));
}
return Optional.absent();
}
4. Execution: ShardingExecuteEngine#groupExecute
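The flow below boils down to: execute the first group synchronously on the calling ("trunk") thread, submit the remaining groups to the pool, then merge both sets of results. A reduced sketch of that pattern with a plain ExecutorService (FirstGroupSync is illustrative, not ShardingSphere API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Reduced sketch of parallelExecute: the calling thread handles the first
// group itself while the rest run on the pool, then the results are merged.
public final class FirstGroupSync {

    public static <I, O> List<O> execute(final List<I> groups, final Function<I, O> callback) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Iterator<I> iterator = groups.iterator();
            I first = iterator.next();
            // Submit every group but the first asynchronously
            List<Future<O>> futures = new ArrayList<>();
            while (iterator.hasNext()) {
                I each = iterator.next();
                futures.add(pool.submit(() -> callback.apply(each)));
            }
            // The first group executes on the current thread
            List<O> result = new ArrayList<>();
            result.add(callback.apply(first));
            // Merge the asynchronous results
            for (Future<O> each : futures) {
                result.add(each.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) throws Exception {
        List<Integer> result = execute(List.of(1, 2, 3), n -> n * 10);
        assert result.equals(List.of(10, 20, 30));
        System.out.println(result);
    }
}
```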
/**
 * Execute for group.
 *
 * @param inputGroups input groups
 * @param callback sharding execute callback
 * @param <I> type of input value
 * @param <O> type of return value
 * @return execute result
 * @throws SQLException throw if execute failure
 */
public <I, O> List<O> groupExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    return groupExecute(inputGroups, null, callback, false);
}

/**
 * Execute for group.
 *
 * @param inputGroups input groups
 * @param firstCallback first sharding execute callback
 * @param callback sharding execute callback
 * @param serial whether using multi thread execute or not
 * @param <I> type of input value
 * @param <O> type of return value
 * @return execute result
 * @throws SQLException throw if execute failure
 */
public <I, O> List<O> groupExecute(
        final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial) throws SQLException {
    if (inputGroups.isEmpty()) {
        return Collections.emptyList();
    }
    // serial: execute sequentially on the calling thread
    // parallel: execute concurrently on the thread pool
    return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}

private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
    ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
    // Execute the first group on its own,
    // using firstCallback when present, otherwise callback
    List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
    // Execute the remaining groups one by one
    for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
        result.addAll(syncGroupExecute(each, callback));
    }
    return result;
}

private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
    // Take the first group
    ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
    // Submit the remaining groups to the thread pool
    Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
    // Execute the first group here, then merge the synchronous and asynchronous results
    return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

/**
 * Asynchronous execution.
 */
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
    Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
    for (ShardingExecuteGroup<I> each : inputGroups) {
        result.add(asyncGroupExecute(each, callback));
    }
    return result;
}

private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
    final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
    // Submit to the thread pool
    return executorService.submit(new Callable<Collection<O>>() {
        @Override
        public Collection<O> call() throws SQLException {
            return callback.execute(inputGroup.getInputs(), false, dataMap);
        }
    });
}

/**
 * Synchronous execution.
 */
private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
    List<O> result = new LinkedList<>(firstResults);
    for (ListenableFuture<Collection<O>> each : restFutures) {
        try {
            result.addAll(each.get());
        } catch (final InterruptedException | ExecutionException ex) {
            return throwException(ex);
        }
    }
    return result;
}
That's the rundown of what ShardingContext does. If you've been wondering about the same thing, hopefully the analysis above helps you make sense of it.