Skip to content

Commit

Permalink
Add incomplete events (close #)
Browse files Browse the repository at this point in the history
Before this change, any error in the enriching workflow would short
circuit and a bad row would be emitted. After this change, if incomplete events
are enabled, the enriching goes to the end with what is possible,
accumulating errors as it goes. Errors get attached in derived_contexts.

There are now 3 main steps :

- Mapping and validating the input. This includes mapping fields of
  payload_data to the atomic event (e.g. tr_tt to tr_total while converting
  from string to number) and validating the contexts and unstruct event.
  Everything that goes wrong gets wrapped up in a SchemaViolations bad row.

- Running the enrichments. Everything that goes wrong gets wrapped up in an
  EnrichmentFailures bad row.

- Validating the output. This includes validating the enrichments contexts
  and the atomic fields lengths. Everything that goes wrong gets wrapped up
  in an EnrichmentFailures bad row.
  • Loading branch information
benjben committed Feb 16, 2024
1 parent 4b26619 commit b404c0a
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.util.Base64

import org.joda.time.DateTime

import cats.data.{NonEmptyList, ValidatedNel}
import cats.data.{Ior, NonEmptyList, ValidatedNel}
import cats.{Monad, Parallel}
import cats.implicits._

Expand Down Expand Up @@ -170,7 +170,7 @@ object Enrich {
case None =>
Sync[F].unit
}
} yield (List(badRow.invalid), collectorTstamp)
} yield (List(Ior.left(badRow)), collectorTstamp)

