Skip to content

Commit

Permalink
Redshift loader: send statsd metrics for recovery tables (close #1331)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jan 3, 2024
1 parent 02da1fc commit f1c6ae8
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,8 @@ object Loader {
Load.load[F, C, I](setStageC, incrementAttemptsC, discovery, initQueryResult, target, config.featureFlags.disableMigration)
attempts <- control.getAndResetAttempts
_ <- result match {
case Load.LoadSuccess(ingested, recoveryTableNames) =>
val now = Logging[F].warning("No ingestion timestamp available") *> Clock[F].realTimeInstant
for {
loaded <- ingested.map(Monad[F].pure).getOrElse(now)
_ <- Load.congratulate[F](attempts, start, loaded, discovery.origin, recoveryTableNames)
_ <- control.incrementLoaded
} yield ()
case loadSuccess: Load.LoadSuccess =>
Load.congratulate[F](attempts, start, loadSuccess, discovery.origin) *> control.incrementLoaded
case fal: Load.FolderAlreadyLoaded =>
Monitoring[F].alert(fal.toAlert)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ sealed trait StorageTarget extends Product with Serializable {
def properties: Properties
def eventsLoadAuthMethod: StorageTarget.LoadAuthMethod
def foldersLoadAuthMethod: StorageTarget.LoadAuthMethod

def reportRecoveryTableMetrics: Boolean
}

object StorageTarget {
Expand Down Expand Up @@ -92,6 +94,8 @@ object StorageTarget {

override def eventsLoadAuthMethod: LoadAuthMethod = loadAuthMethod
override def foldersLoadAuthMethod: LoadAuthMethod = loadAuthMethod

override def reportRecoveryTableMetrics: Boolean = true
}

final case class Databricks(
Expand Down Expand Up @@ -131,6 +135,8 @@ object StorageTarget {

override def eventsLoadAuthMethod: LoadAuthMethod = loadAuthMethod
override def foldersLoadAuthMethod: LoadAuthMethod = loadAuthMethod

override def reportRecoveryTableMetrics: Boolean = false
}

final case class Snowflake(
Expand Down Expand Up @@ -209,6 +215,8 @@ object StorageTarget {
transformedStage.fold(loadAuthMethod)(_ => LoadAuthMethod.NoCreds)
override def foldersLoadAuthMethod: LoadAuthMethod =
folderMonitoringStage.fold(loadAuthMethod)(_ => LoadAuthMethod.NoCreds)

override def reportRecoveryTableMetrics: Boolean = false
}

object Snowflake {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,15 @@ object Environment {
reporters = List(statsdReporter, stdoutReporter)
periodicMetrics <- Resource.eval(Metrics.PeriodicMetrics.init[F](reporters, cli.config.monitoring.metrics.period))
implicit0(monitoring: Monitoring[F]) =
Monitoring.monitoringInterpreter[F](tracker, sentry, reporters, cli.config.monitoring.webhook, httpClient, periodicMetrics)
Monitoring.monitoringInterpreter[F](
tracker,
sentry,
reporters,
cli.config.monitoring.webhook,
httpClient,
periodicMetrics,
cli.config.storage.reportRecoveryTableMetrics
)
implicit0(secretStore: SecretStore[F]) = cloudServices.secretStore
implicit0(dispatcher: Dispatcher[F]) <- Dispatcher.parallel[F]
transaction <- Transaction.interpreter[F](cli.config.storage, cli.config.timeouts, cli.config.readyCheck)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
import com.snowplowanalytics.snowplow.rdbloader.config.Config
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics.PeriodicMetrics
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.{Metrics, Reporter}
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.LoadSuccess
import org.http4s.FormDataDecoder.formEntityDecoder

trait Monitoring[F[_]] { self =>
Expand Down Expand Up @@ -133,9 +134,9 @@ object Monitoring {
attempts: Int,
start: Instant,
ingestion: Instant,
recoveryTableNames: List[String]
loadResult: LoadSuccess
): SuccessPayload = {
val tableNames = if (recoveryTableNames.isEmpty) None else recoveryTableNames.some
val tableNames = if (loadResult.recoveryTableNames.isEmpty) None else loadResult.recoveryTableNames.some
SuccessPayload(shredding, Application, attempts, start, ingestion, tableNames, Map.empty)
}
}
Expand All @@ -146,7 +147,8 @@ object Monitoring {
reporters: List[Reporter[F]],
webhookConfig: Option[Config.Webhook],
httpClient: Client[F],
pm: PeriodicMetrics[F]
pm: PeriodicMetrics[F],
reportRecoveryTableMetrics: Boolean
)(implicit E: EntityDecoder[F, String]
): Monitoring[F] =
new Monitoring[F] {
Expand Down Expand Up @@ -177,7 +179,7 @@ object Monitoring {
sentryClient.fold(Sync[F].unit)(s => Sync[F].delay(s.sendException(e)))

def reportMetrics(metrics: Metrics.KVMetrics): F[Unit] =
reporters.traverse_(r => r.report(metrics.toList))
reporters.traverse_(r => r.report(metrics.toList(reportRecoveryTableMetrics)))

def success(payload: SuccessPayload): F[Unit] = {
val webhookRequest = viaWebhook[SuccessPayload](payload, (p, c) => p.copy(tags = p.tags ++ c.tags)) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import cats.implicits._
import cats.effect.{Async, Clock, Sync}
import cats.effect.kernel.Ref
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.LoadSuccess

object Metrics {

Expand Down Expand Up @@ -71,6 +72,12 @@ object Metrics {
val metricType = MetricType.Gauge
}

/**
 * Metric carrying a number of recovery tables.
 *
 * It is published under the key `recovery_tables_loaded` with the `Count`
 * metric type, and the integer is rendered as its decimal string.
 *
 * @param v number of recovery tables to report
 */
final case class RecoveryTablesLoaded(v: Int) extends KVMetric {
  val key: String = "recovery_tables_loaded"
  val value: String = v.toString
  val metricType = MetricType.Count
}

final case class DestinationHealthy(value: String) extends KVMetric {
val key = "destination_healthy"
val metricType = MetricType.Gauge
Expand Down Expand Up @@ -113,7 +120,7 @@ object Metrics {
for {
_ <- Stream.fixedDelay[F](period)
snapshot <- Stream.eval(Metrics.PeriodicMetricsRefs.snapshot(refs))
_ <- Stream.eval(reporters.traverse_(r => r.report(snapshot.toList)))
_ <- Stream.eval(reporters.traverse_(r => r.report(snapshot.toList(false))))
} yield ()

def setMaxTstampOfLoadedData(tstamp: Instant): F[Unit] =
Expand Down Expand Up @@ -149,9 +156,25 @@ object Metrics {
}

sealed trait KVMetrics {
def toList: List[KVMetric] = this match {
case KVMetrics.LoadingCompleted(countGood, countBad, minTstamp, maxTstamp, shredderStart, shredderEnd) =>
List(Some(countGood), Some(countBad), minTstamp, maxTstamp, Some(shredderStart), Some(shredderEnd)).unite
def toList(reportRecoveryTableMetrics: Boolean): List[KVMetric] = this match {
case KVMetrics.LoadingCompleted(
countGood,
countBad,
minTstamp,
maxTstamp,
shredderStart,
shredderEnd,
recoveryTablesLoaded
) =>
List(
Some(countGood),
Some(countBad),
minTstamp,
maxTstamp,
Some(shredderStart),
Some(shredderEnd),
if (reportRecoveryTableMetrics) Some(recoveryTablesLoaded) else None
).unite
case KVMetrics.PeriodicMetricsSnapshot(minAgeOfLoadedData) =>
List(minAgeOfLoadedData)
case KVMetrics.HealthCheck(healthy) =>
Expand All @@ -167,7 +190,8 @@ object Metrics {
collectorLatencyMin: Option[KVMetric.CollectorLatencyMin],
collectorLatencyMax: Option[KVMetric.CollectorLatencyMax],
shredderStartLatency: KVMetric.ShredderLatencyStart,
shredderEndLatency: KVMetric.ShredderLatencyEnd
shredderEndLatency: KVMetric.ShredderLatencyEnd,
recoveryTablesLoaded: KVMetric.RecoveryTablesLoaded
) extends KVMetrics

final case class PeriodicMetricsSnapshot(
Expand All @@ -178,13 +202,23 @@ object Metrics {

implicit val kvMetricsShow: Show[KVMetrics] =
Show.show {
case LoadingCompleted(countGood, countBad, minTstamp, maxTstamp, shredderStart, shredderEnd) =>
case LoadingCompleted(
countGood,
countBad,
minTstamp,
maxTstamp,
shredderStart,
shredderEnd,
recoveryTablesLoaded
) =>
s"""${countGood.value} good events were loaded.
| ${countBad.value} bad events were in this batch.
| It took minimum ${minTstamp.map(_.value).getOrElse("unknown")} seconds and maximum
| ${maxTstamp.map(_.value).getOrElse("unknown")} seconds between the collector and warehouse for these events.
| It took ${shredderStart.value} seconds between the start of transformer and warehouse
| and ${shredderEnd.value} seconds between the completion of transformer and warehouse""".stripMargin.replaceAll("\n", " ")
| and ${shredderEnd.value} seconds between the completion of transformer and warehouse.
| ${recoveryTablesLoaded.value} recovery tables were loaded with data.""".stripMargin
.replaceAll("\n", " ")
case PeriodicMetricsSnapshot(minAgeOfLoadedData) =>
s"Minimum age of loaded data in seconds: ${minAgeOfLoadedData.value}"
case HealthCheck(destinationHealthy) =>
Expand All @@ -193,15 +227,19 @@ object Metrics {
}
}

def getCompletedMetrics[F[_]: Clock: Functor](loaded: LoaderMessage.ShreddingComplete): F[KVMetrics.LoadingCompleted] =
def getCompletedMetrics[F[_]: Clock: Functor](
shreddingComplete: LoaderMessage.ShreddingComplete,
loadResult: LoadSuccess
): F[KVMetrics.LoadingCompleted] =
Clock[F].realTimeInstant.map { now =>
KVMetrics.LoadingCompleted(
KVMetric.CountGood(loaded.count.map(_.good).getOrElse(0)),
KVMetric.CountBad(loaded.count.flatMap(_.bad).getOrElse(0)),
loaded.timestamps.max.map(max => Duration.between(max, now).toSeconds()).map(l => KVMetric.CollectorLatencyMin(l)),
loaded.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric.CollectorLatencyMax(l)),
KVMetric.ShredderLatencyStart(Duration.between(loaded.timestamps.jobStarted, now).toSeconds()),
KVMetric.ShredderLatencyEnd(Duration.between(loaded.timestamps.jobCompleted, now).toSeconds())
KVMetric.CountGood(shreddingComplete.count.map(_.good).getOrElse(0)),
KVMetric.CountBad(shreddingComplete.count.flatMap(_.bad).getOrElse(0)),
shreddingComplete.timestamps.max.map(max => Duration.between(max, now).toSeconds()).map(l => KVMetric.CollectorLatencyMin(l)),
shreddingComplete.timestamps.min.map(min => Duration.between(min, now).toSeconds()).map(l => KVMetric.CollectorLatencyMax(l)),
KVMetric.ShredderLatencyStart(Duration.between(shreddingComplete.timestamps.jobStarted, now).toSeconds()),
KVMetric.ShredderLatencyEnd(Duration.between(shreddingComplete.timestamps.jobCompleted, now).toSeconds()),
KVMetric.RecoveryTablesLoaded(loadResult.recoveryTableNames.length)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object Load {

sealed trait LoadResult

case class LoadSuccess(ingestionTimestamp: Option[Instant], recoveryTableNames: List[String]) extends LoadResult
case class LoadSuccess(recoveryTableNames: List[String]) extends LoadResult

case class FolderAlreadyLoaded(folder: BlobStorage.Folder) extends LoadResult {
def toAlert: Alert =
Expand Down Expand Up @@ -146,11 +146,10 @@ object Load {
inTransactionMigrations *>
run[F, I](setLoading, discovery.discovery, initQueryResult, target, disableMigration).flatMap {
loadedRecoveryTableNames =>
setStage(Stage.Committing) *>
Manifest.add[F](discovery.origin.toManifestItem) *>
Manifest
.get[F](discovery.discovery.base)
.map(opt => LoadSuccess(opt.map(_.ingestion), loadedRecoveryTableNames))
for {
_ <- setStage(Stage.Committing)
_ <- Manifest.add[F](discovery.origin.toManifestItem)
} yield LoadSuccess(loadedRecoveryTableNames)
}
}
} yield result
Expand Down Expand Up @@ -228,17 +227,17 @@ object Load {
def congratulate[F[_]: Clock: Monad: Logging: Monitoring](
attempts: Int,
started: Instant,
ingestion: Instant,
loaded: LoaderMessage.ShreddingComplete,
recoveryTableNames: List[String]
loadResult: LoadSuccess,
shreddingComplete: LoaderMessage.ShreddingComplete
): F[Unit] = {
val attemptsSuffix = if (attempts > 0) s" after ${attempts} attempts" else ""
for {
_ <- Logging[F].info(s"Folder ${loaded.base} loaded successfully$attemptsSuffix")
success = Monitoring.SuccessPayload.build(loaded, attempts, started, ingestion, recoveryTableNames)
_ <- Monitoring[F].success(success)
metrics <- Metrics.getCompletedMetrics[F](loaded)
_ <- loaded.timestamps.max.map(t => Monitoring[F].periodicMetrics.setMaxTstampOfLoadedData(t)).sequence.void
now <- Clock[F].realTimeInstant
_ <- Logging[F].info(s"Folder ${shreddingComplete.base} loaded successfully$attemptsSuffix")
monitoringPayload = Monitoring.SuccessPayload.build(shreddingComplete, attempts, started, now, loadResult)
_ <- Monitoring[F].success(monitoringPayload)
metrics <- Metrics.getCompletedMetrics[F](shreddingComplete, loadResult)
_ <- shreddingComplete.timestamps.max.map(t => Monitoring[F].periodicMetrics.setMaxTstampOfLoadedData(t)).sequence.void
_ <- Monitoring[F].reportMetrics(metrics)
_ <- Logging[F].info((metrics: Metrics.KVMetrics).show)
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.{Semver, Transform
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage._
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics.{KVMetric, PeriodicMetrics}
import com.snowplowanalytics.snowplow.rdbloader.loading.Load.LoadSuccess

import java.util.concurrent.TimeUnit
import cats.effect.unsafe.implicits.global
Expand All @@ -47,7 +48,7 @@ class MetricsSpec extends Specification {
val shredderStartLatency = 50L
val shredderEndLatency = 10L

val loaded = ShreddingComplete(
val shreddingComplete = ShreddingComplete(
BlobStorage.Folder.coerce("s3://shredded/run_id/"),
TypesInfo.Shredded(Nil),
Timestamps(
Expand All @@ -61,16 +62,24 @@ class MetricsSpec extends Specification {
Some(Count(countGood, Some(countBad)))
)

val loadResult = LoadSuccess(
List(
"contexts_com_snowplowanalytics_snowplow_test_schema_broken_1_recovered_1_1_1_737559706",
"contexts_com_snowplowanalytics_snowplow_test_schema_broken_1_recovered_1_0_1_1837344102"
)
)

val expected = Metrics.KVMetrics.LoadingCompleted(
KVMetric.CountGood(countGood),
KVMetric.CountBad(countBad),
Some(KVMetric.CollectorLatencyMin(collectorLatencyMin)),
Some(KVMetric.CollectorLatencyMax(collectorLatencyMax)),
KVMetric.ShredderLatencyStart(shredderStartLatency),
KVMetric.ShredderLatencyEnd(shredderEndLatency)
KVMetric.ShredderLatencyEnd(shredderEndLatency),
KVMetric.RecoveryTablesLoaded(2)
)

val actual = Metrics.getCompletedMetrics[Id](loaded)
val actual = Metrics.getCompletedMetrics[Id](shreddingComplete, loadResult)

actual === expected
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class LoadSpec extends Specification {
),
LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName, false)),
LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)),
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
PureTransaction.CommitMessage
)

Expand Down Expand Up @@ -231,7 +230,6 @@ class LoadSpec extends Specification {
),
LogEntry.Sql(Statement.ShreddedCopy(info, Compression.Gzip, LoadAuthMethod.NoCreds, model, model.tableName, false)),
LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)),
LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)),
PureTransaction.CommitMessage
)
val result = Load
Expand Down

0 comments on commit f1c6ae8

Please sign in to comment.