Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EXPB-2299 Speed up table lookup for huge schemas #4

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
set -e
VERSION=1.37.0

# Java 21 doesn't support Java 8
if [ -d /Library/Java/JavaVirtualMachines/sapmachine-jdk-17.0.11.jdk/Contents/Home ]; then
export JAVA_HOME=/Library/Java/JavaVirtualMachines/sapmachine-jdk-17.0.11.jdk/Contents/Home
export PATH=$JAVA_HOME/bin:$PATH
fi

./gradlew clean publishToMavenLocal

for module in core linq4j;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.schema.LikePattern;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
Expand Down Expand Up @@ -324,7 +325,7 @@ private void addMaterializedViews() {
query = buf.toString();

// Add the view for this query
String viewName = "$" + getTableNames().size();
String viewName = "$" + tables().getNames(LikePattern.any()).size();
SchemaPlus schema = parentSchema.getSubSchema(name);
if (schema == null) {
throw new IllegalStateException("Cannot find schema " + name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.LikePattern;
import org.apache.calcite.schema.Lookup;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
Expand Down Expand Up @@ -68,8 +70,9 @@ public CloneSchema(SchemaPlus sourceSchema) {

@Override protected Map<String, Table> getTableMap() {
final Map<String, Table> map = new LinkedHashMap<>();
for (String name : sourceSchema.getTableNames()) {
final Table table = sourceSchema.getTable(name);
final Lookup<Table> tables = sourceSchema.tables();
for (String name : tables.getNames(LikePattern.any())) {
final Table table = tables.get(name);
if (table instanceof QueryableTable) {
final QueryableTable sourceTable = (QueryableTable) table;
map.put(name,
Expand Down
212 changes: 116 additions & 96 deletions core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.calcite.adapter.jdbc;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import com.google.common.util.concurrent.UncheckedExecutionException;

import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.SqlType;
Expand All @@ -26,14 +32,9 @@
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.schema.*;
import org.apache.calcite.schema.impl.CachingLookup;
import org.apache.calcite.schema.impl.IgnoreCaseLookup;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
Expand Down Expand Up @@ -63,8 +64,17 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.sql.DataSource;

import static java.util.Objects.requireNonNull;
Expand All @@ -78,14 +88,29 @@
*/
public class JdbcSchema implements Schema, Wrapper {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSchema.class);
/**
 * Unchecked exception type declared by {@code JdbcSchema}.
 *
 * <p>NOTE(review): no code in the visible part of this file throws or catches
 * this type. It is declared as a non-static inner class, so every instance is
 * tied to an enclosing {@code JdbcSchema}; confirm whether it is still needed
 * and whether it should be a static nested class instead.
 */
public class TableNotFoundException extends RuntimeException {

}
final DataSource dataSource;
final @Nullable String catalog;
final @Nullable String schema;
public final SqlDialect dialect;
final JdbcConvention convention;
private @Nullable ImmutableMap<String, JdbcTable> tableMap;
private final boolean snapshot;
/**
 * Lookup of this schema's tables. Entries are resolved through JDBC metadata
 * queries (see {@code getMetaTableStream}) and the lookup is wrapped in a
 * {@link CachingLookup}.
 */
private final Lookup<Table> tables = new CachingLookup<Table>(new IgnoreCaseLookup<Table>() {
  /**
   * Returns the table called {@code name}, or {@code null} if the metadata
   * query yields no rows.
   *
   * <p>The name is forwarded to the driver as a table-name <em>pattern</em>,
   * where {@code _} and {@code %} act as wildcards, so the query may return
   * several tables (asking for {@code A_B} can also yield {@code AXB}).
   * An exact name match is therefore preferred; only when none exists is the
   * first row used, which keeps the previous behaviour for drivers that
   * report names in a different case.
   */
  @Override
  public @Nullable Table get(String name) {
    try (Stream<MetaImpl.MetaTable> s = getMetaTableStream(name)) {
      final List<MetaImpl.MetaTable> candidates = s.collect(Collectors.toList());
      for (MetaImpl.MetaTable candidate : candidates) {
        if (name.equals(candidate.tableName)) {
          return jdbcTableMapper(candidate);
        }
      }
      // No exact match: fall back to the first row, as before.
      return candidates.isEmpty() ? null : jdbcTableMapper(candidates.get(0));
    }
  }

  /**
   * Returns the names of all tables whose name matches {@code pattern};
   * the pattern string is passed to the metadata query unchanged.
   */
  @Override
  public Set<String> getNames(LikePattern pattern) {
    try (Stream<MetaImpl.MetaTable> s = getMetaTableStream(pattern.pattern)) {
      return s.map(it -> it.tableName).collect(Collectors.toSet());
    }
  }
});

@Experimental
public static final ThreadLocal<@Nullable Foo> THREAD_METADATA = new ThreadLocal<>();
Expand All @@ -104,19 +129,11 @@ public class JdbcSchema implements Schema, Wrapper {
*/
public JdbcSchema(DataSource dataSource, SqlDialect dialect,
JdbcConvention convention, @Nullable String catalog, @Nullable String schema) {
this(dataSource, dialect, convention, catalog, schema, null);
}

private JdbcSchema(DataSource dataSource, SqlDialect dialect,
JdbcConvention convention, @Nullable String catalog, @Nullable String schema,
@Nullable ImmutableMap<String, JdbcTable> tableMap) {
this.dataSource = requireNonNull(dataSource, "dataSource");
this.dialect = requireNonNull(dialect, "dialect");
this.convention = convention;
this.catalog = catalog;
this.schema = schema;
this.tableMap = tableMap;
this.snapshot = tableMap != null;
}

public static JdbcSchema create(
Expand Down Expand Up @@ -217,13 +234,16 @@ public static DataSource dataSource(String url, @Nullable String driverClassName
password);
}

/** Returns the lookup object held in the {@code tables} field of this schema. */
@Override public Lookup<Table> tables() {
return tables;
}

/** Reports this schema as not mutable; always returns {@code false}. */
@Override public boolean isMutable() {
return false;
}

@Override public Schema snapshot(SchemaVersion version) {
return new JdbcSchema(dataSource, dialect, convention, catalog, schema,
tableMap);
return this;
}

// Used by generated code.
Expand All @@ -249,49 +269,56 @@ protected Multimap<String, Function> getFunctions() {
return getFunctions().keySet();
}

private ImmutableMap<String, JdbcTable> computeTables() {
private Stream<MetaImpl.MetaTable> getMetaTableStream(String tableNamePattern) {
final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema();
final Stream<MetaImpl.MetaTable> tableDefs;
Connection connection = null;
ResultSet resultSet = null;
try {
connection = dataSource.getConnection();
final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
final String catalog = catalogSchema.left;
final String schema = catalogSchema.right;
final Iterable<MetaImpl.MetaTable> tableDefs;
Foo threadMetadata = THREAD_METADATA.get();
if (threadMetadata != null) {
tableDefs = threadMetadata.apply(catalog, schema);
} else {
final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalog, schema, null, null);
while (resultSet.next()) {
final String catalogName = intern(resultSet.getString(1));
final String schemaName = intern(resultSet.getString(2));
final String tableName = intern(resultSet.getString(3));
final String tableTypeName = intern(resultSet.getString(4));
tableDefList.add(
new MetaImpl.MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
tableDefs = tableDefList;
}

final ImmutableMap.Builder<String, JdbcTable> builder =
ImmutableMap.builder();
for (MetaImpl.MetaTable tableDef : tableDefs) {
final JdbcTable table =
new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
tableDef.tableName, getTableType(tableDef.tableType));
builder.put(tableDef.tableName, table);
}
return builder.build();
final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalogSchema.left, catalogSchema.right, tableNamePattern, null);
tableDefs = asStream(connection, resultSet)
.map(JdbcSchema::metaDataMapper);
} catch (SQLException e) {
close(connection, null, resultSet);
throw new RuntimeException(
"Exception while reading tables", e);
} finally {
close(connection, null, resultSet);
}
return tableDefs;
}

/**
 * Exposes a JDBC result set as a sequential, ordered stream.
 *
 * <p>Each element handed to the consumer is the same {@code resultSet}
 * instance, positioned on the next row. Closing the returned stream calls
 * {@code close(connection, null, resultSet)}, so callers must close it
 * (for example with try-with-resources).
 *
 * @param connection connection to release when the stream is closed
 * @param resultSet result set to iterate over
 * @return a stream that advances {@code resultSet} one row per element
 * @throws RuntimeException wrapping any {@link SQLException} raised while advancing
 */
private static Stream<ResultSet> asStream(Connection connection, ResultSet resultSet) {
  final Spliterator<ResultSet> rows =
      new Spliterators.AbstractSpliterator<ResultSet>(Long.MAX_VALUE, Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super ResultSet> action) {
          try {
            final boolean hasRow = resultSet.next();
            if (hasRow) {
              action.accept(resultSet);
            }
            return hasRow;
          } catch (SQLException ex) {
            throw new RuntimeException(ex);
          }
        }
      };
  final Stream<ResultSet> stream = StreamSupport.stream(rows, false);
  return stream.onClose(() -> close(connection, null, resultSet));
}

/**
 * Builds the {@code JdbcTable} of this schema that corresponds to a
 * metadata row.
 *
 * @param tableDef catalog, schema, name and type of the table as reported
 *     by the metadata query
 * @return a new table object bound to this schema
 */
private JdbcTable jdbcTableMapper(MetaImpl.MetaTable tableDef) {
  final String catalogName = tableDef.tableCat;
  final String schemaName = tableDef.tableSchem;
  final String tableName = tableDef.tableName;
  return new JdbcTable(this, catalogName, schemaName, tableName,
      getTableType(tableDef.tableType));
}

/**
 * Converts the current row of a table-metadata result set into a
 * {@code MetaImpl.MetaTable}.
 *
 * <p>Columns 1 to 4 are read in order, passed through {@code intern}, and
 * handed to the {@code MetaTable} constructor as catalog, schema, table name
 * and table type.
 *
 * @param resultSet result set positioned on the row to convert
 * @return the metadata record for that row
 * @throws RuntimeException wrapping any {@link SQLException} raised while reading
 */
private static MetaImpl.MetaTable metaDataMapper(ResultSet resultSet) {
  try {
    final String catalogName = intern(resultSet.getString(1));
    final String schemaName = intern(resultSet.getString(2));
    final String tableName = intern(resultSet.getString(3));
    final String tableTypeName = intern(resultSet.getString(4));
    return new MetaImpl.MetaTable(catalogName, schemaName, tableName, tableTypeName);
  } catch (SQLException e) {
    throw new RuntimeException(e);
  }
}

private static String intern(@Nullable String string) {
Expand Down Expand Up @@ -329,49 +356,44 @@ private static List<Integer> version(DatabaseMetaData metaData) throws SQLExcept
}

/** Returns a pair of (catalog, schema) for the current connection. */
private Pair<@Nullable String, @Nullable String> getCatalogSchema(Connection connection)
throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
final List<Integer> version41 = ImmutableList.of(4, 1); // JDBC 4.1
String catalog = this.catalog;
String schema = this.schema;
final boolean jdbc41OrAbove =
VERSION_ORDERING.compare(version(metaData), version41) >= 0;
if (catalog == null && jdbc41OrAbove) {
// From JDBC 4.1, catalog and schema can be retrieved from the connection
// object, hence try to get it from there if it was not specified by user
catalog = connection.getCatalog();
}
if (schema == null && jdbc41OrAbove) {
schema = connection.getSchema();
if ("".equals(schema)) {
schema = null; // PostgreSQL returns useless "" sometimes
private Pair<@Nullable String, @Nullable String> getCatalogSchema() {
try(Connection connection = dataSource.getConnection()) {
final DatabaseMetaData metaData = connection.getMetaData();
final List<Integer> version41 = ImmutableList.of(4, 1); // JDBC 4.1
String catalog = this.catalog;
String schema = this.schema;
final boolean jdbc41OrAbove =
VERSION_ORDERING.compare(version(metaData), version41) >= 0;
if (catalog == null && jdbc41OrAbove) {
// From JDBC 4.1, catalog and schema can be retrieved from the connection
// object, hence try to get it from there if it was not specified by user
catalog = connection.getCatalog();
}
}
if ((catalog == null || schema == null)
&& metaData.getDatabaseProductName().equals("PostgreSQL")) {
final String sql = "select current_database(), current_schema()";
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet.next()) {
catalog = resultSet.getString(1);
schema = resultSet.getString(2);
if (schema == null && jdbc41OrAbove) {
schema = connection.getSchema();
if ("".equals(schema)) {
schema = null; // PostgreSQL returns useless "" sometimes
}
}
if ((catalog == null || schema == null)
&& metaData.getDatabaseProductName().equals("PostgreSQL")) {
final String sql = "select current_database(), current_schema()";
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet.next()) {
catalog = resultSet.getString(1);
schema = resultSet.getString(2);
}
}
}
return Pair.of(catalog, schema);
} catch (SQLException e) {
throw new RuntimeException(e);
}
return Pair.of(catalog, schema);
}

@Override public @Nullable Table getTable(String name) {
return getTableMap(false).get(name);
}

private synchronized ImmutableMap<String, JdbcTable> getTableMap(
boolean force) {
if (force || tableMap == null) {
tableMap = computeTables();
}
return tableMap;
@Deprecated @Override public @Nullable Table getTable(String name) {
return tables.get(name);
}

RelProtoDataType getRelDataType(String catalogName, String schemaName,
Expand Down Expand Up @@ -494,10 +516,8 @@ private static RelDataType parseTypeString(RelDataTypeFactory typeFactory,
}
}

@Override public Set<String> getTableNames() {
// This method is called during a cache refresh. We can take it as a signal
// that we need to re-build our own cache.
return getTableMap(!snapshot).keySet();
@Deprecated @Override public Set<String> getTableNames() {
return tables.getNames(LikePattern.any());
}

protected Map<String, RelProtoDataType> getTypes() {
Expand Down
Loading
Loading