Skip to content

Commit

Permalink
[Managed Iceberg] support BQMS catalog (apache#33511)
Browse files Browse the repository at this point in the history
* Add BQMS catalog

* trigger integration tests

* build fix

* use shaded jar

* shadowClosure

* use global timeout for tests

* define version in BeamModulePlugin

* address comments
  • Loading branch information
ahmedabu98 committed Jan 9, 2025
1 parent 48e18c4 commit e9cba68
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ class BeamModulePlugin implements Plugin<Project> {
def influxdb_version = "2.19"
def httpclient_version = "4.5.13"
def httpcore_version = "4.4.14"
def iceberg_bqms_catalog_version = "1.5.2-0.1.0"
def jackson_version = "2.15.4"
def jaxb_api_version = "2.3.3"
def jsr305_version = "3.0.2"
Expand Down Expand Up @@ -651,6 +652,10 @@ class BeamModulePlugin implements Plugin<Project> {

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
// version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms)
// TODO: remove this and download the jar normally when the catalog gets
// open-sourced (https://github.com/apache/iceberg/pull/11039)
project.ext.iceberg_bqms_catalog_version = iceberg_bqms_catalog_version

// A map of maps containing common libraries used per language. To use:
// dependencies {
Expand Down
6 changes: 4 additions & 2 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ dependencies {
// **** IcebergIO runtime dependencies ****
runtimeOnly library.java.hadoop_auth
runtimeOnly library.java.hadoop_client
// Needed when using GCS as the warehouse location.
// For writing to GCS
runtimeOnly library.java.bigdataoss_gcs_connector
// Needed for HiveCatalog
// HiveCatalog
runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
runtimeOnly project(path: ":sdks:java:io:iceberg:hive")
// BigQueryMetastoreCatalog (Java 11+)
runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")

runtimeOnly library.java.kafka_clients
runtimeOnly library.java.slf4j_jdk14
Expand Down
63 changes: 63 additions & 0 deletions sdks/java/io/iceberg/bqms/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins {
id 'org.apache.beam.module'
}

// Configure this module's Java build settings.
// NOTE(review): applyJavaNature is presumably provided by the 'org.apache.beam.module'
// plugin applied above -- confirm the exact meaning of each option in BeamModulePlugin.
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
// An empty shadow closure is passed here.
// NOTE(review): presumably this is what makes the "shadow" configuration of this
// project usable by other modules -- confirm.
shadowClosure: {},
exportJavadoc: false,
publish: false, // it's an intermediate jar for io-expansion-service
validateShadowJar: false
)

// Directory that holds the BQMS catalog jar, and the local file name it is stored under.
// Note that the local name ("iceberg-bqms-catalog-...") differs from the remote file name
// used in jarUrl below ("iceberg-bigquery-catalog-...").
def libDir = "$buildDir/libs"
def bqmsFileName = "iceberg-bqms-catalog-${iceberg_bqms_catalog_version}.jar"
// Fetches the BQMS catalog jar into libDir unless a file with that name already exists.
// NOTE(review): the statements below are placed directly in the task's configuration
// closure (there is no doFirst/doLast), so they appear to run whenever this build script
// is configured rather than when the task executes. The task is declared with type Copy
// but no from/into spec is set here -- confirm both are intended.
task downloadBqmsJar(type: Copy) {
// TODO: remove this workaround and download normally when the catalog gets open-sourced:
// (https://github.com/apache/iceberg/pull/11039)
def jarUrl = "https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-${iceberg_bqms_catalog_version}.jar"
def outputDir = file("$libDir")
outputDir.mkdirs()
def destFile = new File(outputDir, bqmsFileName)

// Skip the download when the jar is already present.
if (!destFile.exists()) {
try {
ant.get(src: jarUrl, dest: destFile)
println "Successfully downloaded BQMS catalog jar: $destFile"
} catch (Exception e) {
// A failed download is only printed; it is not rethrown.
// NOTE(review): a missing jar would presumably surface later as a compilation
// failure of this module -- confirm whether failing here would be preferable.
println "Could not download $jarUrl: ${e.message}"
}
}
}

// Register libDir as a flat-directory repository.
repositories {
flatDir {
dirs "$libDir"
}
}

// Ensure the download task is scheduled before Java compilation.
compileJava.dependsOn downloadBqmsJar

// Depend on the downloaded jar directly by its file path.
dependencies {
implementation files("$libDir/$bqmsFileName")
}

description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore"
ext.summary = "A copy of the BQMS catalog."
37 changes: 29 additions & 8 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ def hadoopVersions = [

hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}

def iceberg_version = "1.4.2"
def iceberg_version = "1.6.1"
def parquet_version = "1.12.0"
def orc_version = "1.9.2"
def hive_version = "3.1.3"

dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.avro
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
Expand All @@ -53,18 +55,37 @@ dependencies {
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
implementation library.java.hadoop_common
implementation library.java.jackson_core
implementation library.java.jackson_databind

testImplementation project(":sdks:java:managed")
testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
testImplementation library.java.bigdataoss_util_hadoop
testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
testImplementation "org.apache.parquet:parquet-common:$parquet_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(":sdks:java:extensions:google-cloud-platform-core")
testImplementation library.java.junit

// Hive catalog test dependencies
testImplementation project(path: ":sdks:java:io:iceberg:hive")
testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
testImplementation ("org.apache.hive:hive-metastore:$hive_version")
testImplementation "org.assertj:assertj-core:3.11.1"
testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
exclude group: "org.apache.hive", module: "hive-exec"
exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
}

// BigQueryMetastore catalog dep
testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")

testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
Expand Down Expand Up @@ -108,7 +129,7 @@ hadoopVersions.each { kv ->
task integrationTest(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://managed-iceberg-integration-tests'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--project=${gcpProject}",
"--tempLocation=${gcpTempLocation}",
Expand All @@ -118,17 +139,17 @@ task integrationTest(type: Test) {
outputs.upToDateWhen { false }

include '**/*IT.class'
// BQ metastore catalog doesn't support java 8
if (project.findProperty('testJavaVersion') == '8' ||
JavaVersion.current().equals(JavaVersion.VERSION_1_8)) {
exclude '**/BigQueryMetastoreCatalogIT.class'
}

maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
}

tasks.register('catalogTests') {
dependsOn integrationTest
dependsOn ":sdks:java:io:iceberg:hive:integrationTest"
}

task loadTest(type: Test) {
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/temp-lt'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import java.io.IOException;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
static final long SALT = System.nanoTime();

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
}

@Override
public Catalog createCatalog() {
return CatalogUtil.loadCatalog(
BQMS_CATALOG,
"bqms_" + catalogName,
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.build(),
new Configuration());
}

@Override
public void catalogCleanup() throws IOException {
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
catalog.dropTable(tableIdentifier);
}
}
}

@Override
public Map<String, Object> managedIcebergConfig(String tableId) {
return ImmutableMap.<String, Object>builder()
.put("table", tableId)
.put(
"catalog_properties",
ImmutableMap.<String, String>builder()
.put("gcp_project", options.getProject())
.put("gcp_location", "us-central1")
.put("warehouse", warehouse)
.put("catalog-impl", BQMS_CATALOG)
.build())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;

/**
 * Exercises reads and writes through {@link HiveCatalog}.
 *
 * <p>A local Hive metastore is started to manage the Iceberg table, while the warehouse path
 * points at a GCS bucket.
 */
public class HiveCatalogIT extends IcebergCatalogBaseIT {
  private static final String TEST_DB = "test_db";
  private static HiveMetastoreExtension hiveMetastoreExtension;

  /** Returns {@code test_db.<test method name>}. */
  @Override
  public String tableId() {
    return TEST_DB + "." + testName.getMethodName();
  }

  /** Creates the metastore extension for the warehouse and registers {@code test_db} in it. */
  @Override
  public void catalogSetup() throws Exception {
    hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
    String location = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
    Map<String, String> noParameters = Maps.newHashMap();
    Database testDatabase = new Database(TEST_DB, "description", location, noParameters);
    hiveMetastoreExtension.metastoreClient().createDatabase(testDatabase);
  }

  /**
   * Loads a {@link HiveCatalog} named {@code hive_<catalogName>} using the metastore's Hive
   * configuration and a 10 second client pool cache eviction interval.
   */
  @Override
  public Catalog createCatalog() {
    String evictionIntervalMs = String.valueOf(TimeUnit.SECONDS.toMillis(10));
    Map<String, String> properties =
        ImmutableMap.of(
            CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, evictionIntervalMs);
    return CatalogUtil.loadCatalog(
        HiveCatalog.class.getName(),
        "hive_" + catalogName,
        properties,
        hiveMetastoreExtension.hiveConf());
  }

  /** Cleans up the metastore extension if one was created. */
  @Override
  public void catalogCleanup() throws Exception {
    if (hiveMetastoreExtension == null) {
      return;
    }
    hiveMetastoreExtension.cleanup();
  }

  /**
   * Builds the Managed Iceberg configuration for {@code tableId}: the table, the catalog name, and
   * config properties carrying the metastore URI read from the Hive configuration.
   */
  @Override
  public Map<String, Object> managedIcebergConfig(String tableId) {
    String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS);
    Map<String, String> confProperties =
        ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri);

    return ImmutableMap.<String, Object>of(
        "table", tableId,
        "name", "hive_" + catalogName,
        "config_properties", confProperties);
  }
}
Loading

0 comments on commit e9cba68

Please sign in to comment.