From eb84c16c6423a7365c6f2d7bdf0c191caecdfa4b Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 8 Feb 2024 21:17:11 +0100 Subject: [PATCH] Add incomplete events (close #) --- .../snowplow/enrich/common/fs2/Enrich.scala | 16 +- .../snowplow/enrich/common/fs2/package.scala | 6 +- .../common/EtlPipeline.scala | 19 +- .../AtomicFieldsLengthValidator.scala | 2 +- .../enrichments/EnrichmentManager.scala | 205 ++++++++++++------ .../common/utils/IgluUtils.scala | 177 +++++++-------- 6 files changed, 247 insertions(+), 178 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala index 89753726b..a0771c4c2 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala @@ -170,7 +170,7 @@ object Enrich { case None => Sync[F].unit } - } yield (List(badRow.invalid), collectorTstamp) + } yield (List(Left((badRow, None))), collectorTstamp) /** Build a `generic_error` bad row for unhandled runtime errors */ def genericBadRow( @@ -189,11 +189,19 @@ object Enrich { chunk: List[Result], env: Environment[F, A] ): F[Unit] = { - val (bad, enriched) = + //val (bad, enriched, incomplete) = + val (bad, enriched, _) = chunk .flatMap(_._1) - .map(_.toEither) - .separate + .foldLeft((List.empty[BadRow], List.empty[EnrichedEvent], List.empty[EnrichedEvent])) { + case (previous, item) => + val (bad, enriched, incomplete) = previous + item match { + case Right(e) => (bad, e :: enriched, incomplete) + case Left((br, Some(i))) => (br :: bad, enriched, i :: incomplete) + case Left((br, _)) => (br :: bad, enriched, incomplete) + } + } val (moreBad, good) = enriched.map { e => serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala index a6a0a2337..5bf77e30b 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/package.scala @@ -10,7 +10,7 @@ */ package com.snowplowanalytics.snowplow.enrich.common -import cats.data.{EitherT, Validated, ValidatedNel} +import cats.data.{EitherT, ValidatedNel} import com.snowplowanalytics.snowplow.badrows.BadRow @@ -25,8 +25,8 @@ package object fs2 { type ByteSink[F[_]] = List[Array[Byte]] => F[Unit] type AttributedByteSink[F[_]] = List[AttributedData[Array[Byte]]] => F[Unit] - /** Enrichment result, containing list of (valid and invalid) results as well as the collector timestamp */ - type Result = (List[Validated[BadRow, EnrichedEvent]], Option[Long]) + type Enriched = Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent] + type Result = (List[Enriched], Option[Long]) /** Function to transform an origin raw payload into good and/or bad rows */ type Enrich[F[_]] = Array[Byte] => F[Result] diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala index 7bb4b4794..51c1ded5b 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/EtlPipeline.scala @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common import cats.Monad import cats.data.{Validated, ValidatedNel} -import cats.effect.Clock +import cats.effect.kernel.Sync import cats.implicits._ import org.joda.time.DateTime @@ -41,10 +41,6 @@ object EtlPipeline { case class FeatureFlags(acceptInvalid: Boolean, legacyEnrichmentOrder: Boolean) /** - * A helper method to take a ValidatedMaybeCanonicalInput and transform it into a List (possibly - * empty) of ValidatedCanonicalOutputs. - * We have to do some unboxing because enrichEvent expects a raw CanonicalInput as its argument, - * not a MaybeCanonicalInput. * @param adapterRegistry Contains all of the events adapters * @param enrichmentRegistry Contains configuration for all enrichments to apply * @param client Our Iglu client, for schema lookups and validation @@ -53,10 +49,8 @@ object EtlPipeline { * @param input The ValidatedMaybeCanonicalInput * @param featureFlags The feature flags available in the current version of Enrich * @param invalidCount Function to increment the count of invalid events - * @return the ValidatedMaybeCanonicalOutput. Thanks to flatMap, will include any validation - * errors contained within the ValidatedMaybeCanonicalInput */ - def processEvents[F[_]: Clock: Monad]( + def processEvents[F[_]: Sync]( adapterRegistry: AdapterRegistry[F], enrichmentRegistry: EnrichmentRegistry[F], client: IgluCirceClient[F], @@ -67,7 +61,7 @@ object EtlPipeline { invalidCount: F[Unit], registryLookup: RegistryLookup[F], atomicFields: AtomicFields - ): F[List[Validated[BadRow, EnrichedEvent]]] = + ): F[List[(Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent])]] = input match { case Validated.Valid(Some(payload)) => adapterRegistry @@ -87,14 +81,13 @@ object EtlPipeline { registryLookup, atomicFields ) - .toValidated } case Validated.Invalid(badRow) => - Monad[F].pure(List(badRow.invalid[EnrichedEvent])) + Monad[F].pure(List((Left((badRow, None))))) } case Validated.Invalid(badRows) => - Monad[F].pure(badRows.map(_.invalid[EnrichedEvent])).map(_.toList) + Monad[F].pure(badRows.toList.map(br => (Left((br, None))))) case Validated.Valid(None) => - Monad[F].pure(List.empty[Validated[BadRow, EnrichedEvent]]) + Monad[F].pure(Nil) } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala index 5f2f98582..f1e43c8b0 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala @@ -40,7 +40,7 @@ object AtomicFieldsLengthValidator { acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): F[Either[BadRow, Unit]] = + ): F[Either[BadRow.EnrichmentFailures, Unit]] = atomicFields.value .map(validateField(event)) .combineAll match { diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 1d238fdc2..cdbec9c8d 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -16,8 +16,8 @@ import java.time.Instant import org.joda.time.DateTime import io.circe.Json import cats.{Applicative, Monad} -import cats.data.{EitherT, NonEmptyList, OptionT, StateT} -import cats.effect.Clock +import cats.data.{EitherT, NonEmptyList, OptionT, StateT, Validated} +import cats.effect.kernel.Sync import cats.implicits._ import com.snowplowanalytics.refererparser._ @@ -54,9 +54,8 @@ object EnrichmentManager { * @param raw Canonical input event to enrich * @param featureFlags The feature flags available in the current version of Enrich * @param invalidCount Function to increment the count of invalid events - * @return Enriched event or bad row if a problem occured */ - def enrichEvent[F[_]: Monad: Clock]( + def enrichEvent[F[_]: Sync]( registry: EnrichmentRegistry[F], client: IgluCirceClient[F], processor: Processor, @@ -65,44 +64,103 @@ object EnrichmentManager { featureFlags: EtlPipeline.FeatureFlags, invalidCount: F[Unit], registryLookup: RegistryLookup[F], - atomicFields: AtomicFields - ): EitherT[F, BadRow, EnrichedEvent] = + atomicFields: AtomicFields, + emitIncomplete: Boolean = true + ): F[Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent]] = for { - enriched <- EitherT.fromEither[F](setupEnrichedEvent(raw, etlTstamp, processor)) - extractResult <- IgluUtils.extractAndValidateInputJsons(enriched, client, raw, processor, registryLookup) - _ = { - ME.formatUnstructEvent(extractResult.unstructEvent).foreach(e => enriched.unstruct_event = e) - ME.formatContexts(extractResult.contexts).foreach(c => enriched.contexts = c) - } - enrichmentsContexts <- runEnrichments( - registry, - processor, - raw, - enriched, - extractResult.contexts, - extractResult.unstructEvent, - featureFlags.legacyEnrichmentOrder - ) - _ = ME.formatContexts(enrichmentsContexts ::: extractResult.validationInfoContexts).foreach(c => enriched.derived_contexts = c) - _ <- IgluUtils - .validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched, registryLookup) - _ <- EitherT.rightT[F, BadRow]( - anonIp(enriched, registry.anonIp).foreach(enriched.user_ipaddress = _) - ) - _ <- EitherT.rightT[F, BadRow] { - piiTransform(enriched, registry.piiPseudonymizer).foreach { pii => - enriched.pii = pii.asString - } - } - _ <- validateEnriched(enriched, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields) - } yield enriched + validatedInput <- mapAndValidateInput(raw, etlTstamp, processor, client, registryLookup) + (schemaViolations, enrichedEvent, extractResult) = validatedInput + enriched <- if (schemaViolations.isEmpty || emitIncomplete) + enrich( + enrichedEvent, + registry, + client, + processor, + raw, + extractResult.contexts, + extractResult.unstructEvent, + featureFlags.legacyEnrichmentOrder, + registryLookup + ) + else + Sync[F].pure((None, Nil)) + (enrichFailures, enrichmentsContexts) = enriched + validationFailures <- if ((schemaViolations.isEmpty && enrichFailures.isEmpty) || emitIncomplete) + validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields) + else + Sync[F].pure(None) + badRows = List(schemaViolations, enrichFailures, validationFailures).flatten + output = badRows match { + case Nil => + Right(enrichedEvent) + case head :: _ => + if (!emitIncomplete) + Left((head, None)) + else { + val failuresContext = createFailuresContext(badRows) + ME.formatContexts(failuresContext :: enrichmentsContexts ::: extractResult.validationInfoContexts) + .foreach(c => enrichedEvent.derived_contexts = c) + Left((head, Some(enrichedEvent))) + } + } + } yield output + + // TODO: aggregate all the errors inside same SchemaViolations + def mapAndValidateInput[F[_]: Sync]( + raw: RawEvent, + etlTstamp: DateTime, + processor: Processor, + client: IgluCirceClient[F], + registryLookup: RegistryLookup[F] + ): F[(Option[BadRow], EnrichedEvent, IgluUtils.EventExtractResult)] = + for { + mapped <- Sync[F].delay(setupEnrichedEvent(raw, etlTstamp, processor)) + (enrichmentFailures, enrichedEvent) = mapped + validated <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, raw, processor, registryLookup) + (schemaViolations, sdjs) = validated + maybeBadRow = aggregateBadRows(List(enrichmentFailures, schemaViolations)) + } yield (maybeBadRow, enrichedEvent, sdjs) + + private def aggregateBadRows(badRows: List[Option[BadRow]]): Option[BadRow] = + badRows.flatten.headOption /** - * Run all the enrichments and aggregate the errors if any + * @return Valid enrichments contexts and bad row if one or several contexts are invalid + * and/or if there was an error with at least one enrichment. + */ + def enrich[F[_]: Sync]( + enrichedEvent: EnrichedEvent, + registry: EnrichmentRegistry[F], + client: IgluCirceClient[F], + processor: Processor, + raw: RawEvent, + inputContexts: List[SelfDescribingData[Json]], + unstructEvent: Option[SelfDescribingData[Json]], + legacyOrder: Boolean, + registryLookup: RegistryLookup[F] + ): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] = + for { + enriched <- runEnrichments( + registry, + processor, + raw, + enrichedEvent, + inputContexts, + unstructEvent, + legacyOrder + ) + (enrichmentFailures, derivedContexts) = enriched + validated <- IgluUtils.validateEnrichmentsContexts[F](client, derivedContexts, raw, processor, enrichedEvent, registryLookup) + (moreFailures, validContexts) = validated + _ <- Sync[F].delay(anonIp(enrichedEvent, registry.anonIp).foreach(enrichedEvent.user_ipaddress = _)) + _ <- Sync[F].delay(piiTransform(enrichedEvent, registry.piiPseudonymizer).foreach(pii => enrichedEvent.pii = pii.asString)) + } yield (List(enrichmentFailures, moreFailures).flatten.headOption, validContexts) + + // TODO return List[FailureDetails.EnrichmentFailure] rather than BadRow + /** + * Run all the enrichments. * @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\ - * @return List of contexts to attach to the enriched event if all the enrichments went well - * or [[BadRow.EnrichmentFailures]] if something wrong happened - * with at least one enrichment + * @return Enrichments contexts and bad row if there was an error with at least one enrichment. */ private def runEnrichments[F[_]: Monad]( registry: EnrichmentRegistry[F], @@ -112,25 +170,24 @@ object EnrichmentManager { inputContexts: List[SelfDescribingData[Json]], unstructEvent: Option[SelfDescribingData[Json]], legacyOrder: Boolean - ): EitherT[F, BadRow.EnrichmentFailures, List[SelfDescribingData[Json]]] = - EitherT { - accState(registry, raw, inputContexts, unstructEvent, legacyOrder) - .runS(Accumulation(enriched, Nil, Nil)) - .map { - case Accumulation(_, failures, contexts) => - failures.toNel match { - case Some(nel) => - buildEnrichmentFailuresBadRow( - nel, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ).asLeft - case None => - contexts.asRight - } - } - } + ): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] = + accState(registry, raw, inputContexts, unstructEvent, legacyOrder) + .runS(Accumulation(enriched, Nil, Nil)) + .map { + case Accumulation(_, failures, contexts) => + failures.toNel match { + case Some(nel) => + val badRow = buildEnrichmentFailuresBadRow( + nel, + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ) + (Some(badRow), contexts) + case None => + (None, contexts) + } + } private[enrichments] case class Accumulation( event: EnrichedEvent, @@ -249,11 +306,12 @@ object EnrichmentManager { } /** Create the mutable [[EnrichedEvent]] and initialize it. */ + // TODO create SchemaViolations instead of EnrichmentsFailures private def setupEnrichedEvent( raw: RawEvent, etlTstamp: DateTime, processor: Processor - ): Either[BadRow.EnrichmentFailures, EnrichedEvent] = { + ): (Option[BadRow.EnrichmentFailures], EnrichedEvent) = { val e = new EnrichedEvent() e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter @@ -271,17 +329,18 @@ object EnrichmentManager { // Map/validate/transform input fields to enriched event fields val transformed = Transform.transform(raw, e) - (collectorTstamp |+| transformed) - .leftMap { enrichmentFailures => - EnrichmentManager.buildEnrichmentFailuresBadRow( + (collectorTstamp |+| transformed) match { + case Validated.Invalid(enrichmentFailures) => + val badRow = EnrichmentManager.buildEnrichmentFailuresBadRow( enrichmentFailures, EnrichedEvent.toPartiallyEnrichedEvent(e), RawEvent.toRawEvent(raw), processor ) - } - .as(e) - .toEither + (Some(badRow), e) + case _ => + (None, e) + } } def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[FailureDetails.EnrichmentFailure, Unit] = @@ -773,9 +832,17 @@ object EnrichmentManager { acceptInvalid: Boolean, invalidCount: F[Unit], atomicFields: AtomicFields - ): EitherT[F, BadRow, Unit] = - EitherT { - //We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608 - AtomicFieldsLengthValidator.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields) - } + ): F[Option[BadRow.EnrichmentFailures]] = + // We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608 + AtomicFieldsLengthValidator + .validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields) + .map { + case Left(br) => Some(br) + case _ => None + } + + // TODO + private def createFailuresContext( + badRows: List[BadRow] + ): SelfDescribingData[Json] = ??? } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala index d18936818..484d5f7e4 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/IgluUtils.scala @@ -11,7 +11,7 @@ package com.snowplowanalytics.snowplow.enrich.common.utils import cats.Monad -import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel} +import cats.data.{EitherT, NonEmptyList} import cats.effect.Clock import cats.implicits._ @@ -48,10 +48,8 @@ object IgluUtils { * @param client Iglu client used to validate the SDJs * @param raw Raw input event, used only to put in the bad row in case of problem * @param processor Meta data to put in the bad row - * @return Extracted unstructured event and input contexts if any and if everything valid, - * `BadRow.SchemaViolations` if something went wrong. For instance if the - * unstructured event is invalid and has a context that is invalid, - * the bad row will contain the 2 associated `FailureDetails.SchemaViolation`s + * @return List of valid contexts and unstructured event if any. + * If there were any error it's in the bad row. */ def extractAndValidateInputJsons[F[_]: Monad: Clock]( enriched: EnrichedEvent, @@ -59,33 +57,36 @@ object IgluUtils { raw: RawEvent, processor: Processor, registryLookup: RegistryLookup[F] - ): EitherT[ - F, - BadRow.SchemaViolations, - EventExtractResult - ] = - EitherT { - for { - contexts <- IgluUtils.extractAndValidateInputContexts(enriched, client, registryLookup) - unstruct <- IgluUtils - .extractAndValidateUnstructEvent(enriched, client, registryLookup) - .map(_.toValidatedNel) - } yield (contexts, unstruct) - .mapN { (c, ue) => - val validationInfoContexts = (c.flatMap(_.validationInfo) ::: ue.flatMap(_.validationInfo).toList).distinct - .map(_.toSdj) - EventExtractResult(contexts = c.map(_.sdj), unstructEvent = ue.map(_.sdj), validationInfoContexts = validationInfoContexts) - } - .leftMap { schemaViolations => - buildSchemaViolationsBadRow( - schemaViolations, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - } - .toEither - } + ): F[(Option[BadRow], EventExtractResult)] = + for { + contexts <- IgluUtils.extractAndValidateInputContexts(enriched, client, registryLookup) + (invalidContexts, validContexts) = contexts.separate + unstruct <- IgluUtils.extractAndValidateUnstructEvent(enriched, client, registryLookup) + unstructEvent = unstruct match { + case Right(u) => u + case _ => None + } + invalidUnstruct = unstruct match { + case Left(err) => List(err) + case _ => Nil + } + maybeBadRow = (invalidContexts ::: invalidUnstruct) match { + case Nil => None + case head :: tail => + buildSchemaViolationsBadRow( + NonEmptyList.of(head, tail: _*), + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ).some + } + validationInfo = (validContexts.flatMap(_.validationInfo) ::: unstructEvent.flatMap(_.validationInfo).toList).distinct.map(_.toSdj) + output = EventExtractResult( + contexts = validContexts.map(_.sdj), + unstructEvent = unstructEvent.map(_.sdj), + validationInfoContexts = validationInfo + ) + } yield (maybeBadRow, output) /** * Extract unstructured event from event and validate against its schema @@ -94,7 +95,8 @@ object IgluUtils { * @param field Name of the field containing the unstructured event, to put in the bad row * in case of failure * @param criterion Expected schema for the JSON containing the unstructured event - * @return Valid unstructured event if the input event has one + * @return Valid unstructured event if the input event has one. + * Schema violation if the unstruct event is invalid. */ private[common] def extractAndValidateUnstructEvent[F[_]: Monad: Clock]( enriched: EnrichedEvent, @@ -102,27 +104,27 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "ue_properties", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0) - ): F[Validated[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] = - (Option(enriched.unstruct_event) match { + ): F[Either[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] = + Option(enriched.unstruct_event) match { case Some(rawUnstructEvent) => - for { + (for { // Validate input Json string and extract unstructured event unstruct <- extractInputData(rawUnstructEvent, field, criterion, client, registryLookup) // Parse Json unstructured event as SelfDescribingData[Json] unstructSDJ <- parseAndValidateSDJ_sv(unstruct, client, registryLookup) - } yield unstructSDJ.some + } yield unstructSDJ.some).value case None => - EitherT.rightT[F, FailureDetails.SchemaViolation](none[SdjExtractResult]) - }).toValidated + Monad[F].pure(Right(none[SdjExtractResult])) + } /** - * Extract list of custom contexts from event and validate each against its schema + * Extract list of contexts from event and validate each against its schema * @param enriched Snowplow enriched event from which to extract custom contexts (in String) * @param client Iglu client used for SDJ validation * @param field Name of the field containing the contexts, to put in the bad row * in case of failure * @param criterion Expected schema for the JSON containing the contexts - * @return List will all contexts provided that they are all valid + * @return List of all valid contexts and schema violations. */ private[common] def extractAndValidateInputContexts[F[_]: Monad: Clock]( enriched: EnrichedEvent, @@ -130,27 +132,20 @@ object IgluUtils { registryLookup: RegistryLookup[F], field: String = "contexts", criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0) - ): F[ValidatedNel[FailureDetails.SchemaViolation, List[SdjExtractResult]]] = - (Option(enriched.contexts) match { + ): F[List[Either[FailureDetails.SchemaViolation, SdjExtractResult]]] = + Option(enriched.contexts) match { case Some(rawContexts) => - for { - // Validate input Json string and extract contexts - contexts <- extractInputData(rawContexts, field, criterion, client, registryLookup) - .map(_.asArray.get.toList) // .get OK because SDJ wrapping the contexts valid - .leftMap(NonEmptyList.one) - // Parse and validate each SDJ and merge the errors - contextsSDJ <- EitherT( - contexts - .map(parseAndValidateSDJ_sv(_, client, registryLookup).toValidatedNel) - .sequence - .map(_.sequence.toEither) - ) - } yield contextsSDJ + extractInputData(rawContexts, field, criterion, client, registryLookup).value.flatMap { + case Left(err) => Monad[F].pure(List(err.asLeft[SdjExtractResult])) + case Right(json) => + json.asArray.get.toList // .get OK because SDJ wrapping the contexts valid + .traverse { sdj => + parseAndValidateSDJ_sv(sdj, client, registryLookup).value + } + } case None => - EitherT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]]( - List.empty[SdjExtractResult] - ) - }).toValidated + Monad[F].pure(Nil) + } /** * Validate each context added by the enrichments against its schema @@ -159,7 +154,7 @@ object IgluUtils { * @param raw Input event to put in the bad row if at least one context is invalid * @param processor Meta data for the bad row * @param enriched Partially enriched event to put in the bad row - * @return Unit if all the contexts are valid + * @return Valid enrichments contexts and bad row if one or several contexts are invalid */ private[common] def validateEnrichmentsContexts[F[_]: Monad: Clock]( client: IgluCirceClient[F], @@ -168,26 +163,31 @@ object IgluUtils { processor: Processor, enriched: EnrichedEvent, registryLookup: RegistryLookup[F] - ): EitherT[F, BadRow.EnrichmentFailures, Unit] = + ): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] = checkList(client, sdjs, registryLookup) - .leftMap( - _.map { - case (schemaKey, clientError) => - val enrichmentInfo = - FailureDetails.EnrichmentInformation(schemaKey, "enrichments-contexts-validation") - FailureDetails.EnrichmentFailure( - enrichmentInfo.some, - FailureDetails.EnrichmentFailureMessage.IgluError(schemaKey, clientError) - ) - } - ) - .leftMap { enrichmentFailures => - EnrichmentManager.buildEnrichmentFailuresBadRow( - enrichmentFailures, - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) + .map(_.separate) + .map { + case (errors, valid) => + val maybeBadRow = errors match { + case Nil => None + case head :: tail => + val enrichmentFailures = NonEmptyList.of(head, tail: _*).map { + case (schemaKey, clientError) => + FailureDetails.EnrichmentFailure( + FailureDetails.EnrichmentInformation(schemaKey, "enrichments-contexts-validation").some, + FailureDetails.EnrichmentFailureMessage.IgluError(schemaKey, clientError) + ) + } + EnrichmentManager + .buildEnrichmentFailuresBadRow( + enrichmentFailures, + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ) + .some + } + (maybeBadRow, valid) } /** Used to extract .data for input custom contexts and input unstructured event */ @@ -209,7 +209,7 @@ object IgluUtils { .parse(json) .leftMap(FailureDetails.SchemaViolation.NotIglu(json, _)) .toEitherT[F] - // Check that the schema of SelfDescribingData[Json] is the expected one + // Check thant the schema of SelfDescribingData[Json] is the expected one _ <- if (validateCriterion(sdj, expectedCriterion)) EitherT.rightT[F, FailureDetails.SchemaViolation](sdj) else @@ -248,13 +248,14 @@ object IgluUtils { client: IgluCirceClient[F], sdjs: List[SelfDescribingData[Json]], registryLookup: RegistryLookup[F] - ): EitherT[F, NonEmptyList[(SchemaKey, ClientError)], Unit] = - EitherT { - sdjs - .map(check(client, _, registryLookup).toValidatedNel) - .sequence - .map(_.sequence_.toEither) - } + ): F[List[Either[(SchemaKey, ClientError), SelfDescribingData[Json]]]] = + sdjs + .traverse(check(client, _, registryLookup).value) + .map(_.zip(sdjs)) + .map(_.map { + case (Left(err), _) => Left(err) + case (_, sdj) => Right(sdj) + }) /** Parse a Json as a SDJ and check that it's valid */ private def parseAndValidateSDJ_sv[F[_]: Monad: Clock]( // _sv for SchemaViolation