diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
index 46ff13a95..39f5ecc59 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala
@@ -164,8 +164,11 @@ object Migration {
     target: Target[I],
     disableRecovery: List[SchemaCriterion]
   ): F[Migration[C]] = {
+    val nonAtomicTypes = discovery.shreddedTypes.filterNot(_.isAtomic)
+    val maxSchemaKeysPerTableName = DataDiscovery.getMaxSchemaKeyPerTableName(nonAtomicTypes).values.toList
+    val filteredTypes = nonAtomicTypes.filter(s => maxSchemaKeysPerTableName.contains(s.info.getSchemaKey))
     val descriptions: LoaderAction[F, List[Description]] =
-      discovery.shreddedTypes.filterNot(_.isAtomic).traverse {
+      filteredTypes.traverse {
         case s: ShreddedType.Tabular =>
           if (!disableRecovery.contains(s.info.toCriterion))
             EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey).mergeRedshiftSchemasResult))
diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala
index bd8bb1cae..d182e4f32 100644
--- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala
+++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala
@@ -123,6 +123,119 @@ class MigrationSpec extends Specification {
       }
     }

+    "build Migration for only max schema keys" in {
+      implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
+      implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)
+      implicit val iglu: Iglu[Pure] = PureIglu.interpreter
+      implicit val logging: Logging[Pure] = PureLogging.interpreter()
+      def createTestData(testNum: Int) = {
+        val s1 = ShreddedType.Tabular(
+          ShreddedType.Info(
+            BlobStorage.Folder.coerce("s3://shredded/archive"),
+            "com.acme",
+            s"some_context",
+            SchemaVer.Full(testNum, 0, 0),
+            SnowplowEntity.Context
+          )
+        )
+        val s2 = s1.copy(info = s1.info.copy(version = SchemaVer.Full(testNum, 0, 1)))
+        val s3 = s1.copy(info = s1.info.copy(version = SchemaVer.Full(testNum, 0, 2)))
+        val types = List(s1, s2, s3)
+        val schema1 = SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())
+        val schema2 = SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())
+        val schema3 = SelfDescribingSchema(SchemaMap(s3.info.getSchemaKey), Schema())
+        val shredModels = Map(
+          schema1.self.schemaKey -> DiscoveredShredModels(
+            foldMapRedshiftSchemas(NonEmptyList.of(schema1))(
+              schema1.self.schemaKey
+            ),
+            foldMapMergeRedshiftSchemas(NonEmptyList.of(schema1))
+          ),
+          schema2.self.schemaKey -> DiscoveredShredModels(
+            foldMapRedshiftSchemas(NonEmptyList.of(schema1, schema2))(
+              schema2.self.schemaKey
+            ),
+            foldMapMergeRedshiftSchemas(NonEmptyList.of(schema1, schema2))
+          ),
+          schema3.self.schemaKey -> DiscoveredShredModels(
+            foldMapRedshiftSchemas(NonEmptyList.of(schema1, schema2, schema3))(
+              schema3.self.schemaKey
+            ),
+            foldMapMergeRedshiftSchemas(NonEmptyList.of(schema1, schema2, schema3))
+          )
+        )
+
+        val createToDdl =
+          s"""CREATE TABLE IF NOT EXISTS public.com_acme_some_context_$testNum (
+             |  "schema_vendor" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             |  "schema_name" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             |  "schema_format" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             |  "schema_version" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             |  "root_id" CHAR(36) ENCODE RAW NOT NULL,
+             |  "root_tstamp" TIMESTAMP ENCODE ZSTD NOT NULL,
+             |  "ref_root" VARCHAR(255) ENCODE ZSTD NOT NULL,
+             |  "ref_tree" VARCHAR(1500) ENCODE ZSTD NOT NULL,
+             |  "ref_parent" VARCHAR(255) ENCODE ZSTD NOT NULL,
+             |  FOREIGN KEY (root_id) REFERENCES public.events(event_id)
+             |)
+             |DISTSTYLE KEY
+             |DISTKEY (root_id)
+             |SORTKEY (root_tstamp);
+             |
+             |COMMENT ON TABLE public.com_acme_some_context_$testNum IS 'iglu:com.acme/some_context/jsonschema/$testNum-0-2';
+             |""".stripMargin
+
+        val expectedSql = LogEntry.Sql(Statement.TableExists(s"com_acme_some_context_$testNum"))
+        val expectedMigrations = List(
+          LogEntry.Message(s"Creating public.com_acme_some_context_$testNum table for iglu:com.acme/some_context/jsonschema/$testNum-0-2"),
+          LogEntry.Sql(Statement.CreateTable(Fragment.const0(createToDdl))),
+          LogEntry.Sql(
+            Statement.CommentOn(s"public.com_acme_some_context_$testNum", s"iglu:com.acme/some_context/jsonschema/$testNum-0-2")
+          ),
+          LogEntry.Message(s"Table public.com_acme_some_context_$testNum created")
+        )
+
+        (types, shredModels, expectedSql, expectedMigrations)
+      }
+
+      val (types, shredModels, expectedSqls, expectedMigration) = (1 to 3)
+        .map(createTestData)
+        .foldLeft(
+          (List.empty[ShreddedType.Tabular], Map.empty[SchemaKey, DiscoveredShredModels], List.empty[LogEntry], List.empty[LogEntry])
+        ) {
+          case (
+                (accTypes, accShredModels, accExpectedSqls, accExpectedMigrations),
+                (types, shredModels, expectedSql, expectedMigrations)
+              ) =>
+            (
+              accTypes ::: types,
+              accShredModels ++ shredModels,
+              accExpectedSqls :+ expectedSql,
+              accExpectedMigrations ::: expectedMigrations
+            )
+        }
+
+      val input =
+        DataDiscovery(
+          BlobStorage.Folder.coerce("s3://shredded/archive"),
+          types,
+          Compression.Gzip,
+          TypesInfo.Shredded(List.empty),
+          Nil,
+          shredModels
+        )
+
+      val expected = PureTransaction.NoTransactionMessage :: expectedSqls
+
+      val (state, value) = Migration.build[Pure, Pure, Unit](input, PureDAO.DummyTarget, Nil).run
+
+      state.getLog must beEqualTo(expected)
+      value must beRight.like { case Migration(preTransaction, inTransaction) =>
+        preTransaction mustEqual Nil
+        inTransaction.runS.getLog must beEqualTo(expectedMigration)
+      }
+    }
+
     "ignore atomic schema" in {
       implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
       implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)