From 381ea287a8d84bc6d5a98d1494900890553b0ac5 Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Tue, 9 Jul 2024 21:53:15 +0300 Subject: [PATCH] Improve RDB Loader behavior for table without comment RDB Loader (Redshift) would not emit an easy to understand error message in case the shredded table comment is not a schema key. The app would also retry the transaction, an unnecessary effort. This commit will make the app emit an easy to understand error message for such cases and will not retry the transaction as there is no benefit in that. --- .../snowplow/rdbloader/LoaderError.scala | 13 ++++++++----- .../snowplow/rdbloader/db/Migration.scala | 15 +++++++++++---- .../snowplow/rdbloader/loading/Retry.scala | 3 +++ .../snowplow/rdbloader/package.scala | 4 ++-- .../snowplow/rdbloader/loading/LoadSpec.scala | 2 +- .../snowplow/rdbloader/test/PureDAO.scala | 2 +- .../snowplow/loader/redshift/RedshiftSpec.scala | 2 +- 7 files changed, 27 insertions(+), 14 deletions(-) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala index d94ad016b..28fdde86f 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala @@ -24,9 +24,9 @@ sealed trait LoaderError extends Throwable with Product with Serializable { object LoaderError { implicit val loaderErrorShow: Show[LoaderError] = { - case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n") - case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}" - case t: TimeoutError => t.message + case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n") + case m: TableCommentError => s"Table comment error. Please check the table comment. ${m.message}" + case t: TimeoutError => t.message } /** @@ -42,8 +42,11 @@ object LoaderError { validated.leftMap(errors => DiscoveryError(errors): LoaderError).toEither } - /** Error happened during DDL-statements execution. Critical */ - final case class MigrationError(message: String) extends LoaderError + /** + * Error happened when reading redshift table comment to discover latest schema key that a table + * has been migrated to so far + */ + final case class TableCommentError(message: String) extends LoaderError /** A timeout has reached, Loader should abort the current operation and recover */ final case class TimeoutError(message: String) extends LoaderError diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index c20642bb8..eea328a32 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError import doobie.Fragment import scala.math.Ordered.orderingToOrdered @@ -205,7 +206,7 @@ object Migration { Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(None), Applicative[F].pure(Some(target.createTable(rm)))) } - def updateGoodTable[F[_]: Monad: DAO, I]( + def updateGoodTable[F[_]: MonadThrow: DAO, I]( target: Target[I], goodModel: ShredModel.GoodModel ): F[List[Block]] = @@ -216,7 +217,7 @@ object Migration { else Monad[F].pure(Nil) } yield block - def buildBlock[F[_]: Monad: DAO, I](description: Description, target: Target[I]): F[List[Block]] = + def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] = description match { case Description.Table(mergeResult) => val goodModel = mergeResult.goodModel @@ -303,8 +304,14 @@ object Migration { else List(Monad[F].unit) /** Find the latest schema version in the table and confirm that it is the latest in `schemas` */ - def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] = - DAO[F].executeQuery[SchemaKey](Statement.GetVersion(tableName))(readSchemaKey) + def getVersion[F[_]: MonadThrow: DAO](tableName: String): F[SchemaKey] = + DAO[F] + .executeQuery[Option[SchemaKey]](Statement.GetVersion(tableName))(readSchemaKey) + .flatMap( + _.fold( + MonadThrow[F].raiseError[SchemaKey](TableCommentError(s"Could not read a schema key in table [$tableName] comment")) + )(_.pure[F]) + ) sealed trait Entity { def getName: String = this match { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala index 17e929835..52efcc07d 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala @@ -14,8 +14,10 @@ import cats.{Applicative, MonadThrow, Show} import cats.effect.Clock import cats.implicits._ import com.snowplowanalytics.snowplow.rdbloader.config.Config.{Retries, Strategy} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError import retry.{RetryDetails, RetryPolicies, RetryPolicy} import retry._ + import scala.concurrent.duration.{Duration, FiniteDuration} /** @@ -32,6 +34,7 @@ object Retry { /** List of predicates, matching exceptions that should not be retried */ val FatalFailures: List[Throwable => Boolean] = List( + e => e.isInstanceOf[TableCommentError], e => e.isInstanceOf[IllegalStateException], e => e.toString.toLowerCase.contains("[amazon](500310) invalid operation"), diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala index d84abc6fe..702fc09b9 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala @@ -86,8 +86,8 @@ package object rdbloader { implicit val getSchemaKey: Get[SchemaKey] = Get[String].temap(s => SchemaKey.fromUri(s).leftMap(e => s"Cannot parse $s into Iglu schema key, ${e.code}")) - implicit val readSchemaKey: Read[SchemaKey] = - Read.fromGet(getSchemaKey) + implicit val readSchemaKey: Read[Option[SchemaKey]] = + Read.fromGetOption(getSchemaKey) // To replace Instant with sql.Timetstamp implicit val readTimestamps: Read[Timestamps] = { diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 640e6fa96..28c049b74 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -334,7 +334,7 @@ object LoadSpec { def withExistingRecord(s: TestState)(query: Statement): Any = query match { - case Statement.GetVersion(_) => SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0)) + case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0))) case Statement.TableExists(_) => false case Statement.GetColumns(_) => List("some_column") case Statement.ManifestGet(_) => diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index e87fff0e8..e6fd3f2ff 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -43,7 +43,7 @@ object PureDAO { def getResult(s: TestState)(query: Statement): Any = query match { - case Statement.GetVersion(_) => SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0)) + case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0))) case Statement.TableExists(_) => false case Statement.GetColumns(_) => List("some_column") case Statement.ManifestGet(_) => List() diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index 32b47f873..6b37db250 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -156,7 +156,7 @@ object RedshiftSpec { def jdbcResults(state: TestState)(statement: Statement): Any = { val _ = state statement match { - case Statement.GetVersion(_) => SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, 0)) + case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, 0))) case Statement.TableExists(_) => true case Statement.GetColumns(_) => List("some_column") case Statement.ManifestGet(_) => List()