/** Build a `generic_error` bad row for unhandled runtime errors */
def genericBadRow(
Expand All @@ -189,11 +189,19 @@ object Enrich {
chunk: List[Result],
env: Environment[F, A]
): F[Unit] = {
val (bad, enriched) =
//val (bad, enriched, incomplete) =
val (bad, enriched, _) =
chunk
.flatMap(_._1)
.map(_.toEither)
.separate
.foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) {
case (previous, item) =>
val (bad, enriched, incomplete) = previous
item match {
case Ior.Right(e) => (bad, e :: enriched, incomplete)
case Ior.Left(br) => (br :: bad, enriched, incomplete)
case Ior.Both(br, i) => (br :: bad, enriched, i :: incomplete)
}
}

val (moreBad, good) = enriched.map { e =>
serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common

import cats.data.{EitherT, Validated, ValidatedNel}
import cats.data.{EitherT, Ior, ValidatedNel}

import com.snowplowanalytics.snowplow.badrows.BadRow

Expand All @@ -25,8 +25,8 @@ package object fs2 {
type ByteSink[F[_]] = List[Array[Byte]] => F[Unit]
type AttributedByteSink[F[_]] = List[AttributedData[Array[Byte]]] => F[Unit]

/** Enrichment result, containing list of (valid and invalid) results as well as the collector timestamp */
type Result = (List[Validated[BadRow, EnrichedEvent]], Option[Long])
type Enriched = Ior[BadRow, EnrichedEvent]
type Result = (List[Enriched], Option[Long])

/** Function to transform an origin raw payload into good and/or bad rows */
type Enrich[F[_]] = Array[Byte] => F[Result]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
package com.snowplowanalytics.snowplow.enrich.common

import cats.Monad
import cats.data.{Validated, ValidatedNel}
import cats.data.{Ior, Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

Expand Down Expand Up @@ -41,10 +41,6 @@ object EtlPipeline {
case class FeatureFlags(acceptInvalid: Boolean, legacyEnrichmentOrder: Boolean)

/**
* A helper method to take a ValidatedMaybeCanonicalInput and transform it into a List (possibly
* empty) of ValidatedCanonicalOutputs.
* We have to do some unboxing because enrichEvent expects a raw CanonicalInput as its argument,
* not a MaybeCanonicalInput.
* @param adapterRegistry Contains all of the events adapters
* @param enrichmentRegistry Contains configuration for all enrichments to apply
* @param client Our Iglu client, for schema lookups and validation
Expand All @@ -53,8 +49,6 @@ object EtlPipeline {
* @param input The ValidatedMaybeCanonicalInput
* @param featureFlags The feature flags available in the current version of Enrich
* @param invalidCount Function to increment the count of invalid events
* @return the ValidatedMaybeCanonicalOutput. Thanks to flatMap, will include any validation
* errors contained within the ValidatedMaybeCanonicalInput
*/
def processEvents[F[_]: Clock: Monad](
adapterRegistry: AdapterRegistry[F],
Expand All @@ -67,7 +61,7 @@ object EtlPipeline {
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
): F[List[Validated[BadRow, EnrichedEvent]]] =
): F[List[Ior[BadRow, EnrichedEvent]]] =
input match {
case Validated.Valid(Some(payload)) =>
adapterRegistry
Expand All @@ -87,14 +81,14 @@ object EtlPipeline {
registryLookup,
atomicFields
)
.toValidated
.value
}
case Validated.Invalid(badRow) =>
Monad[F].pure(List(badRow.invalid[EnrichedEvent]))
Monad[F].pure(List(Ior.left(badRow)))
}
case Validated.Invalid(badRows) =>
Monad[F].pure(badRows.map(_.invalid[EnrichedEvent])).map(_.toList)
Monad[F].pure(badRows.toList.map(br => Ior.left(br)))
case Validated.Valid(None) =>
Monad[F].pure(List.empty[Validated[BadRow, EnrichedEvent]])
Monad[F].pure(Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ import org.slf4j.LoggerFactory

import cats.Monad
import cats.data.Validated.{Invalid, Valid}
import cats.data.{NonEmptyList, ValidatedNel}
import cats.data.{Ior, IorT, NonEmptyList, ValidatedNel}

import cats.implicits._

import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure
import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor}
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

Expand All @@ -35,21 +33,24 @@ object AtomicFieldsLengthValidator {

def validate[F[_]: Monad](
event: EnrichedEvent,
rawEvent: RawEvent,
processor: Processor,
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): F[Either[BadRow, Unit]] =
atomicFields.value
.map(validateField(event))
.combineAll match {
case Invalid(errors) if acceptInvalid =>
handleAcceptableBadRow(invalidCount, event, errors) *> Monad[F].pure(Right(()))
case Invalid(errors) =>
Monad[F].pure(buildBadRow(event, rawEvent, processor, errors).asLeft)
case Valid(()) =>
Monad[F].pure(Right(()))
): IorT[F, NonEmptyList[FailureDetails.EnrichmentFailure], Unit] =
IorT {
atomicFields.value
.map(validateField(event))
.combineAll match {
case Invalid(errors) if acceptInvalid =>
handleAcceptableBadRow(invalidCount, event, errors) *> Monad[F].pure(Ior.Right(()))
case Invalid(errors) =>
val allErrors = errors
.prepend("Enriched event does not conform to atomic schema field's length restrictions")
.map(asEnrichmentFailure)
Monad[F].pure(Ior.Both(allErrors, ()))
case Valid(()) =>
Monad[F].pure(Ior.Right(()))
}
}

private def validateField(
Expand All @@ -64,22 +65,6 @@ object AtomicFieldsLengthValidator {
Valid(())
}

private def buildBadRow(
event: EnrichedEvent,
rawEvent: RawEvent,
processor: Processor,
errors: NonEmptyList[String]
): BadRow.EnrichmentFailures =
EnrichmentManager.buildEnrichmentFailuresBadRow(
NonEmptyList(
asEnrichmentFailure("Enriched event does not conform to atomic schema field's length restrictions"),
errors.toList.map(asEnrichmentFailure)
),
EnrichedEvent.toPartiallyEnrichedEvent(event),
RawEvent.toRawEvent(rawEvent),
processor
)

private def handleAcceptableBadRow[F[_]: Monad](
invalidCount: F[Unit],
event: EnrichedEvent,
Expand All @@ -92,8 +77,8 @@ object AtomicFieldsLengthValidator {
)
)

private def asEnrichmentFailure(errorMessage: String): EnrichmentFailure =
EnrichmentFailure(
private def asEnrichmentFailure(errorMessage: String): FailureDetails.EnrichmentFailure =
FailureDetails.EnrichmentFailure(
enrichment = None,
FailureDetails.EnrichmentFailureMessage.Simple(errorMessage)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object ClientEnrichments {
* @param res The packed string holding the screen dimensions
* @return the ResolutionTuple or an error message, boxed in a Scalaz Validation
*/
val extractViewDimensions: (String, String) => Either[FailureDetails.EnrichmentFailure, (JInteger, JInteger)] =
val extractViewDimensions: (String, String) => Either[FailureDetails.SchemaViolation, (JInteger, JInteger)] =
(field, res) =>
(res match {
case ResRegex(width, height) =>
Expand All @@ -45,12 +45,8 @@ object ClientEnrichments {
.leftMap(_ => "could not be converted to java.lang.Integer s")
case _ => s"does not conform to regex ${ResRegex.toString}".asLeft
}).leftMap { msg =>
val f = FailureDetails.EnrichmentFailureMessage.InputData(
field,
Option(res),
msg
)
FailureDetails.EnrichmentFailure(None, f)
FailureDetails.SchemaViolation
.NotJson(field, Option(res), msg)
}

}
Loading

0 comments on commit b404c0a

Please sign in to comment.