Skip to content

Commit

Permalink
Improve RDB Loader behavior for table without comment
Browse files Browse the repository at this point in the history
RDB Loader (Redshift) would not emit an easy to understand error message in case the shredded table comment is not a schema key. The app would also retry the transaction, an unnecessary effort.

This commit will make the app emit an easy to understand error message for such cases and will not retry the transaction as there is no benefit in that.
  • Loading branch information
oguzhanunlu committed Jul 10, 2024
1 parent edf594c commit 381ea28
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ sealed trait LoaderError extends Throwable with Product with Serializable {
object LoaderError {

implicit val loaderErrorShow: Show[LoaderError] = {
case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n")
case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}"
case t: TimeoutError => t.message
case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n")
case m: TableCommentError => s"Table comment error. Please check the table comment. ${m.message}"
case t: TimeoutError => t.message
}

/**
Expand All @@ -42,8 +42,11 @@ object LoaderError {
validated.leftMap(errors => DiscoveryError(errors): LoaderError).toEither
}

/** Error happened during DDL-statements execution. Critical */
final case class MigrationError(message: String) extends LoaderError
/**
* Error happened when reading redshift table comment to discover latest schema key that a table
* has been migrated to so far
*/
final case class TableCommentError(message: String) extends LoaderError

/** A timeout has reached, Loader should abort the current operation and recover */
final case class TimeoutError(message: String) extends LoaderError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd
import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction}
import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey}
import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError
import doobie.Fragment

import scala.math.Ordered.orderingToOrdered
Expand Down Expand Up @@ -205,7 +206,7 @@ object Migration {
Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(None), Applicative[F].pure(Some(target.createTable(rm))))
}

def updateGoodTable[F[_]: Monad: DAO, I](
def updateGoodTable[F[_]: MonadThrow: DAO, I](
target: Target[I],
goodModel: ShredModel.GoodModel
): F[List[Block]] =
Expand All @@ -216,7 +217,7 @@ object Migration {
else Monad[F].pure(Nil)
} yield block

def buildBlock[F[_]: Monad: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
description match {
case Description.Table(mergeResult) =>
val goodModel = mergeResult.goodModel
Expand Down Expand Up @@ -303,8 +304,14 @@ object Migration {
else List(Monad[F].unit)

/** Find the latest schema version in the table and confirm that it is the latest in `schemas` */
def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] =
DAO[F].executeQuery[SchemaKey](Statement.GetVersion(tableName))(readSchemaKey)
def getVersion[F[_]: MonadThrow: DAO](tableName: String): F[SchemaKey] =
DAO[F]
.executeQuery[Option[SchemaKey]](Statement.GetVersion(tableName))(readSchemaKey)
.flatMap(
_.fold(
MonadThrow[F].raiseError[SchemaKey](TableCommentError(s"Could not read a schema key in table [$tableName] comment"))
)(_.pure[F])
)

sealed trait Entity {
def getName: String = this match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import cats.{Applicative, MonadThrow, Show}
import cats.effect.Clock
import cats.implicits._
import com.snowplowanalytics.snowplow.rdbloader.config.Config.{Retries, Strategy}
import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError
import retry.{RetryDetails, RetryPolicies, RetryPolicy}
import retry._

import scala.concurrent.duration.{Duration, FiniteDuration}

/**
Expand All @@ -32,6 +34,7 @@ object Retry {

/** List of predicates, matching exceptions that should not be retried */
val FatalFailures: List[Throwable => Boolean] = List(
e => e.isInstanceOf[TableCommentError],
e => e.isInstanceOf[IllegalStateException],
e => e.toString.toLowerCase.contains("[amazon](500310) invalid operation"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ package object rdbloader {
implicit val getSchemaKey: Get[SchemaKey] =
Get[String].temap(s => SchemaKey.fromUri(s).leftMap(e => s"Cannot parse $s into Iglu schema key, ${e.code}"))

implicit val readSchemaKey: Read[SchemaKey] =
Read.fromGet(getSchemaKey)
implicit val readSchemaKey: Read[Option[SchemaKey]] =
Read.fromGetOption(getSchemaKey)

// To replace Instant with sql.Timetstamp
implicit val readTimestamps: Read[Timestamps] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ object LoadSpec {

def withExistingRecord(s: TestState)(query: Statement): Any =
query match {
case Statement.GetVersion(_) => SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0))
case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0)))
case Statement.TableExists(_) => false
case Statement.GetColumns(_) => List("some_column")
case Statement.ManifestGet(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object PureDAO {

def getResult(s: TestState)(query: Statement): Any =
query match {
case Statement.GetVersion(_) => SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0))
case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "some_context", "jsonschema", SchemaVer.Full(2, 0, 0)))
case Statement.TableExists(_) => false
case Statement.GetColumns(_) => List("some_column")
case Statement.ManifestGet(_) => List()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ object RedshiftSpec {
def jdbcResults(state: TestState)(statement: Statement): Any = {
val _ = state
statement match {
case Statement.GetVersion(_) => SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, 0))
case Statement.GetVersion(_) => Some(SchemaKey("com.acme", "context", "jsonschema", SchemaVer.Full(1, 0, 0)))
case Statement.TableExists(_) => true
case Statement.GetColumns(_) => List("some_column")
case Statement.ManifestGet(_) => List()
Expand Down

0 comments on commit 381ea28

Please sign in to comment.