Improve RDB Loader behavior for table without comment
RDB Loader (Redshift) would not emit an easy-to-understand error message when the shredded table comment is not a schema key. The app would also retry the transaction, which was unnecessary effort.

This commit makes the app emit an easy-to-understand error message in such cases and no longer retries the transaction, as there is no benefit in doing so.
oguzhanunlu committed Jul 9, 2024
1 parent 25d6026 commit 65637df
Showing 4 changed files with 24 additions and 8 deletions.
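For context before the diff: a shredded table's comment is expected to hold the Iglu schema key the table was last migrated to, and the loader parses it with SchemaKey.fromUri. A minimal sketch of the two cases the commit message describes (the comment strings below are made-up examples, not taken from this commit):

import com.snowplowanalytics.iglu.core.SchemaKey

// A comment holding a schema key URI parses successfully:
val parsed = SchemaKey.fromUri("iglu:com.acme/my_event/jsonschema/1-0-2") // Right(SchemaKey(...))

// Any other comment cannot be parsed; the loader now reports this as a
// TableCommentError and gives up instead of retrying the load transaction:
val failed = SchemaKey.fromUri("created by an older loader")              // Left(ParseError(...))
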
@@ -25,7 +25,7 @@ object LoaderError {
 
   implicit val loaderErrorShow: Show[LoaderError] = {
     case d: DiscoveryError => "Data discovery error with following issues:\n" + d.failures.toList.map(_.getMessage).mkString("\n")
-    case m: MigrationError => s"Table migration error. Please check the table consistency. ${m.message}"
+    case m: TableCommentError => s"Table comment error. Please check the table comment. ${m.message}"
     case t: TimeoutError => t.message
   }
 
@@ -43,7 +43,13 @@ object LoaderError {
   }
 
   /** Error happened during DDL-statements execution. Critical */
-  final case class MigrationError(message: String) extends LoaderError
+  sealed trait MigrationError extends LoaderError
+
+  /**
+   * Error happened when reading redshift table comment to discover latest schema key that a table
+   * has been migrated to so far
+   */
+  final case class TableCommentError(message: String) extends MigrationError
 
   /** A timeout has reached, Loader should abort the current operation and recover */
   final case class TimeoutError(message: String) extends LoaderError
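For illustration (not part of the diff): the new error renders through the existing Show instance, so operators see the dedicated message. The table name below is made up.

import cats.syntax.show._
import com.snowplowanalytics.snowplow.rdbloader.LoaderError
import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError

val err: LoaderError = TableCommentError("Could not read a schema key in table [atomic.events] comment")
val rendered = err.show
// "Table comment error. Please check the table comment. Could not read a schema key in table [atomic.events] comment"
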
@@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, Shredd
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction}
 import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable
 import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey}
+import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError
 import doobie.Fragment
 
 import scala.math.Ordered.orderingToOrdered
@@ -205,7 +206,7 @@ object Migration {
       Control.tableExists[F](rm.tableName).ifM(Applicative[F].pure(None), Applicative[F].pure(Some(target.createTable(rm))))
     }
 
-  def updateGoodTable[F[_]: Monad: DAO, I](
+  def updateGoodTable[F[_]: MonadThrow: DAO, I](
     target: Target[I],
     goodModel: ShredModel.GoodModel
   ): F[List[Block]] =
@@ -216,7 +217,7 @@
       else Monad[F].pure(Nil)
     } yield block
 
-  def buildBlock[F[_]: Monad: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
+  def buildBlock[F[_]: MonadThrow: DAO, I](description: Description, target: Target[I]): F[List[Block]] =
     description match {
       case Description.Table(mergeResult) =>
        val goodModel = mergeResult.goodModel
@@ -303,8 +304,14 @@ object Migration {
       else List(Monad[F].unit)
 
   /** Find the latest schema version in the table and confirm that it is the latest in `schemas` */
-  def getVersion[F[_]: DAO](tableName: String): F[SchemaKey] =
-    DAO[F].executeQuery[SchemaKey](Statement.GetVersion(tableName))(readSchemaKey)
+  def getVersion[F[_]: MonadThrow: DAO](tableName: String): F[SchemaKey] =
+    DAO[F]
+      .executeQuery[Option[SchemaKey]](Statement.GetVersion(tableName))(readSchemaKey)
+      .flatMap(
+        _.fold(
+          MonadThrow[F].raiseError[SchemaKey](TableCommentError(s"Could not read a schema key in table [$tableName] comment"))
+        )(_.pure[F])
+      )
 
   sealed trait Entity {
     def getName: String = this match {

@@ -14,8 +14,10 @@ import cats.{Applicative, MonadThrow, Show}
 import cats.effect.Clock
 import cats.implicits._
 import com.snowplowanalytics.snowplow.rdbloader.config.Config.{Retries, Strategy}
+import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError
 import retry.{RetryDetails, RetryPolicies, RetryPolicy}
+import retry._
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 /**
@@ -32,6 +34,7 @@ object Retry {
 
   /** List of predicates, matching exceptions that should not be retried */
   val FatalFailures: List[Throwable => Boolean] = List(
+    e => e.isInstanceOf[TableCommentError],
     e => e.isInstanceOf[IllegalStateException],
     e => e.toString.toLowerCase.contains("[amazon](500310) invalid operation"),
 
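A sketch of how a fatal-failure predicate list like this one can gate cats-retry so that matching errors are never re-attempted (illustrative wiring only, assuming cats-retry 3.x; this is not the loader's actual Retry implementation):

import cats.effect.IO
import retry.{RetryDetails, RetryPolicies, retryingOnSomeErrors}
import com.snowplowanalytics.snowplow.rdbloader.LoaderError.TableCommentError

val fatalFailures: List[Throwable => Boolean] = List(
  e => e.isInstanceOf[TableCommentError],
  e => e.isInstanceOf[IllegalStateException]
)

// Retry the action only while the thrown error matches none of the fatal predicates,
// so a TableCommentError fails the load immediately instead of being re-attempted.
def withRetries[A](action: IO[A]): IO[A] =
  retryingOnSomeErrors[A](
    policy = RetryPolicies.limitRetries[IO](3),
    isWorthRetrying = (e: Throwable) => IO.pure(!fatalFailures.exists(_(e))),
    onError = (_: Throwable, _: RetryDetails) => IO.unit
  )(action)
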
@@ -86,8 +86,8 @@ package object rdbloader {
   implicit val getSchemaKey: Get[SchemaKey] =
     Get[String].temap(s => SchemaKey.fromUri(s).leftMap(e => s"Cannot parse $s into Iglu schema key, ${e.code}"))
 
-  implicit val readSchemaKey: Read[SchemaKey] =
-    Read.fromGet(getSchemaKey)
+  implicit val readSchemaKey: Read[Option[SchemaKey]] =
+    Read.fromGetOption(getSchemaKey)
 
   // To replace Instant with sql.Timetstamp
   implicit val readTimestamps: Read[Timestamps] = {
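The practical effect of switching to Read[Option[SchemaKey]] is that a NULL comment decodes to None instead of failing the query, letting getVersion raise the dedicated error. A sketch under assumptions (the SQL below is an illustrative Postgres-style query, not the loader's GetVersion statement, and the implicits above are assumed to be in scope):

import com.snowplowanalytics.iglu.core.SchemaKey
import doobie._
import doobie.implicits._

// With an implicit Read[Option[SchemaKey]], a table whose comment is NULL (or absent)
// decodes to None rather than blowing up inside the doobie query.
def tableComment(tableName: String): ConnectionIO[Option[SchemaKey]] =
  sql"SELECT obj_description($tableName::regclass)"
    .query[Option[SchemaKey]]
    .unique
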
