Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public final class DataNode {
public DataNode(final String dataNode) {
validateDataNodeFormat(dataNode);
List<String> segments = Splitter.on(DELIMITER).splitToList(dataNode);
boolean isIncludeInstance = 3 == segments.size();
dataSourceName = isIncludeInstance ? segments.get(0) + DELIMITER + segments.get(1) : segments.get(0);
schemaName = null;
tableName = segments.get(isIncludeInstance ? 2 : 1);
boolean isIncludeSchema = 3 == segments.size();
dataSourceName = segments.get(0);
schemaName = isIncludeSchema ? segments.get(1) : null;
tableName = segments.get(isIncludeSchema ? 2 : 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,24 @@ public static ShardingSphereDatabase create(final String name, final DatabaseTyp
Collection<ShardingSphereRule> rules = DatabaseRulesBuilder.build(name, protocolType, databaseConfig, computeNodeInstanceContext, resourceMetaData);
return new ShardingSphereDatabase(name, protocolType, resourceMetaData, new RuleMetaData(rules), schemas);
}

/**
* Create database without system schema.
*
* @param name database name
* @param protocolType database protocol type
* @param databaseConfig database configuration
* @param props configuration properties
* @param computeNodeInstanceContext compute node instance context
* @return created database
* @throws SQLException SQL exception
*/
public static ShardingSphereDatabase createWithoutSystemSchema(final String name, final DatabaseType protocolType, final DatabaseConfiguration databaseConfig,
final ConfigurationProperties props, final ComputeNodeInstanceContext computeNodeInstanceContext) throws SQLException {
ResourceMetaData resourceMetaData = new ResourceMetaData(databaseConfig.getDataSources(), databaseConfig.getStorageUnits());
Collection<ShardingSphereRule> databaseRules = DatabaseRulesBuilder.build(name, protocolType, databaseConfig, computeNodeInstanceContext, resourceMetaData);
Map<String, ShardingSphereSchema> schemas = new ConcurrentHashMap<>(GenericSchemaBuilder.build(protocolType,
new GenericSchemaBuilderMaterial(resourceMetaData.getStorageUnits(), databaseRules, props, new DatabaseTypeRegistry(protocolType).getDefaultSchemaName(name))));
return new ShardingSphereDatabase(name, protocolType, resourceMetaData, new RuleMetaData(databaseRules), schemas.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void assertHashCode() {
void assertToString() {
assertThat(new DataNode("ds_0.tbl_0").toString(), is("DataNode(dataSourceName=ds_0, schemaName=null, tableName=tbl_0)"));
assertThat(new DataNode("ds", "schema", "tbl").toString(), is("DataNode(dataSourceName=ds, schemaName=schema, tableName=tbl)"));
assertThat(new DataNode("ds_0.db_0.tbl_0").toString(), is("DataNode(dataSourceName=ds_0.db_0, schemaName=null, tableName=tbl_0)"));
assertThat(new DataNode("ds_0.schema_0.tbl_0").toString(), is("DataNode(dataSourceName=ds_0, schemaName=schema_0, tableName=tbl_0)"));
}

@Test
Expand All @@ -110,9 +110,10 @@ void assertEmptyTableDataNode() {

@Test
void assertNewValidDataNodeIncludeInstance() {
DataNode dataNode = new DataNode("ds_0.db_0.tbl_0");
assertThat(dataNode.getDataSourceName(), is("ds_0.db_0"));
DataNode dataNode = new DataNode("ds_0.schema_0.tbl_0");
assertThat(dataNode.getDataSourceName(), is("ds_0"));
assertThat(dataNode.getTableName(), is("tbl_0"));
assertThat(dataNode.getSchemaName(), is("schema_0"));
}

@Test
Expand Down Expand Up @@ -174,7 +175,7 @@ void assertNewDataNodeWithNumbers() {
@Test
void assertNewDataNodeWithMixedFormat() {
DataNode dataNode = new DataNode("prod-db-01.schema_01.users");
assertThat(dataNode.getDataSourceName(), is("prod-db-01.schema_01"));
assertThat(dataNode.getDataSourceName(), is("prod-db-01"));
assertThat(dataNode.getTableName(), is("users"));
}

Expand All @@ -197,14 +198,14 @@ void assertNewDataNodeWithSingleCharacterNames() {
@Test
void assertNewDataNodeWithInstanceFormat() {
DataNode dataNode = new DataNode("instance1.database1.table1");
assertThat(dataNode.getDataSourceName(), is("instance1.database1"));
assertThat(dataNode.getDataSourceName(), is("instance1"));
assertThat(dataNode.getTableName(), is("table1"));
}

@Test
void assertNewDataNodeWithComplexInstanceFormat() {
DataNode dataNode = new DataNode("prod-cluster-01.mysql-master.users");
assertThat(dataNode.getDataSourceName(), is("prod-cluster-01.mysql-master"));
assertThat(dataNode.getDataSourceName(), is("prod-cluster-01"));
assertThat(dataNode.getTableName(), is("users"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@

package org.apache.shardingsphere.mode.metadata.factory.init.type;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.database.connector.core.metadata.database.system.SystemDatabase;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.config.database.impl.DataSourceGeneratedDatabaseConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.database.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.pool.config.DataSourceConfiguration;
import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabaseFactory;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabasesFactory;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilderParameter;
Expand All @@ -36,17 +42,22 @@
import org.apache.shardingsphere.mode.metadata.persist.config.global.PropertiesPersistService;
import org.apache.shardingsphere.mode.metadata.persist.version.VersionPersistService;
import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
import org.apache.shardingsphere.single.config.SingleRuleConfiguration;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Register center meta data contexts init factory.
*/
@Slf4j
public final class RegisterCenterMetaDataContextsInitFactory extends MetaDataContextsInitFactory {

private final MetaDataPersistFacade persistFacade;
Expand All @@ -68,7 +79,8 @@ public MetaDataContexts create(final ContextManagerBuilderParameter param) throw
// TODO load global data sources from persist service
Map<String, DataSource> globalDataSources = param.getGlobalDataSources();
ConfigurationProperties props = new ConfigurationProperties(persistFacade.getPropsService().load());
Map<String, Collection<ShardingSphereSchema>> schemas = loadSchemas(effectiveDatabaseConfigs.keySet());
DatabaseType protocolType = DatabaseTypeEngine.getProtocolType(effectiveDatabaseConfigs, props);
Map<String, Collection<ShardingSphereSchema>> schemas = loadSchemas(effectiveDatabaseConfigs, protocolType, props);
Collection<ShardingSphereDatabase> databases;
if (persistSchemasEnabled) {
// TODO merge schemas with local
Expand Down Expand Up @@ -109,14 +121,58 @@ private void closeGeneratedDataSources(final String databaseName, final Map<Stri
}
}

private Map<String, Collection<ShardingSphereSchema>> loadSchemas(final Collection<String> databaseNames) {
private Map<String, Collection<ShardingSphereSchema>> loadSchemas(final Map<String, DatabaseConfiguration> effectiveDatabaseConfigs,
final DatabaseType protocolType, final ConfigurationProperties props) {
Collection<String> sysDatabaseNames = new SystemDatabase(protocolType).getSystemDatabases();
Collection<String> databaseNames = effectiveDatabaseConfigs.keySet();
Map<String, Collection<ShardingSphereSchema>> result = new HashMap<>(databaseNames.size());
for (String dbName : databaseNames) {
for (Map.Entry<String, DatabaseConfiguration> entry : effectiveDatabaseConfigs.entrySet()) {
String dbName = entry.getKey();
Collection<ShardingSphereSchema> schemas = persistFacade.getDatabaseMetaDataFacade().getSchema().load(dbName);
if (null != schemas) {
result.put(dbName, schemas);
if (sysDatabaseNames.contains(dbName)) {
if (null != schemas) {
result.put(dbName, schemas);
}
} else {
Collection<String> missedSingleTables = getMissedSingleTables(entry.getValue(), schemas, dbName);
if (missedSingleTables.isEmpty()) {
result.put(dbName, schemas);
} else {
log.info("Repository missed single tables: {} of database: {}, start to reload", missedSingleTables, dbName);
DataSourceGeneratedDatabaseConfiguration databaseConfig = new DataSourceGeneratedDatabaseConfiguration(persistFacade.loadDataSourceConfigurations(dbName),
Collections.singleton(new SingleRuleConfiguration(missedSingleTables, null)));
try {
ShardingSphereDatabase database = ShardingSphereDatabaseFactory.createWithoutSystemSchema(dbName, protocolType, databaseConfig, props, instanceContext);
database.getAllSchemas().forEach(schema -> persistFacade.getDatabaseMetaDataFacade().getTable().persist(dbName, schema.getName(), schema.getAllTables()));
result.put(dbName, persistFacade.getDatabaseMetaDataFacade().getSchema().load(dbName));
} catch (final SQLException ex) {
result.put(dbName, schemas);
log.info("Reload reposotiry missed single tables: {} of database : {} failed", missedSingleTables, dbName, ex);
}
}
}
}
return result;
}

private Collection<String> getMissedSingleTables(final DatabaseConfiguration databaseConfiguration, final Collection<ShardingSphereSchema> schemas,
final String dbName) {
Collection<String> result = new LinkedList<>();
Optional<SingleRuleConfiguration> singleRuleConfig = databaseConfiguration.getRuleConfigurations().stream().filter(each -> each instanceof SingleRuleConfiguration)
.map(each -> (SingleRuleConfiguration) each).findAny();
singleRuleConfig.ifPresent(singleRuleConfiguration -> singleRuleConfiguration.getTables().forEach(table -> {
DataNode dataNode = new DataNode(table);
String logicTableName = new DataNode(table).getTableName();
String schemaName = null != dataNode.getSchemaName() ? dataNode.getSchemaName() : dbName;
Optional<ShardingSphereSchema> schema = findSchema(schemas, schemaName);
if (!schema.isPresent() || !schema.get().containsTable(logicTableName)) {
result.add(table);
}
}));
return result;
}

private Optional<ShardingSphereSchema> findSchema(final Collection<ShardingSphereSchema> schemas, final String schemaName) {
return schemas.stream().filter(each -> each.getName().equals(schemaName)).findFirst();
}
}