diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala index d94ad016b..28fdde86f 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/LoaderError.scala @@ -24,9 +24,9 @@ sealed trait LoaderError extends Throwable with Product with Serializable { object LoaderError { implicit val loaderErrorShow: Show[LoaderError] = { - case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n") - case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}" - case t: TimeoutError => t.message + case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n") + case m: TableCommentError => s"Table comment error. Please check the table comment. ${m.message}" + case t: TimeoutError => t.message } /** @@ -42,8 +42,11 @@ object LoaderError { validated.leftMap(errors => DiscoveryError(errors): LoaderError).toEither } - /** Error happened during DDL-statements execution. 
Critical */ - final case class MigrationError(message: String) extends LoaderError + /** + * Error raised while reading the Redshift table comment to discover the latest schema key that the + * table has been migrated to so far + */ + final case class TableCommentError(message: String) extends LoaderError /** A timeout has reached, Loader should abort the current operation and recover */ final case class TimeoutError(message: String) extends LoaderError diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index c20642bb8..eea328a32 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError import doobie.Fragment import scala.math.Ordered.orderingToOrdered @@ -205,7 +206,7 @@ object Migration { Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(None), Applicative[F].pure(Some(target.createTable(rm)))) } - def updateGoodTable[F[_]: Monad: DAO, I]( + def updateGoodTable[F[_]: MonadThrow: DAO, I]( target: Target[I], goodModel: ShredModel.GoodModel ): F[List[Block]] = @@ -216,7 +217,7 @@ object Migration { else Monad[F].pure(Nil) } yield block - def buildBlock[F[_]: Monad: DAO, I](description: Description, target: Target[I]): F[List[Block]] = + def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] = description match { case 
Description.Table(mergeResult) => val goodModel = mergeResult.goodModel @@ -303,8 +304,14 @@ object Migration { else List(Monad[F].unit) /** Find the latest schema version in the table and confirm that it is the latest in `schemas` */ - def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] = - DAO[F].executeQuery[SchemaKey](Statement.GetVersion(tableName))(readSchemaKey) + def getVersion[F[_]: MonadThrow: DAO](tableName: String): F[SchemaKey] = + DAO[F] + .executeQuery[Option[SchemaKey]](Statement.GetVersion(tableName))(readSchemaKey) + .flatMap( + _.fold( + MonadThrow[F].raiseError[SchemaKey](TableCommentError(s"Could not read a schema key in table [$tableName] comment")) + )(_.pure[F]) + ) sealed trait Entity { def getName: String = this match { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala index 17e929835..52efcc07d 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Retry.scala @@ -14,8 +14,10 @@ import cats.{Applicative, MonadThrow, Show} import cats.effect.Clock import cats.implicits._ import com.snowplowanalytics.snowplow.rdbloader.config.Config.{Retries, Strategy} +import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError import retry.{RetryDetails, RetryPolicies, RetryPolicy} import retry._ + import scala.concurrent.duration.{Duration, FiniteDuration} /** @@ -32,6 +34,7 @@ object Retry { /** List of predicates, matching exceptions that should not be retried */ val FatalFailures: List[Throwable => Boolean] = List( + e => e.isInstanceOf[TableCommentError], e => e.isInstanceOf[IllegalStateException], e => e.toString.toLowerCase.contains("[amazon](500310) invalid operation"), diff --git 
a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala index d84abc6fe..702fc09b9 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/package.scala @@ -86,8 +86,8 @@ package object rdbloader { implicit val getSchemaKey: Get[SchemaKey] = Get[String].temap(s => SchemaKey.fromUri(s).leftMap(e => s"Cannot parse $s into Iglu schema key, ${e.code}")) - implicit val readSchemaKey: Read[SchemaKey] = - Read.fromGet(getSchemaKey) + implicit val readSchemaKey: Read[Option[SchemaKey]] = + Read.fromGetOption(getSchemaKey) // To replace Instant with sql.Timetstamp implicit val readTimestamps: Read[Timestamps] = { diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 640e6fa96..28c049b74 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -334,7 +334,7 @@ object LoadSpec { def withExistingRecord(s: TestState)(query: Statement): Any = query match { - case Statement.GetVersion(_) => SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0)) + case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0))) case Statement.TableExists(_) => false case Statement.GetColumns(_) => List("some_column") case Statement.ManifestGet(_) => diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index e87fff0e8..e6fd3f2ff 100644 --- 
a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -43,7 +43,7 @@ object PureDAO { def getResult(s: TestState)(query: Statement): Any = query match { - case Statement.GetVersion(_) => SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0)) + case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0))) case Statement.TableExists(_) => false case Statement.GetColumns(_) => List("some_column") case Statement.ManifestGet(_) => List() diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index 32b47f873..6b37db250 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -156,7 +156,7 @@ object RedshiftSpec { def jdbcResults(state: TestState)(statement: Statement): Any = { val _ = state statement match { - case Statement.GetVersion(_) => SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, 0)) + case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, 0))) case Statement.TableExists(_) => true case Statement.GetColumns(_) => List("some_column") case Statement.ManifestGet(_) => List()