diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ef934efa94be..1e908c5c93ba 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -619,6 +619,7 @@ class BeamModulePlugin implements Plugin { def influxdb_version = "2.19" def httpclient_version = "4.5.13" def httpcore_version = "4.4.14" + def iceberg_bqms_catalog_version = "1.5.2-0.1.0" def jackson_version = "2.15.4" def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" @@ -651,6 +652,10 @@ class BeamModulePlugin implements Plugin { // Export Spark versions, so they are defined in a single place only project.ext.spark3_version = spark3_version + // version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms) + // TODO: remove this and download the jar normally when the catalog gets + // open-sourced (https://github.com/apache/iceberg/pull/11039) + project.ext.iceberg_bqms_catalog_version = iceberg_bqms_catalog_version // A map of maps containing common libraries used per language. To use: // dependencies { diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 38bee450e752..433000922b61 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -59,11 +59,13 @@ dependencies { // **** IcebergIO runtime dependencies **** runtimeOnly library.java.hadoop_auth runtimeOnly library.java.hadoop_client - // Needed when using GCS as the warehouse location. + // For writing to GCS runtimeOnly library.java.bigdataoss_gcs_connector - // Needed for HiveCatalog + // HiveCatalog runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2") runtimeOnly project(path: ":sdks:java:io:iceberg:hive") + // BigQueryMetastoreCatalog (Java 11+) + runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/bqms/build.gradle b/sdks/java/io/iceberg/bqms/build.gradle new file mode 100644 index 000000000000..e42aafc5f424 --- /dev/null +++ b/sdks/java/io/iceberg/bqms/build.gradle @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +plugins { + id 'org.apache.beam.module' +} + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms', + shadowClosure: {}, + exportJavadoc: false, + publish: false, // it's an intermediate jar for io-expansion-service + validateShadowJar: false +) + +def libDir = "$buildDir/libs" +def bqmsFileName = "iceberg-bqms-catalog-${iceberg_bqms_catalog_version}.jar" +task downloadBqmsJar(type: Copy) { + // TODO: remove this workaround and download normally when the catalog gets open-sourced: + // (https://github.com/apache/iceberg/pull/11039) + def jarUrl = "https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-${iceberg_bqms_catalog_version}.jar" + def outputDir = file("$libDir") + outputDir.mkdirs() + def destFile = new File(outputDir, bqmsFileName) + + if (!destFile.exists()) { + try { + ant.get(src: jarUrl, dest: destFile) + println "Successfully downloaded BQMS catalog jar: $destFile" + } catch (Exception e) { + println "Could not download $jarUrl: ${e.message}" + } + } +} + +repositories { + flatDir { + dirs "$libDir" + } +} + +compileJava.dependsOn downloadBqmsJar + +dependencies { + implementation files("$libDir/$bqmsFileName") +} + +description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore" +ext.summary = "A copy of the BQMS catalog." diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 0cfa8da4eb7d..1775dfc5b77b 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -37,14 +37,16 @@ def hadoopVersions = [ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} -def iceberg_version = "1.4.2" +def iceberg_version = "1.6.1" def parquet_version = "1.12.0" def orc_version = "1.9.2" +def hive_version = "3.1.3" dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") + implementation library.java.avro implementation library.java.slf4j_api implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" @@ -53,18 +55,37 @@ dependencies { implementation "org.apache.iceberg:iceberg-api:$iceberg_version" implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" - implementation library.java.hadoop_common runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version" + implementation library.java.hadoop_common + implementation library.java.jackson_core + implementation library.java.jackson_databind testImplementation project(":sdks:java:managed") testImplementation library.java.hadoop_client testImplementation library.java.bigdataoss_gcsio testImplementation library.java.bigdataoss_gcs_connector testImplementation library.java.bigdataoss_util_hadoop + testImplementation "org.apache.parquet:parquet-avro:$parquet_version" + testImplementation "org.apache.parquet:parquet-common:$parquet_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:extensions:google-cloud-platform-core") testImplementation library.java.junit + + // Hive catalog test dependencies + testImplementation project(path: ":sdks:java:io:iceberg:hive") + testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version" + testImplementation 
("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version") + testImplementation ("org.apache.hive:hive-metastore:$hive_version") + testImplementation "org.assertj:assertj-core:3.11.1" + testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") { + exclude group: "org.apache.hive", module: "hive-exec" + exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle" + } + + // BigQueryMetastore catalog dep + testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java") @@ -108,7 +129,7 @@ hadoopVersions.each { kv -> task integrationTest(type: Test) { group = "Verification" def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' - def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests' + def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://managed-iceberg-integration-tests' systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--project=${gcpProject}", "--tempLocation=${gcpTempLocation}", @@ -118,17 +139,17 @@ task integrationTest(type: Test) { outputs.upToDateWhen { false } include '**/*IT.class' + // BQ metastore catalog doesn't support java 8 + if (project.findProperty('testJavaVersion') == '8' || + JavaVersion.current().equals(JavaVersion.VERSION_1_8)) { + exclude '**/BigQueryMetastoreCatalogIT.class' + } maxParallelForks 4 classpath = sourceSets.test.runtimeClasspath testClassesDirs = sourceSets.test.output.classesDirs } -tasks.register('catalogTests') { - dependsOn integrationTest - dependsOn ":sdks:java:io:iceberg:hive:integrationTest" -} - task loadTest(type: Test) { def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/temp-lt' diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java new file mode 100644 index 000000000000..39920e66199b --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.catalog; + +import java.io.IOException; +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT { + static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"; + static final String DATASET = "managed_iceberg_bqms_tests_no_delete"; + static final long SALT = System.nanoTime(); + + @Override + public String tableId() { + return DATASET + "." + testName.getMethodName() + "_" + SALT; + } + + @Override + public Catalog createCatalog() { + return CatalogUtil.loadCatalog( + BQMS_CATALOG, + "bqms_" + catalogName, + ImmutableMap.builder() + .put("gcp_project", options.getProject()) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .build(), + new Configuration()); + } + + @Override + public void catalogCleanup() throws IOException { + for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) { + // only delete tables that were created in this test run + if (tableIdentifier.name().contains(String.valueOf(SALT))) { + catalog.dropTable(tableIdentifier); + } + } + } + + @Override + public Map managedIcebergConfig(String tableId) { + return ImmutableMap.builder() + .put("table", tableId) + .put( + "catalog_properties", + ImmutableMap.builder() + .put("gcp_project", options.getProject()) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .put("catalog-impl", BQMS_CATALOG) + .build()) + .build(); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java new file mode 100644 index 000000000000..076d3f4f9db8 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.catalog; + +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hive.HiveCatalog; + +/** + * Read and write tests using {@link HiveCatalog}. + * + *
<p>
Spins up a local Hive metastore to manage the Iceberg table. Warehouse path is set to a GCS + * bucket. + */ +public class HiveCatalogIT extends IcebergCatalogBaseIT { + private static HiveMetastoreExtension hiveMetastoreExtension; + private static final String TEST_DB = "test_db"; + + @Override + public String tableId() { + return String.format("%s.%s", TEST_DB, testName.getMethodName()); + } + + @Override + public void catalogSetup() throws Exception { + hiveMetastoreExtension = new HiveMetastoreExtension(warehouse); + String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB); + Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap()); + hiveMetastoreExtension.metastoreClient().createDatabase(db); + } + + @Override + public Catalog createCatalog() { + return CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), + "hive_" + catalogName, + ImmutableMap.of( + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + String.valueOf(TimeUnit.SECONDS.toMillis(10))), + hiveMetastoreExtension.hiveConf()); + } + + @Override + public void catalogCleanup() throws Exception { + if (hiveMetastoreExtension != null) { + hiveMetastoreExtension.cleanup(); + } + } + + @Override + public Map managedIcebergConfig(String tableId) { + String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS); + + Map confProperties = + ImmutableMap.builder() + .put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri) + .build(); + + return ImmutableMap.builder() + .put("table", tableId) + .put("name", "hive_" + catalogName) + .put("config_properties", confProperties) + .build(); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java similarity index 77% rename from sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java rename to sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index a060bc16d6c7..a062f5be9d4f 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -15,9 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.iceberg; +package org.apache.beam.sdk.io.iceberg.catalog; -import static org.apache.beam.sdk.schemas.Schema.FieldType; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -40,7 +39,9 @@ import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -56,14 +57,10 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; @@ -72,7 +69,6 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.encryption.InputFilesDecryptor; -import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.InputFile; @@ -84,44 +80,128 @@ import org.joda.time.DateTimeZone; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Integration tests for {@link IcebergIO} source and sink. */ -@RunWith(JUnit4.class) -public class IcebergIOIT implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(IcebergIOIT.class); +/** + * Base class for {@link Managed} {@link org.apache.beam.sdk.io.iceberg.IcebergIO} read and write + * tests. + * + *
<p>To test a new catalog, create a subclass of this test class and implement the following two + * methods: + * + * <ul> + *   <li>{@link #createCatalog()} + *   <li>{@link #managedIcebergConfig(String)} + * </ul> + * + * <p>If the catalog needs further logic to set up and tear down, you can override and implement + * these methods: + * + * <ul> + *   <li>{@link #catalogSetup()} + *   <li>{@link #catalogCleanup()} + * </ul> + * + * <p>
1,000 records are used for each test by default. You can change this by overriding {@link + * #numRecords()}. + */ +public abstract class IcebergCatalogBaseIT implements Serializable { + public abstract Catalog createCatalog(); + + public abstract Map managedIcebergConfig(String tableId); + + public void catalogSetup() throws Exception {} + + public void catalogCleanup() throws Exception {} + + public Integer numRecords() { + return 1000; + } + + public String tableId() { + return testName.getMethodName() + ".test_table"; + } + + public String catalogName = "test_catalog_" + System.nanoTime(); + + @Before + public void setUp() throws Exception { + options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); + warehouse = + String.format( + "%s/%s/%s", + TestPipeline.testingPipelineOptions().getTempLocation(), + getClass().getSimpleName(), + RANDOM); + catalogSetup(); + catalog = createCatalog(); + } + + @After + public void cleanUp() throws Exception { + try { + catalogCleanup(); + } catch (Exception e) { + LOG.warn("Catalog cleanup failed.", e); + } + + try { + GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); + GcsPath path = GcsPath.fromUri(warehouse); + + Objects objects = + gcsUtil.listObjects( + path.getBucket(), + getClass().getSimpleName() + "/" + path.getFileName().toString(), + null); + List filesToDelete = + objects.getItems().stream() + .map(obj -> "gs://" + path.getBucket() + "/" + obj.getName()) + .collect(Collectors.toList()); + + gcsUtil.remove(filesToDelete); + } catch (Exception e) { + LOG.warn("Failed to clean up GCS files.", e); + } + } - private static final org.apache.beam.sdk.schemas.Schema DOUBLY_NESTED_ROW_SCHEMA = - org.apache.beam.sdk.schemas.Schema.builder() + protected static String warehouse; + public Catalog catalog; + protected GcpOptions options; + private static final String RANDOM = UUID.randomUUID().toString(); + @Rule public TestPipeline pipeline = TestPipeline.create(); + @Rule public TestName testName = new TestName(); + @Rule public transient Timeout globalTimeout = Timeout.seconds(300); + private static final int NUM_SHARDS = 10; + private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class); + private static final Schema DOUBLY_NESTED_ROW_SCHEMA = + Schema.builder() .addStringField("doubly_nested_str") .addInt64Field("doubly_nested_float") .build(); - private static final org.apache.beam.sdk.schemas.Schema NESTED_ROW_SCHEMA = - org.apache.beam.sdk.schemas.Schema.builder() + private static final Schema NESTED_ROW_SCHEMA = + Schema.builder() .addStringField("nested_str") .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) .addInt32Field("nested_int") .addFloatField("nested_float") .build(); - private static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = - org.apache.beam.sdk.schemas.Schema.builder() + private static final Schema BEAM_SCHEMA = + Schema.builder() .addStringField("str") .addStringField("char") .addInt64Field("modulo_5") .addBooleanField("bool") .addInt32Field("int") .addRowField("row", NESTED_ROW_SCHEMA) - .addArrayField("arr_long", FieldType.INT64) + .addArrayField("arr_long", Schema.FieldType.INT64) .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) .addNullableInt64Field("nullable_long") .addDateTimeField("datetime_tz") @@ -174,65 +254,16 @@ public Record apply(Row input) { return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input); } }; - private static final Integer NUM_RECORDS = 1000; - private static final Integer NUM_SHARDS = 10; - - @Rule public TestPipeline 
pipeline = TestPipeline.create(); - - static GcpOptions options; - - static Configuration catalogHadoopConf; - - @Rule public TestName testName = new TestName(); - - private static String warehouseLocation; - - private String tableId; - private static Catalog catalog; - - @BeforeClass - public static void beforeClass() { - options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); - warehouseLocation = - String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID()); - catalogHadoopConf = new Configuration(); - catalogHadoopConf.set("fs.gs.project.id", options.getProject()); - catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT"); - catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); - } - - @Before - public void setUp() { - tableId = testName.getMethodName() + ".test_table"; - } - - @AfterClass - public static void afterClass() { - try { - GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); - GcsPath path = GcsPath.fromUri(warehouseLocation); - - Objects objects = - gcsUtil.listObjects( - path.getBucket(), "IcebergIOIT/" + path.getFileName().toString(), null); - List filesToDelete = - objects.getItems().stream() - .map(obj -> "gs://" + path.getBucket() + "/" + obj.getName()) - .collect(Collectors.toList()); - - gcsUtil.remove(filesToDelete); - } catch (Exception e) { - LOG.warn("Failed to clean up files.", e); - } - } + private final List inputRows = + LongStream.range(0, numRecords()).boxed().map(ROW_FUNC::apply).collect(Collectors.toList()); /** Populates the Iceberg table and Returns a {@link List} of expected elements. */ private List populateTable(Table table) throws IOException { - double recordsPerShardFraction = NUM_RECORDS.doubleValue() / NUM_SHARDS; + double recordsPerShardFraction = numRecords().doubleValue() / NUM_SHARDS; long maxRecordsPerShard = Math.round(Math.ceil(recordsPerShardFraction)); AppendFiles appendFiles = table.newAppend(); - List expectedRows = new ArrayList<>(NUM_RECORDS); + List expectedRows = new ArrayList<>(numRecords()); int totalRecords = 0; for (int shardNum = 0; shardNum < NUM_SHARDS; ++shardNum) { String filepath = table.location() + "/" + UUID.randomUUID(); @@ -246,7 +277,7 @@ private List populateTable(Table table) throws IOException { .build(); for (int recordNum = 0; - recordNum < maxRecordsPerShard && totalRecords < NUM_RECORDS; + recordNum < maxRecordsPerShard && totalRecords < numRecords(); ++recordNum, ++totalRecords) { Row expectedBeamRow = ROW_FUNC.apply((long) recordNum); @@ -264,7 +295,7 @@ private List populateTable(Table table) throws IOException { } private List readRecords(Table table) { - Schema tableSchema = table.schema(); + org.apache.iceberg.Schema tableSchema = table.schema(); TableScan tableScan = table.newScan().project(tableSchema); List writtenRecords = new ArrayList<>(); for (CombinedScanTask task : tableScan.planTasks()) { @@ -289,31 +320,13 @@ private List readRecords(Table table) { return writtenRecords; } - private Map managedIcebergConfig(String tableId) { - return ImmutableMap.builder() - .put("table", tableId) - .put("catalog_name", "test-name") - .put( - "catalog_properties", - ImmutableMap.builder() - .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse", warehouseLocation) - .build()) - .build(); - } - - /** - * Test of a predetermined moderate number of records written directly to Iceberg then read via a - * Beam pipeline. Table initialization is done on a single process using the Iceberg APIs so the - * data cannot be "big". 
- */ @Test public void testRead() throws Exception { - Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA); + Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA); List expectedRows = populateTable(table); - Map config = managedIcebergConfig(tableId); + Map config = managedIcebergConfig(tableId()); PCollection rows = pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); @@ -322,33 +335,26 @@ public void testRead() throws Exception { pipeline.run().waitUntilFinish(); } - private static final List INPUT_ROWS = - LongStream.range(0, NUM_RECORDS).boxed().map(ROW_FUNC::apply).collect(Collectors.toList()); - - /** - * Test of a predetermined moderate number of records written to Iceberg using a Beam pipeline, - * then read directly using Iceberg API. - */ @Test public void testWrite() { // Write with Beam // Expect the sink to create the table - Map config = managedIcebergConfig(tableId); - PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); + Map config = managedIcebergConfig(tableId()); + PCollection input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); - Table table = catalog.loadTable(TableIdentifier.parse(tableId)); + Table table = catalog.loadTable(TableIdentifier.parse(tableId())); assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA)); // Read back and check records are correct List returnedRecords = readRecords(table); assertThat( - returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } @Test - public void testWritePartitionedData() { + public void testWriteToPartitionedTable() { // For an example row where bool=true, modulo_5=3, str=value_303, // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ PartitionSpec partitionSpec = @@ -358,34 +364,35 @@ public void testWritePartitionedData() { .truncate("str", "value_x".length()) .build(); Table table = - catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); + catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec); // Write with Beam - Map config = managedIcebergConfig(tableId); - PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); + Map config = managedIcebergConfig(tableId()); + PCollection input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); // Read back and check records are correct List returnedRecords = readRecords(table); assertThat( - returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } private PeriodicImpulse getStreamingSource() { return PeriodicImpulse.create() - .stopAfter(Duration.millis(NUM_RECORDS - 1)) + .stopAfter(Duration.millis(numRecords() - 1)) .withInterval(Duration.millis(1)); } @Test public void testStreamingWrite() { + int numRecords = numRecords(); PartitionSpec partitionSpec = PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); Table table = - catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); + 
catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec); - Map config = new HashMap<>(managedIcebergConfig(tableId)); + Map config = new HashMap<>(managedIcebergConfig(tableId())); config.put("triggering_frequency_seconds", 4); // create elements from longs in range [0, 1000) @@ -394,7 +401,7 @@ public void testStreamingWrite() { .apply(getStreamingSource()) .apply( MapElements.into(TypeDescriptors.rows()) - .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS))) + .via(instant -> ROW_FUNC.apply(instant.getMillis() % numRecords))) .setRowSchema(BEAM_SCHEMA); assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED)); @@ -404,17 +411,18 @@ public void testStreamingWrite() { List returnedRecords = readRecords(table); assertThat( - returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } @Test public void testStreamingWriteWithPriorWindowing() { + int numRecords = numRecords(); PartitionSpec partitionSpec = PartitionSpec.builderFor(ICEBERG_SCHEMA).identity("bool").identity("modulo_5").build(); Table table = - catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); + catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec); - Map config = new HashMap<>(managedIcebergConfig(tableId)); + Map config = new HashMap<>(managedIcebergConfig(tableId())); config.put("triggering_frequency_seconds", 4); // over a span of 10 seconds, create elements from longs in range [0, 1000) @@ -426,7 +434,7 @@ public void testStreamingWriteWithPriorWindowing() { .accumulatingFiredPanes()) .apply( MapElements.into(TypeDescriptors.rows()) - .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS))) + .via(instant -> ROW_FUNC.apply(instant.getMillis() % numRecords))) .setRowSchema(BEAM_SCHEMA); assertThat(input.isBounded(), equalTo(PCollection.IsBounded.UNBOUNDED)); @@ -436,7 +444,7 @@ public void testStreamingWriteWithPriorWindowing() { List returnedRecords = readRecords(table); assertThat( - returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } private void writeToDynamicDestinations(@Nullable String filterOp) { @@ -450,7 +458,8 @@ private void writeToDynamicDestinations(@Nullable String filterOp) { */ private void writeToDynamicDestinations( @Nullable String filterOp, boolean streaming, boolean partitioning) { - String tableIdentifierTemplate = tableId + "_{modulo_5}_{char}"; + int numRecords = numRecords(); + String tableIdentifierTemplate = tableId() + "_{modulo_5}_{char}"; Map writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate)); List fieldsToFilter = Arrays.asList("row", "str", "int", "nullable_long"); @@ -475,13 +484,14 @@ private void writeToDynamicDestinations( } } - Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema()); + org.apache.iceberg.Schema tableSchema = + IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema()); - TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a"); - TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + "_1_b"); - TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c"); - TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d"); - TableIdentifier tableIdentifier4 = 
TableIdentifier.parse(tableId + "_4_e"); + TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId() + "_0_a"); + TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId() + "_1_b"); + TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId() + "_2_c"); + TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId() + "_3_d"); + TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId() + "_4_e"); // the sink doesn't support creating partitioned tables yet, // so we need to create it manually for this test case if (partitioning) { @@ -504,10 +514,11 @@ private void writeToDynamicDestinations( .apply(getStreamingSource()) .apply( MapElements.into(TypeDescriptors.rows()) - .via(instant -> ROW_FUNC.apply(instant.getMillis() % NUM_RECORDS))); + .via(instant -> ROW_FUNC.apply(instant.getMillis() % numRecords))); } else { - input = pipeline.apply(Create.of(INPUT_ROWS)); + input = pipeline.apply(Create.of(inputRows)); } + input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig)); pipeline.run().waitUntilFinish(); @@ -537,7 +548,7 @@ private void writeToDynamicDestinations( List records = returnedRecords.get(i); long l = i; Stream expectedRecords = - INPUT_ROWS.stream() + inputRows.stream() .filter(rec -> checkStateNotNull(rec.getInt64("modulo_5")) == l) .map(rowFilter::filter) .map(recordFunc::apply); @@ -556,11 +567,6 @@ public void testWriteToDynamicDestinationsAndDropFields() { writeToDynamicDestinations("drop"); } - @Test - public void testWriteToDynamicDestinationsAndKeepFields() { - writeToDynamicDestinations("keep"); - } - @Test public void testWriteToDynamicDestinationsWithOnlyRecord() { writeToDynamicDestinations("only"); diff --git a/settings.gradle.kts b/settings.gradle.kts index a8bee45a05ac..c214d5988da1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -357,3 +357,5 @@ include("sdks:java:extensions:combiners") findProject(":sdks:java:extensions:combiners")?.name = "combiners" include("sdks:java:io:iceberg:hive") findProject(":sdks:java:io:iceberg:hive")?.name = "hive" +include("sdks:java:io:iceberg:bqms") +findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms"
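
For reference, the new IcebergCatalogBaseIT introduced above is meant to be extended once per catalog: a subclass only has to provide createCatalog() and managedIcebergConfig(String), optionally overriding catalogSetup()/catalogCleanup(). Below is a minimal sketch of such a subclass, modeled on the Hadoop-catalog wiring that the removed IcebergIOIT used. It is illustrative only and not part of this patch; the class name HadoopCatalogIT and the GCS/Hadoop configuration values are assumptions.

// Illustrative only (not part of this patch): a Hadoop-catalog subclass of IcebergCatalogBaseIT,
// mirroring the configuration that the removed IcebergIOIT used.
package org.apache.beam.sdk.io.iceberg.catalog;

import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class HadoopCatalogIT extends IcebergCatalogBaseIT {
  @Override
  public Catalog createCatalog() {
    // "warehouse" and "options" are provided by the base class during setUp().
    Configuration conf = new Configuration();
    conf.set("fs.gs.project.id", options.getProject());
    conf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
    return new HadoopCatalog(conf, warehouse);
  }

  @Override
  public Map<String, Object> managedIcebergConfig(String tableId) {
    // Managed IcebergIO config pointing at the same Hadoop catalog and warehouse.
    return ImmutableMap.<String, Object>builder()
        .put("table", tableId)
        .put(
            "catalog_properties",
            ImmutableMap.<String, String>builder()
                .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
                .put("warehouse", warehouse)
                .build())
        .build();
  }
}

The base class then supplies all read/write/streaming/dynamic-destination test cases, so a new catalog integration (such as the BigQueryMetastoreCatalogIT and HiveCatalogIT added in this patch) only differs in how the catalog and the managed config are constructed.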