Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: Extract datasource proxy metadata #6722

Open
wants to merge 9 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.seata.rm.datasource;

import java.sql.Connection;
import java.util.Properties;

import javax.sql.DataSource;

Expand Down Expand Up @@ -61,6 +62,7 @@ public void testNotSupportDb() {
dataSource.setDriver(mockDriver);
dataSource.setUsername(username);
dataSource.setPassword("password");
dataSource.setConnectProperties(new Properties());

Throwable throwable = Assertions.assertThrows(IllegalStateException.class, () -> new DataSourceProxy(dataSource));
assertThat(throwable).hasMessageContaining("AT mode don't support the dbtype");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

import javax.sql.DataSource;

public class MockDataSource implements DataSource {
@Override
public Connection getConnection() throws SQLException {
return new MockConnection(new MockDriver(), "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true", null);
return new MockConnection(new MockDriver(), "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true", new Properties());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,22 @@
*/
package org.apache.seata.rm.datasource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.Constants;
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.Resource;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import org.apache.seata.rm.datasource.undo.UndoLogManager;
import org.apache.seata.rm.datasource.undo.UndoLogManagerFactory;
import org.apache.seata.rm.datasource.util.JdbcUtils;
import org.apache.seata.sqlparser.util.JdbcConstants;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_TABLE;

/**
* The type Data source proxy.
*
*/
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

Expand All @@ -64,18 +49,7 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource

private String userName;

private String kernelVersion;

private String productVersion;

private final Map<String, String> variables = new HashMap<>();

/**
* POLARDB-X 1.X -> TDDL
* POLARDB-X 2.X & MySQL 5.6 -> PXC
* POLARDB-X 2.X & MySQL 5.7 -> AliSQL-X
*/
private static final String[] POLARDB_X_PRODUCT_KEYWORD = {"TDDL","AliSQL-X","PXC"};
private SeataDataSourceProxyMetadata dataSourceProxyMetadata;

/**
* Instantiates a new Data source proxy.
Expand Down Expand Up @@ -103,21 +77,15 @@ public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {

private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
} else if (JdbcConstants.MYSQL.equals(dbType)) {
validMySQLVersion(connection);
checkDerivativeProduct();
}
checkUndoLogTableExist(connection);

try {
dataSourceProxyMetadata = SeataDataSourceProxyMetadataFactory.create(dataSource);
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
throw new IllegalStateException("can not init datasource metadata", e);
}
if (JdbcConstants.SQLSERVER.equals(dbType)) {
jdbcUrl = dataSourceProxyMetadata.getJdbcUrl();
dbType = dataSourceProxyMetadata.getDbType();
userName = dataSourceProxyMetadata.getUserName();
if (JdbcConstants.SQLSERVER.equals(dataSourceProxyMetadata.getDbType())) {
LOGGER.info("SQLServer support in AT mode is currently an experimental function, " +
"if you have any problems in use, please feedback to us");
}
Expand All @@ -128,59 +96,6 @@ private void init(DataSource dataSource, String resourceGroupId) {
RootContext.setDefaultBranchType(this.getBranchType());
}

/**
* Define derivative product version for MySQL Kernel
*
*/
private void checkDerivativeProduct() {
if (!JdbcConstants.MYSQL.equals(dbType)) {
return;
}
// check for polardb-x
if (isPolardbXProduct()) {
dbType = JdbcConstants.POLARDBX;
return;
}
// check for other products base on mysql kernel
}

private boolean isPolardbXProduct() {
if (StringUtils.isBlank(productVersion)) {
return false;
}
for (String keyword : POLARDB_X_PRODUCT_KEYWORD) {
if (productVersion.contains(keyword)) {
return true;
}
}
return false;
}

/**
* check existence of undolog table
*
* if the table not exist fast fail, or else keep silence
*
* @param conn db connection
*/
private void checkUndoLogTableExist(Connection conn) {
UndoLogManager undoLogManager;
try {
undoLogManager = UndoLogManagerFactory.getUndoLogManager(dbType);
} catch (EnhancedServiceNotFoundException e) {
String errMsg = String.format("AT mode don't support the dbtype: %s", dbType);
throw new IllegalStateException(errMsg, e);
}

boolean undoLogTableExist = undoLogManager.hasUndoLogTable(conn);
if (!undoLogTableExist) {
String undoLogTableName = ConfigurationFactory.getInstance()
.getConfig(ConfigurationKeys.TRANSACTION_UNDO_LOG_TABLE, DEFAULT_TRANSACTION_UNDO_LOG_TABLE);
String errMsg = String.format("in AT mode, %s table not exist", undoLogTableName);
throw new IllegalStateException(errMsg);
}
}

/**
* publish tableMeta refresh event
*/
Expand All @@ -207,6 +122,14 @@ public String getDbType() {
return dbType;
}

/**
* Get datasource proxy metadata
* @return seata datasource proxy metadata
*/
public SeataDataSourceProxyMetadata getDataSourceProxyMetadata() {
return dataSourceProxyMetadata;
}

@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
Expand Down Expand Up @@ -365,7 +288,6 @@ private void initPGResourceId() {
* The general form of the connection URL for SqlServer is
* jdbc:sqlserver://[serverName[\instanceName][:portNumber]][;property=value[;property=value]]
* required connection properties: [INSTANCENAME], [databaseName,database]
*
*/
private void initSqlServerResourceId() {
if (jdbcUrl.contains(";")) {
Expand Down Expand Up @@ -399,42 +321,4 @@ public BranchType getBranchType() {
return BranchType.AT;
}

public String getKernelVersion() {
return kernelVersion;
}

public String getVariableValue(String name) {
return variables.get(name);
}

private void validMySQLVersion(Connection connection) {
if (!JdbcConstants.MYSQL.equals(dbType)) {
return;
}
try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES");
ResultSet rs = preparedStatement.executeQuery()) {
while (rs.next()) {
String name = rs.getString(1);
String value = rs.getString(2);
if (StringUtils.isNotBlank(name)) {
variables.put(name.toLowerCase(), value);
}
}
String version = variables.get("version");
if (StringUtils.isBlank(version)) {
return;
}
int dashIdx = version.indexOf('-');
// in mysql: 5.6.45, in polardb-x: 5.6.45-TDDL-xxx
if (dashIdx > 0) {
kernelVersion = version.substring(0, dashIdx);
productVersion = version.substring(dashIdx + 1);
} else {
kernelVersion = version;
productVersion = version;
}
} catch (Exception e) {
LOGGER.error("check mysql version fail error: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.rm.datasource;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Map;

/**
* The type Data source proxy metadata.
*/
public interface SeataDataSourceProxyMetadata {

/**
* Init datasource metadata
*
* @param dataSource the datasource
* @throws SQLException sql exception
*/
void init(DataSource dataSource) throws SQLException;

/**
* Get variable value by name
*
* @param name the name
* @return value
*/
String getVariableValue(String name);

/**
* Get variables
*
* @return all variable
*/
Map<String, String> getVariables();

/**
* Get jdbc url
*
* @return jdbc url
*/
String getJdbcUrl();

/**
* Gets db type.
*
* @return the db type
*/
String getDbType();

/**
* Get database connection username
*
* @return username
*/
String getUserName();

/**
* Get kernel version
*
* @return kernel version
*/
String getKernelVersion();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.rm.datasource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.seata.rm.datasource.metadata.DefaultDataSourceProxyMetadata;
import org.apache.seata.rm.datasource.metadata.MySQLDataSourceProxyMetadata;
import org.apache.seata.rm.datasource.util.JdbcUtils;
import org.apache.seata.sqlparser.util.JdbcConstants;

/**
* datasource proxy metadata factory
*/
public class SeataDataSourceProxyMetadataFactory {

public static SeataDataSourceProxyMetadata create(DataSource dataSource) throws SQLException {
SeataDataSourceProxyMetadata dataSourceProxyMetadata = null;
try (Connection connection = dataSource.getConnection()) {
String jdbcUrl = connection.getMetaData().getURL();
String dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.MYSQL.equals(dbType)
|| JdbcConstants.MARIADB.equals(dbType)
|| JdbcConstants.POLARDBX.equals(dbType)) {
dataSourceProxyMetadata = new MySQLDataSourceProxyMetadata();
} else {
dataSourceProxyMetadata = new DefaultDataSourceProxyMetadata();
}
}
dataSourceProxyMetadata.init(dataSource);
return dataSourceProxyMetadata;
}

}
Loading
Loading