diff --git a/modules/aws/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/aws/S3.scala b/modules/aws/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/aws/S3.scala index a2d8ca061..c3f86336e 100644 --- a/modules/aws/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/aws/S3.scala +++ b/modules/aws/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/aws/S3.scala @@ -23,7 +23,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.Key -import java.nio.charset.StandardCharsets.UTF_8 +import java.nio.charset.StandardCharsets object S3 { @@ -43,22 +43,24 @@ object S3 { BlobStorage.BlobObject(key, url.path.representation.size.getOrElse(0L)) } - def get(path: Key): F[Either[Throwable, String]] = { + override def getBytes(path: Key): Stream[F, Byte] = { val (bucketName, keyPath) = BlobStorage.splitKey(path) Authority .parse(bucketName) .fold( - errors => Async[F].delay(new MultipleUrlValidationException(errors).asLeft[String]), + errors => Stream.raiseError[F](new MultipleUrlValidationException(errors)), authority => client .get(Url("s3", authority, Path(keyPath)), 1024) - .compile - .to(Array) - .map(array => new String(array, UTF_8)) - .attempt ) } + def get(path: Key): F[Either[Throwable, String]] = + getBytes(path).compile + .to(Array) + .map(array => new String(array, StandardCharsets.UTF_8)) + .attempt + def list(folder: BlobStorage.Folder, recursive: Boolean): Stream[F, BlobStorage.BlobObject] = { val (bucketName, folderPath) = BlobStorage.splitPath(folder) Authority diff --git a/modules/azure/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/azure/AzureBlobStorage.scala b/modules/azure/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/azure/AzureBlobStorage.scala index 78cc71bc0..cc8fd2d1c 100644 --- a/modules/azure/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/azure/AzureBlobStorage.scala +++ b/modules/azure/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/azure/AzureBlobStorage.scala @@ -20,6 +20,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{Folder, Key} import fs2.{Pipe, Stream} import java.net.URI +import java.nio.charset.StandardCharsets class AzureBlobStorage[F[_]: Async] private (store: AzureStore[F], configuredPath: AzureBlobStorage.PathParts) extends BlobStorage[F] { @@ -42,16 +43,18 @@ class AzureBlobStorage[F[_]: Async] private (store: AzureStore[F], configuredPat } override def get(key: Key): F[Either[Throwable, String]] = + getBytes(key).compile + .to(Array) + .map(array => new String(array, StandardCharsets.UTF_8)) + .attempt + + override def getBytes(key: Key): Stream[F, Byte] = createStorageUrlFrom(key) match { case Valid(url) => store .get(url, 1024) - .compile - .to(Array) - .map(array => new String(array)) - .attempt case Invalid(errors) => - Async[F].delay(new MultipleUrlValidationException(errors).asLeft[String]) + Stream.raiseError[F](new MultipleUrlValidationException(errors)) } override def keyExists(key: Key): F[Boolean] = diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/ParquetUtils.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/ParquetUtils.scala index f9639cb33..bdf4e5d12 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/ParquetUtils.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/ParquetUtils.scala @@ -7,8 +7,11 @@ */ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common +import cats.effect.IO import com.github.mjakubowski84.parquet4s._ +import com.github.mjakubowski84.parquet4s.parquet.fromParquet import io.circe.Json +import com.github.mjakubowski84.parquet4s.{Path => ParquetPath, RowParquetRecord} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path => HadoopPath} import org.apache.parquet.column.ColumnDescriptor @@ -28,15 +31,15 @@ import java.time.temporal.ChronoUnit import java.time.{Instant, ZoneOffset} import java.util.TimeZone import scala.jdk.CollectionConverters._ - import fs2.io.file.Path +import scala.annotation.nowarn + object ParquetUtils { val config = ValueCodecConfiguration(TimeZone.getTimeZone(ZoneOffset.UTC)) def readParquetColumns(path: Path): Map[File, List[ColumnDescriptor]] = { - val conf = new Configuration(); val parquetFileFilter = new FileFilter { override def accept(pathname: File): Boolean = pathname.toString.endsWith(".parquet") } @@ -44,14 +47,21 @@ object ParquetUtils { new File(path.toString) .listFiles(parquetFileFilter) .map { parquetFile => - @annotation.nowarn("cat=deprecation") - val parquetMetadata = ParquetFileReader.readFooter(conf, new HadoopPath(parquetFile.toString), ParquetMetadataConverter.NO_FILTER) - val columns = parquetMetadata.getFileMetaData.getSchema.getColumns.asScala.toList - (parquetFile, columns) + (parquetFile, readFileColumns(parquetFile)) } .toMap } + @nowarn("cat=deprecation") + def readFileColumns(parquetFile: File): List[ColumnDescriptor] = + ParquetFileReader + .readFooter(new Configuration(), new HadoopPath(parquetFile.toString), ParquetMetadataConverter.NO_FILTER) + .getFileMetaData + .getSchema + .getColumns + .asScala + .toList + def extractColumnsFromSchemaString(schema: String) = MessageTypeParser .parseMessageType(schema) @@ -59,6 +69,18 @@ object ParquetUtils { .asScala .toList + def readParquetRowsAsJsonFrom(path: Path, columns: List[ColumnDescriptor]): IO[List[Json]] = + fromParquet[IO] + .as[RowParquetRecord] + .read(ParquetPath(path.toNioPath.toUri.toString)) + .map { record => + convertParquetRecordToJson(record, List.empty, columns) + } + .compile + .toList + .map(_.sortBy(_.asObject.flatMap(_("event_id")).flatMap(_.asString))) + .map(_.map(_.deepDropNullValues)) + def convertParquetRecordToJson( record: RowParquetRecord, parentPath: List[String], diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/TestApplication.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/TestApplication.scala index cbd78a2b3..b6397d2fc 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/TestApplication.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/TestApplication.scala @@ -135,6 +135,9 @@ object TestApplication { override def get(path: Key): F[Either[Throwable, String]] = Concurrent[F].raiseError(new Exception("readKey isn't implemented for blob storage file type")) + override def getBytes(path: Key): Stream[F, Byte] = + Stream.empty + override def keyExists(key: Key): F[Boolean] = Concurrent[F].raiseError(new Exception(s"keyExists isn't implemented for blob storage file type")) } diff --git a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala index d5852e927..6811a2d12 100644 --- a/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala +++ b/modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/WiderowParquetProcessingSpec.scala @@ -9,18 +9,13 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.proce import cats.effect.IO import cats.effect.unsafe.implicits.global - -import fs2.io.file.Path - -import com.github.mjakubowski84.parquet4s.{Path => ParquetPath, RowParquetRecord} -import com.github.mjakubowski84.parquet4s.parquet.fromParquet import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils.readParquetColumns +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.{AppId, ParquetUtils} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils.{readParquetColumns, readParquetRowsAsJsonFrom} import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.WiderowParquetProcessingSpec.{appConfig, igluConfig} +import fs2.io.file.Path import io.circe.syntax.EncoderOps import io.circe.{Json, JsonObject} import org.apache.parquet.column.ColumnDescriptor @@ -54,7 +49,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { expectedParquetColumns <- readParquetColumnsFromResource( "/processing-spec/4/output/good/parquet/schema" ) // the same schema as in resource file used in WideRowParquetSpec for batch transformer - actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns) + actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) actualParquetColumns = readParquetColumns(goodPath) actualBadRows <- readStringRowsFrom(badPath) @@ -90,7 +85,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { for { output <- process(inputStream, config) expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/5/output/good/parquet/schema") - actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns) + actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) actualParquetColumns = readParquetColumns(goodPath) expectedCompletionMessage <- readMessageFromResource("/processing-spec/5/output/good/parquet/completion.json", outputDirectory) expectedParquetRows <- readGoodParquetEventsFromResource( @@ -123,7 +118,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { for { output <- process(inputStream, config) expectedParquetColumns <- readParquetColumnsFromResource("/processing-spec/6/output/good/parquet/schema") - actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns) + actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) actualParquetColumns = readParquetColumns(goodPath) expectedCompletionMessage <- readMessageFromResource("/processing-spec/6/output/good/parquet/completion.json", outputDirectory) expectedParquetRows <- @@ -159,7 +154,7 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { expectedParquetColumns <- readParquetColumnsFromResource( "/processing-spec/7/output/good/parquet/schema" ) // the same schema as in resource file used in WideRowParquetSpec for batch transformer - actualParquetRows <- readParquetRowsFrom(goodPath, expectedParquetColumns) + actualParquetRows <- readParquetRowsAsJsonFrom(goodPath, expectedParquetColumns) actualParquetColumns = readParquetColumns(goodPath) expectedCompletionMessage <- readMessageFromResource("/processing-spec/7/output/good/parquet/completion.json", outputDirectory) } yield { @@ -198,18 +193,6 @@ class WiderowParquetProcessingSpec extends BaseProcessingSpec { .map(transformEventForParquetTest(columnToAdjust.getOrElse("none"))) } - private def readParquetRowsFrom(path: Path, columns: List[ColumnDescriptor]) = - fromParquet[IO] - .as[RowParquetRecord] - .read(ParquetPath(path.toNioPath.toUri.toString)) - .map { record => - ParquetUtils.convertParquetRecordToJson(record, List.empty, columns) - } - .compile - .toList - .map(_.sortBy(_.asObject.flatMap(_("event_id")).flatMap(_.asString))) - .map(_.map(_.deepDropNullValues)) - private def readParquetColumnsFromResource(path: String): IO[List[ColumnDescriptor]] = readLinesFromResource(path) .map(_.mkString) diff --git a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/cloud/BlobStorage.scala b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/cloud/BlobStorage.scala index 47d328ed0..3f70101d6 100644 --- a/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/cloud/BlobStorage.scala +++ b/modules/common/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/common/cloud/BlobStorage.scala @@ -28,6 +28,8 @@ trait BlobStorage[F[_]] { def get(path: BlobStorage.Key): F[Either[Throwable, String]] + def getBytes(path: BlobStorage.Key): Stream[F, Byte] + /** Check if blob storage key exist */ def keyExists(key: BlobStorage.Key): F[Boolean] } @@ -210,6 +212,9 @@ object BlobStorage { override def get(path: Key): F[Either[Throwable, String]] = MonadThrow[F].raiseError(new IllegalArgumentException("noop blobstorage interpreter")) + override def getBytes(path: Key): Stream[F, Byte] = + Stream.empty + override def keyExists(key: Key): F[Boolean] = MonadThrow[F].raiseError(new IllegalArgumentException("noop blobstorage interpreter")) } diff --git a/modules/gcp/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/gcp/GCS.scala b/modules/gcp/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/gcp/GCS.scala index bb8cfbef3..32e007c79 100644 --- a/modules/gcp/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/gcp/GCS.scala +++ b/modules/gcp/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/gcp/GCS.scala @@ -22,6 +22,8 @@ import fs2.{Pipe, Stream} import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{Folder, Key} +import java.nio.charset.StandardCharsets + object GCS { def blobStorage[F[_]: Async]: Resource[F, BlobStorage[F]] = @@ -58,22 +60,24 @@ object GCS { ) } - override def get(key: Key): F[Either[Throwable, String]] = { + override def getBytes(key: Key): Stream[F, Byte] = { val (bucket, path) = BlobStorage.splitKey(key) Authority .parse(bucket) .fold( - errors => Async[F].delay(new MultipleUrlValidationException(errors).asLeft[String]), + errors => Stream.raiseError[F](new MultipleUrlValidationException(errors)), authority => client .get(Url("gs", authority, Path(path)), 1024) - .compile - .to(Array) - .map(array => new String(array)) - .attempt ) } + override def get(key: Key): F[Either[Throwable, String]] = + getBytes(key).compile + .to(Array) + .map(array => new String(array, StandardCharsets.UTF_8)) + .attempt + override def keyExists(key: Key): F[Boolean] = { val (bucket, path) = BlobStorage.splitKey(key) Authority diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureAWS.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureAWS.scala index 25c80bdf4..863749800 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureAWS.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureAWS.scala @@ -28,6 +28,9 @@ object PureAWS { def get(path: Key): Pure[Either[Throwable, String]] = Pure.pure(Left(new NotImplementedError("Not used in tests"))) + def getBytes(path: Key): Stream[Pure, Byte] = + Stream.empty + def keyExists(key: Key): Pure[Boolean] = Pure.pure(results.keyExists(key)) } diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AppConfiguration.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AppConfiguration.scala new file mode 100644 index 000000000..6e05b6f8e --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AppConfiguration.scala @@ -0,0 +1,25 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow +import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression + +final case class AppConfiguration( + compression: Compression, + fileFormat: WideRow.WideRowFormat, + windowFrequencyMinutes: Long +) + +object AppConfiguration { + + /** + * Regarding `windowFrequencyMinutes = 1` - officially the default 'windowing' setting for + * streaming transformer is '10 minutes'. As we don't want to make the tests take too much time, + * we use 1 minute here. It means that for all test scenarios using this default confguration, + * transformer instance under the test needs to be configured with `1 minute` windowing setting. + * + * Compression and file format defaults match the ones from the official reference file. + * + * See reference here -> + * https://github.com/snowplow/snowplow-rdb-loader/blob/master/modules/common-transformer-stream/src/main/resources/application.conf + */ + val default = AppConfiguration(Compression.Gzip, WideRow.WideRowFormat.JSON, windowFrequencyMinutes = 1) +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AppDependencies.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AppDependencies.scala new file mode 100644 index 000000000..b4ddedc15 --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AppDependencies.scala @@ -0,0 +1,17 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental +import cats.effect.IO +import cats.effect.kernel.Resource +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} + +final case class AppDependencies( + blobClient: BlobStorage[IO], + queueConsumer: Queue.Consumer[IO], + producer: Queue.Producer[IO] +) + +object AppDependencies { + + trait Provider { + def createDependencies(): Resource[IO, AppDependencies] + } +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AzureTransformerSpecification.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AzureTransformerSpecification.scala new file mode 100644 index 000000000..94dd86ef9 --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/AzureTransformerSpecification.scala @@ -0,0 +1,95 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental +import cats.effect.IO +import cats.effect.kernel.Resource +import com.snowplowanalytics.snowplow.rdbloader.azure.{AzureBlobStorage, KafkaConsumer, KafkaProducer} +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.AzureTransformerSpecification._ + +import java.net.URI +import java.util.UUID + +// format: off +/** + * Trait providing Azure specific dependencies for the generic `TransformerSpecification`. + * Before running tests with Azure support, it's necessary to: + * + * 1) Launch transformer application in the Azure cloud. + * 2) Set up input (enriched data) and output (for `shredding_compelete.json` message to notify loader) EventHubs. + * 3) Set up Azure Blob Storage account and container for transformed data. + * 4) Set following environment variables with values specific for your cloud setup: + * + * - TEST_TRANSFORMER_BLOB_STORAGE_URL (format like 'https://${accountName}.blob.core.windows.net/${containerName}) + * - TEST_TRANSFORMER_EVENTHUBS_URL (format like '...-namespace.servicebus.windows.net:9093") + * - TEST_TRANSFORMER_INPUT_HUB_NAME + * - TEST_TRANSFORMER_OUTPUT_HUB_NAME + * - TEST_TRANSFORMER_INPUT_HUB_KEY + * - TEST_TRANSFORMER_OUTPUT_HUB_KEY + * - AZURE_TENANT_ID - may be used by the azure-sdk library to communicate with the cloud if you wish to authenticate with Azure CLI. + * It can be extracted e.g. with `az account show | jq -r '.homeTenantId'` executed in command line. + */ +// format: on +trait AzureTransformerSpecification extends TransformerSpecification with AppDependencies.Provider { + skipAllIf(anyEnvironmentVariableMissing()) + + override def createDependencies(): Resource[IO, AppDependencies] = + for { + blobClient <- createBlobClient() + consumer <- createConsumer() + producer <- createProducer() + } yield AppDependencies(blobClient, consumer, producer) + + private def createBlobClient(): Resource[IO, BlobStorage[IO]] = + AzureBlobStorage + .createDefault[IO](URI.create(System.getenv(blobStorageUrlEnv))) + + private def createConsumer(): Resource[IO, Queue.Consumer[IO]] = + KafkaConsumer + .consumer[IO]( + bootstrapServers = System.getenv(eventHubsUrlEnv), + topicName = System.getenv(outputHubNameEnv), + consumerConf = Map( + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "PLAIN", + "sasl.jaas.config" -> s"""org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$$ConnectionString\" password=\"${System + .getenv(outputHubKeyEnv)}\";""", + "group.id" -> s"test-transformer-consumer-${UUID.randomUUID()}", + "enable.auto.commit" -> "true", + "auto.offset.reset" -> "latest" + ) + ) + + private def createProducer(): Resource[IO, Queue.Producer[IO]] = + KafkaProducer + .producer[IO]( + bootstrapServers = System.getenv(eventHubsUrlEnv), + topicName = System.getenv(inputHubNameEnv), + producerConf = Map( + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "PLAIN", + "sasl.jaas.config" -> s"""org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$$ConnectionString\" password=\"${System + .getenv(inputHubKeyEnv)}\";""" + ) + ) +} + +object AzureTransformerSpecification { + val blobStorageUrlEnv = "TEST_TRANSFORMER_BLOB_STORAGE_URL" + val eventHubsUrlEnv = "TEST_TRANSFORMER_EVENTHUBS_URL" + val inputHubNameEnv = "TEST_TRANSFORMER_INPUT_HUB_NAME" + val outputHubNameEnv = "TEST_TRANSFORMER_OUTPUT_HUB_NAME" + val inputHubKeyEnv = "TEST_TRANSFORMER_INPUT_HUB_KEY" + val outputHubKeyEnv = "TEST_TRANSFORMER_OUTPUT_HUB_KEY" + + val requiredEnvironmentVariables = List( + blobStorageUrlEnv, + eventHubsUrlEnv, + inputHubNameEnv, + outputHubNameEnv, + inputHubKeyEnv, + outputHubKeyEnv + ) + + def anyEnvironmentVariableMissing(): Boolean = + requiredEnvironmentVariables.exists(varName => System.getenv(varName) == null) + +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/InputBatch.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/InputBatch.scala new file mode 100644 index 000000000..3d1c6cfa2 --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/InputBatch.scala @@ -0,0 +1,48 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => EventGenerator} +import com.snowplowanalytics.snowplow.eventgen.protocol.event.{EventFrequencies, UnstructEventFrequencies} +import org.scalacheck.Gen +import org.scalacheck.rng.Seed + +import java.time.Instant +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.Random + +final case class InputBatch(content: InputBatch.Content, delay: FiniteDuration = 0.minutes) { + def delayed(value: FiniteDuration) = this.copy(delay = value) +} + +object InputBatch { + + sealed trait Content + object Content { + final case class TextLines(lines: List[String]) extends Content + final case class SdkEvents(events: List[Event]) extends Content + } + + def good(count: Int): InputBatch = InputBatch( + Content.SdkEvents( + EventGenerator + .gen( + eventPerPayloadMin = count, + eventPerPayloadMax = count, + now = Instant.now(), + frequencies = EventFrequencies(1, 1, 1, 1, UnstructEventFrequencies(1, 1, 1)) + ) + .apply(Gen.Parameters.default, Seed(Random.nextLong())) + .get + ) + ) + + def bad(count: Int): InputBatch = InputBatch( + Content.TextLines { + (1 to count).map(idx => s"Some broken input - $idx").toList + } + ) + + def asTextLines(content: Content): List[String] = content match { + case Content.TextLines(lines) => lines + case Content.SdkEvents(events) => events.map(_.toTsv) + } +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/OutputDataRowReader.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/OutputDataRowReader.scala new file mode 100644 index 000000000..8ad2d4890 --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/OutputDataRowReader.scala @@ -0,0 +1,40 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental +import cats.effect.IO +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.ParquetUtils +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.{Blob, DataRow} +import fs2.io.file.{Files, Path} +import io.circe.parser + +import java.nio.charset.StandardCharsets + +object OutputDataRowReader { + + def fromJson(blob: Blob): IO[List[DataRow]] = + fs2.Stream + .emits[IO, Byte](blob) + .through(fs2.text.decodeWithCharset(StandardCharsets.UTF_8)) + .through(fs2.text.lines) + .map(parser.parse(_).right.get) + .compile + .toList + + // For parquet we fetch all bytes from remote blob storage and store them in the temporary local output. + // Then we use hadoop API (details in the `ParquetUtils`) to decode it and convert to human-readable JSON format. + def fromParquet(blob: Blob): IO[List[DataRow]] = + Files[IO].tempFile + .use { tempOutput => + for { + _ <- saveParquetDataToTemporaryOutput(tempOutput, blob) + outputParquetColumns = ParquetUtils.readFileColumns(tempOutput.toNioPath.toFile) + parquetRows <- ParquetUtils.readParquetRowsAsJsonFrom(tempOutput, outputParquetColumns) + } yield parquetRows + } + + private def saveParquetDataToTemporaryOutput(outputPath: Path, blob: Blob): IO[Unit] = + fs2.Stream + .emits(blob) + .through(Files[IO].writeAll(outputPath)) + .compile + .drain + +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala new file mode 100644 index 000000000..e3cf3ce5a --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala @@ -0,0 +1,278 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{BlobObject, Folder} +import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification._ +import fs2.Stream +import fs2.compression.{Compression => FS2Compression} +import fs2.concurrent.Signal.SignalOps +import fs2.concurrent.{Signal, SignallingRef} +import io.circe.Json +import org.specs2.matcher.MatchResult +import org.specs2.mutable.Specification + +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit +import scala.concurrent.duration.DurationInt + +// format: off +/** + * Generic template for the streaming transformer specification. It produces specified input (could be delayed batches), + * waits for incoming windows (can be more than one) with data and asserts produced by the transformer output. + * It assumes transformer app is already running somewhere with accessible input/output. + * It verifies facts about transformer which should be true for any kind of transformation. + * + * It's an abstract class and can't be run on its own. Specific implementation needs to implement abstract members like: + * - 'description' - details about scenario + * - 'inputBatches' - list of input events. Could be any kind of good/bad data in the TSV format. Delivery of a batch can be delayed if any delay is specified. + * - 'countExpectations' - how many good/bad events are expected on the output. If we reach expected count, test is interrupted and assertions are run. + * - 'requiredAppConfig' - expected configuration details for tested transformer application + * - 'customDataAssertion' - optional, allows to execute more detailed assertions of an output, like checking values for transformed events. + * + * Dependencies like blob storage client and queue consumer/producer need to be supplied by a proper `AppDependencies.Provider` implementation. + * + * What kind of assertions are run: + * - `shredding_complete.json` exists in the storage + * - `shredding_complete.json` consumed from the queue is equal to the one in the storage + * - `shredding_complete.json` count values matches the number of actual output events (good and bad) in the storage (for each window) + * - expected output format is used + * - expected output compression is used + * - windows are emitted with expected frequency + */ +// format: on + +abstract class TransformerSpecification extends Specification with AppDependencies.Provider { + + private val timeout = 15.minutes + + protected def description: String + protected def inputBatches: List[InputBatch] + protected def countExpectations: CountExpectations + protected def requiredAppConfig: AppConfiguration + protected def customDataAssertion: Option[DataAssertion] = None + + s"$description" in { + run().unsafeRunSync() + } + + def run(): IO[MatchResult[Any]] = + createDependencies() + .use { dependencies => + for { + windowsAccumulator <- SignallingRef.of[IO, WindowsAccumulator](WindowsAccumulator(List.empty)) + consuming = consumeAllIncomingWindows(dependencies, countExpectations, windowsAccumulator) + producing = produceInput(inputBatches, dependencies) + _ <- consuming.concurrently(producing).compile.drain + collectedWindows <- windowsAccumulator.get + } yield { + collectedWindows.value.foreach(assertSingleWindow) + assertWindowingFrequency(collectedWindows.value) + + val aggregatedData = aggregateDataFromAllWindows(collectedWindows.value) + assertAggregatedCounts(aggregatedData) + customDataAssertion.fold(ok)(_.apply(aggregatedData)) + } + } + + private def produceInput(batches: List[InputBatch], dependencies: AppDependencies): Stream[IO, Unit] = + Stream.eval { + IO.sleep(10.seconds) *> batches.traverse_ { batch => + IO.sleep(batch.delay) *> InputBatch + .asTextLines(batch.content) + .parTraverse_(dependencies.producer.send) + } + } + + private def consumeAllIncomingWindows( + dependencies: AppDependencies, + countExpectations: CountExpectations, + windowsAccumulator: SignallingRef[IO, WindowsAccumulator] + ): Stream[IO, Unit] = + dependencies.queueConsumer.read + .map(_.content) + .map(parseShreddingCompleteMessage) + .evalMap(readWindowOutput(dependencies.blobClient)) + .evalMap { windowOutput => + windowsAccumulator.update(_.addWindow(windowOutput)) + } + .interruptWhen(allEventsProcessed(windowsAccumulator, countExpectations)) + .interruptAfter(timeout) + + private def assertSingleWindow(output: WindowOutput): MatchResult[Any] = { + output.`shredding_complete.json`.compression must beEqualTo(requiredAppConfig.compression) + output.`shredding_complete.json`.count.get.good must beEqualTo(output.goodEvents.size) + output.`shredding_complete.json`.count.get.bad.get must beEqualTo(output.badEvents.size) + output.`shredding_complete.json`.typesInfo must beLike { case TypesInfo.WideRow(fileFormat, _) => + fileFormat must beEqualTo(requiredAppConfig.fileFormat) + } + } + + private def assertWindowingFrequency(collectedWindows: List[WindowOutput]): Unit = + collectedWindows.groupBy(_.appId).foreach { case (_, windows) => + windows.zip(windows.tail).foreach { case (window1, window2) => + ChronoUnit.MINUTES.between(window1.producedAt, window2.producedAt) must beEqualTo(requiredAppConfig.windowFrequencyMinutes) + } + } + + private def assertAggregatedCounts(aggregatedData: AggregatedData): MatchResult[Any] = { + aggregatedData.good.size must beEqualTo(countExpectations.good) + aggregatedData.bad.size must beEqualTo(countExpectations.bad) + } + + private def aggregateDataFromAllWindows(windows: List[WindowOutput]): AggregatedData = + windows.foldLeft(AggregatedData.empty) { case (aggregated, window) => + val windowTypes = window.`shredding_complete.json`.typesInfo.asInstanceOf[TypesInfo.WideRow].types + AggregatedData( + good = window.goodEvents ::: aggregated.good, + bad = window.badEvents ::: aggregated.bad, + types = windowTypes ::: aggregated.types + ) + } + + private def allEventsProcessed( + windowsAccumulator: SignallingRef[IO, WindowsAccumulator], + countExpectations: CountExpectations + ): Signal[IO, Boolean] = + windowsAccumulator + .map(_.getTotalNumberOfEvents >= countExpectations.total) + + private def readWindowOutput(blobClient: BlobStorage[IO])(message: LoaderMessage.ShreddingComplete): IO[WindowOutput] = + for { + scMessageInStorage <- readSCMessageFromBlobStorage(message, blobClient) + transformedEvents <- readDataRowsFromFolder(scMessageInStorage.base.append("output=good"), blobClient) + badEvents <- readDataRowsFromFolder(scMessageInStorage.base.append("output=bad"), blobClient) + } yield { + + message must beEqualTo(scMessageInStorage) + + // Assuming folder name structure ending like '.../run=yyyy-MM-dd-HH-mm-ss-${UUID}/' + val base = message.base.stripSuffix("/") + val appID = base.takeRight(36) // extract application ID which is represented by UUID at the end of a folder name + val time = base.stripSuffix(appID).stripSuffix("-").takeRight(19) // extract date and time of the window, but without `run=` prefix + val parsedTime = LocalDateTime.parse(time, DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss")) + + println( + s"""Received `shredding_complete.json` message: + |- folder - ${message.base} + |- counts - ${message.count} + |""".stripMargin + ) + + WindowOutput(appID, parsedTime, message, transformedEvents, badEvents) + } + + private def readDataRowsFromFolder( + folder: Folder, + blobClient: BlobStorage[IO] + ): IO[List[DataRow]] = + blobClient + .list(folder, recursive = false) + .evalMap(blob => readDataRowsFromBlob(blobClient, blob)) + .flatMap(dataRows => Stream.emits(dataRows)) + .compile + .toList + + private def readDataRowsFromBlob( + blobClient: BlobStorage[IO], + blob: BlobObject + ): IO[List[DataRow]] = + blobClient + .getBytes(blob.key) + .through(decompressIfNeeded(blob.key)) + .compile + .to(Array) + .flatMap(convertBlobToListOfRows(blob.key)) + + private def convertBlobToListOfRows(blobKey: BlobStorage.Key)(blob: Blob): IO[List[DataRow]] = + if (isBlobGoodParquetData(blobKey)) + OutputDataRowReader.fromParquet(blob) + else + OutputDataRowReader.fromJson(blob) + + // Decompress only for: + // - JSON good/bad output + // - Parquet bad output + // Decompression for parquet good data is handled by hadoop API in parquet-oriented scenarios + private def decompressIfNeeded(blobKey: BlobStorage.Key): fs2.Pipe[IO, Byte, Byte] = + requiredAppConfig.compression match { + case Compression.Gzip if !isBlobGoodParquetData(blobKey) => + FS2Compression[IO].gunzip().andThen(_.flatMap(_.content)) + case _ => + identity + } + + private def isBlobGoodParquetData(blobKey: BlobStorage.Key): Boolean = + requiredAppConfig.fileFormat == WideRowFormat.PARQUET && blobKey.contains("output=good") + + private def readSCMessageFromBlobStorage( + message: LoaderMessage.ShreddingComplete, + blobClient: BlobStorage[IO] + ): IO[LoaderMessage.ShreddingComplete] = + scMessageMustExist(message, blobClient) *> fetchSCMessage(message, blobClient) + + private def scMessageMustExist(message: LoaderMessage.ShreddingComplete, blobClient: BlobStorage[IO]) = + blobClient + .keyExists(message.base.withKey("shredding_complete.json")) + .map(messageExists => messageExists must beTrue) + + private def fetchSCMessage(message: LoaderMessage.ShreddingComplete, blobClient: BlobStorage[IO]): IO[LoaderMessage.ShreddingComplete] = + blobClient + .get(message.base.withKey("shredding_complete.json")) + .map(value => parseShreddingCompleteMessage(value.right.get)) + + private def parseShreddingCompleteMessage(message: String): LoaderMessage.ShreddingComplete = + LoaderMessage.fromString(message) match { + case Right(parsedMessage: LoaderMessage.ShreddingComplete) => parsedMessage + case other => throw new IllegalStateException(s"Provided message is not a valid shredding complete message - $other") + } + +} + +object TransformerSpecification { + + type Blob = Array[Byte] + type DataRow = Json + type DataAssertion = AggregatedData => MatchResult[Any] + + final case class CountExpectations(good: Int, bad: Int) { + def total = good + bad + } + + final case class WindowsAccumulator(value: List[WindowOutput]) { + def addWindow(window: WindowOutput): WindowsAccumulator = + WindowsAccumulator(value :+ window) + + def getTotalNumberOfEvents: Long = + value.map { window => + val good = window.`shredding_complete.json`.count.map(_.good).getOrElse(0L) + val bad = window.`shredding_complete.json`.count.flatMap(_.bad).getOrElse(0L) + good + bad + }.sum + } + + final case class WindowOutput( + appId: String, + producedAt: LocalDateTime, + `shredding_complete.json`: LoaderMessage.ShreddingComplete, + goodEvents: List[DataRow], + badEvents: List[DataRow] + ) + + final case class AggregatedData( + good: List[DataRow], + bad: List[DataRow], + types: List[TypesInfo.WideRow.Type] + ) + + object AggregatedData { + val empty = AggregatedData(List.empty, List.empty, List.empty) + } +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala new file mode 100644 index 000000000..a4f5b696d --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala @@ -0,0 +1,28 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios + +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.Content +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{ + AppConfiguration, + AzureTransformerSpecification, + InputBatch +} + +class BadDetailsScenario extends AzureTransformerSpecification { + + private val badEvent = Content.TextLines(List("Some example bad event")) + + override def description = "Asserting details of output single bad row" + override def requiredAppConfig = AppConfiguration.default + override def inputBatches = List(InputBatch(badEvent)) + override def countExpectations = CountExpectations(good = 0, bad = 1) + + override def customDataAssertion = Some { outputData => + val badRow = outputData.bad.head + badRow.hcursor.get[String]("schema").right.get must beEqualTo( + "iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0" + ) + badRow.hcursor.downField("data").get[String]("payload").right.get must beEqualTo("Some example bad event") + badRow.hcursor.downField("data").downField("failure").get[String]("type").right.get must beEqualTo("NotTSV") + } +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala new file mode 100644 index 000000000..e9a862f1b --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala @@ -0,0 +1,244 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios +import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{ + AppConfiguration, + AzureTransformerSpecification, + InputBatch +} +import io.circe.parser + +import scala.concurrent.duration.DurationInt + +class JsonScenario1 extends AzureTransformerSpecification { + def description = "Input: 1 good, config: JSON, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default + def inputBatches = List(good(count = 1)) + def countExpectations = CountExpectations(good = 1, bad = 0) +} + +class JsonScenario2 extends AzureTransformerSpecification { + def description = "Input: 1 bad, config: JSON, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default + def inputBatches = List(bad(count = 1)) + def countExpectations = CountExpectations(good = 0, bad = 1) +} + +class JsonScenario3 extends AzureTransformerSpecification { + def description = "Input: 1000 good, config: JSON, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default + def inputBatches = List(good(count = 1000)) + def countExpectations = CountExpectations(good = 1000, bad = 0) +} + +class JsonScenario4 extends AzureTransformerSpecification { + def description = "Input: 1000 bad, config: JSON, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default + def inputBatches = List(bad(count = 1000)) + def countExpectations = CountExpectations(good = 0, bad = 1000) +} + +class JsonScenario5 extends AzureTransformerSpecification { + def description = """Input: mixed 500 good and 500 bad, config: JSON, compression, windowing: 1 minute""" + def requiredAppConfig = AppConfiguration.default + def inputBatches = List(good(count = 500), bad(count = 500)) + def countExpectations = CountExpectations(good = 500, bad = 500) +} + +//Changed defualt windowing to 2 minutes +class JsonScenario6 extends AzureTransformerSpecification { + def description = """Input: mixed 500 good and 500 bad, config: JSON, compression, windowing: 2 minutes""" + def requiredAppConfig = AppConfiguration.default.copy(windowFrequencyMinutes = 2) + def inputBatches = List(good(count = 500), good(count = 500).delayed(2.minutes)) // force new window by delaying second input batch + def countExpectations = CountExpectations(good = 1000, bad = 0) +} + +//No compression +class JsonScenario7 extends AzureTransformerSpecification { + def description = """Input: mixed 500 good and 500 bad, config: JSON, no compression, windowing: 1 minute""" + def requiredAppConfig = AppConfiguration.default.copy(compression = Compression.None) + def inputBatches = List(good(count = 500), bad(count = 500)) + def countExpectations = CountExpectations(good = 500, bad = 500) +} + +//Checking details of JSON output +class JsonScenario8 extends AzureTransformerSpecification { + + private val goodEvent = Content.TextLines( + List( + """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:16:35.967 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 114221 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 68.42.204.218 1242058182 58df65c46e1ac937 11 437ad25b-2006-455e-b5d8-d664b74df8f3 US MI Holland 49423 42.742294 -86.0661 http://snowplowanalytics.com/blog/ https://www.google.com/ http snowplowanalytics.com 80 /blog/ https www.google.com 80 / search Google {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/WebPage/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]} {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-0","data":{"targetUrl":"http://snowplowanalytics.com/blog/page2","elementClasses":["next"]}}} Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36 Chrome Chrome Browser WEBKIT en-US 1 1 1 0 1 0 0 0 1 1 24 1241 806 Mac OS Mac OS Apple Inc. America/New_York Computer 0 1440 900 UTF-8 """ + ) + ) + override def description = "Asserting details of a single JSON transformed event" + override def requiredAppConfig = AppConfiguration.default + override def inputBatches = List(InputBatch(goodEvent)) + override def countExpectations = CountExpectations(good = 1, bad = 0) + + override def customDataAssertion = Some { outputData => + val transformedEvent = outputData.good.head + val expectedEvent = parser + .parse(""" + |{ + | "page_urlhost": "snowplowanalytics.com", + | "br_features_realplayer": false, + | "etl_tstamp": "2014-06-01T14:04:11.639Z", + | "dvce_ismobile": false, + | "geo_latitude": 42.742294, + | "refr_medium": "search", + | "ti_orderid": null, + | "br_version": null, + | "base_currency": null, + | "v_collector": "clj-0.6.0-tom-0.0.4", + | "mkt_content": null, + | "collector_tstamp": "2014-05-29T18:16:35Z", + | "os_family": "Mac OS", + | "ti_sku": null, + | "event_vendor": null, + | "network_userid": "437ad25b-2006-455e-b5d8-d664b74df8f3", + | "br_renderengine": "WEBKIT", + | "br_lang": "en-US", + | "tr_affiliation": null, + | "ti_quantity": null, + | "ti_currency": null, + | "geo_country": "US", + | "user_fingerprint": "1242058182", + | "mkt_medium": null, + | "page_urlscheme": "http", + | "ti_category": null, + | "pp_yoffset_min": null, + | "br_features_quicktime": true, + | "event": "unstruct", + | "refr_urlhost": "www.google.com", + | "user_ipaddress": "68.42.204.218", + | "br_features_pdf": true, + | "page_referrer": "https://www.google.com/", + | "doc_height": null, + | "refr_urlscheme": "https", + | "geo_region": "MI", + | "geo_timezone": null, + | "page_urlfragment": null, + | "br_features_flash": true, + | "os_manufacturer": "Apple Inc.", + | "mkt_clickid": null, + | "ti_price": null, + | "br_colordepth": "24", + | "event_format": null, + | "tr_total": null, + | "contexts_org_schema_web_page_1": [ + | { + | "datePublished": "2014-07-23T00:00:00Z", + | "author": "Jonathan Almeida", + | "inLanguage": "en-US", + | "genre": "blog", + | "breadcrumb": [ + | "blog", + | "releases" + | ], + | "keywords": [ + | "snowplow", + | "analytics", + | "java", + | "jvm", + | "tracker" + | ] + | } + | ], + | "pp_xoffset_min": null, + | "doc_width": null, + | "geo_zipcode": "49423", + | "br_family": "Chrome", + | "tr_currency": null, + | "useragent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36", + | "event_name": null, + | "os_name": "Mac OS", + | "page_urlpath": "/blog/", + | "br_name": "Chrome", + | "ip_netspeed": null, + | "page_title": null, + | "ip_organization": null, + | "dvce_created_tstamp": "2014-05-29T18:16:35.967Z", + | "br_features_gears": false, + | "dvce_type": "Computer", + | "dvce_sent_tstamp": null, + | "se_action": null, + | "br_features_director": false, + | "se_category": null, + | "ti_name": null, + | "user_id": null, + | "refr_urlquery": null, + | "true_tstamp": null, + | "geo_longitude": -86.0661, + | "mkt_term": null, + | "v_tracker": "js-2.0.0-M2", + | "os_timezone": "America/New_York", + | "br_type": "Browser", + | "br_features_windowsmedia": false, + | "event_version": null, + | "dvce_screenwidth": 1440, + | "refr_dvce_tstamp": null, + | "se_label": null, + | "domain_sessionid": null, + | "domain_userid": "58df65c46e1ac937", + | "page_urlquery": null, + | "geo_location": "42.742294,-86.0661", + | "refr_term": null, + | "name_tracker": "clojure", + | "tr_tax_base": null, + | "dvce_screenheight": 900, + | "mkt_campaign": null, + | "refr_urlfragment": null, + | "tr_shipping": null, + | "tr_shipping_base": null, + | "br_features_java": true, + | "br_viewwidth": 1241, + | "geo_city": "Holland", + | "unstruct_event_com_snowplowanalytics_snowplow_link_click_1": { + | "targetUrl": "http://snowplowanalytics.com/blog/page2", + | "elementClasses": [ + | "next" + | ] + | }, + | "br_viewheight": 806, + | "refr_domain_userid": null, + | "br_features_silverlight": true, + | "ti_price_base": null, + | "tr_tax": null, + | "br_cookies": true, + | "tr_total_base": null, + | "refr_urlport": 80, + | "derived_tstamp": null, + | "app_id": "snowplowweb", + | "ip_isp": null, + | "geo_region_name": null, + | "pp_yoffset_max": null, + | "ip_domain": null, + | "domain_sessionidx": 11, + | "pp_xoffset_max": null, + | "mkt_source": null, + | "page_urlport": 80, + | "se_property": null, + | "platform": "web", + | "event_id": "2b1b25a4-c0df-4859-8201-cf21492ad61b", + | "refr_urlpath": "/", + | "mkt_network": null, + | "se_value": null, + | "page_url": "http://snowplowanalytics.com/blog/", + | "etl_tags": null, + | "tr_orderid": null, + | "tr_state": null, + | "txn_id": 114221, + | "refr_source": "Google", + | "tr_country": null, + | "tr_city": null, + | "doc_charset": "UTF-8", + | "event_fingerprint": null, + | "v_etl": "hadoop-0.5.0-common-0.4.0" + |} + |""".stripMargin) + .right + .get + + transformedEvent must beEqualTo(expectedEvent) + } +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala new file mode 100644 index 000000000..36fa419b9 --- /dev/null +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala @@ -0,0 +1,309 @@ +package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat +import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good} +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{ + AppConfiguration, + AzureTransformerSpecification, + InputBatch +} +import io.circe.parser + +import scala.concurrent.duration.DurationInt + +class ParquetScenario1 extends AzureTransformerSpecification { + def description = "Input: 1 good, config: PARQUET, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET) + def inputBatches = List(good(count = 1)) + def countExpectations = CountExpectations(good = 1, bad = 0) +} + +class ParquetScenario2 extends AzureTransformerSpecification { + def description = "Input: 1 bad, config: PARQUET, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET) + def inputBatches = List(bad(count = 1)) + def countExpectations = CountExpectations(good = 0, bad = 1) +} + +class ParquetScenario3 extends AzureTransformerSpecification { + def description = "Input: 1000 good, config: PARQUET, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET) + def inputBatches = List(good(count = 1000)) + def countExpectations = CountExpectations(good = 1000, bad = 0) +} + +class ParquetScenario4 extends AzureTransformerSpecification { + def description = "Input: 1000 bad, config: PARQUET, compression, windowing: 1 minute" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET) + def inputBatches = List(bad(count = 1000)) + def countExpectations = CountExpectations(good = 0, bad = 1000) +} + +class ParquetScenario5 extends AzureTransformerSpecification { + def description = """Input: mixed 500 good and 500 bad, config: PARQUET, compression, windowing: 1 minute""" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET) + def inputBatches = List(good(count = 500), bad(count = 500)) + def countExpectations = CountExpectations(good = 500, bad = 500) +} + +//Changed defualt windowing to 2 minutes +class ParquetScenario6 extends AzureTransformerSpecification { + def description = """Input: mixed 500 good and 500 bad, config: PARQUET, compression, windowing: 2 minutes""" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET, windowFrequencyMinutes = 2) + def inputBatches = List(good(count = 500), good(count = 500).delayed(2.minutes)) // force new window by delaying second input batch + def countExpectations = CountExpectations(good = 1000, bad = 0) +} + +//No compression +class ParquetScenario7 extends AzureTransformerSpecification { + def description = """Input: mixed 500 good and 500 bad, config: PARQUET, no compression, windowing: 1 minute""" + def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET, compression = Compression.None) + def inputBatches = List(good(count = 500), bad(count = 500)) + def countExpectations = CountExpectations(good = 500, bad = 500) +} + +//Checking details of parquet output +class ParquetScenario8 extends AzureTransformerSpecification { + + private val goodEvent = Content.TextLines( + List( + """spirit-walk web 2021-10-05 11:04:24.773579202 2021-10-05 11:04:06.799579202 unstruct a111920f-a2a3-45dc-8ae6-d8d86a383f5b datacap js-2.5.3-m1 ssc-2.2.1-pubsub beam-enrich-1.2.0-common-1.1.0 ada.blackjack@iglu.com 0:7fff:9ad9:7fff:cd64:8000:af61:eb6d a0925757-3894-4631-9e6e-e572820bde76 9462 0791c60d-3c62-49c5-87e4-42581e909a85 http://e.net/lfek Elit do sit consectetur ipsum adipiscing lorem dolor sed amet https://google.fr/morwmjx http e.net lfek https google.fr morwmjx search Google email openemail igloosforall {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"15108107-7c6f-4df1-94cf-9d8d1229095c"}},{"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0","data":{"latitude":13.47832218152621,"longitude":180.0}},{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"requestStart":100000,"chromeFirstPaint":4561,"unloadEventStart":79959,"fetchStart":1,"domainLookupStart":1,"requestEnd":1,"unloadEventEnd":100000,"loadEventStart":7718,"secureConnectionStart":1,"redirectEnd":100000,"domContentLoadedEventStart":96121,"navigationStart":12431,"proxyStart":100000,"responseStart":50588,"proxyEnd":1,"connectStart":53541,"msFirstPaint":100000,"domContentLoadedEventEnd":95815,"loadEventEnd":12163,"responseEnd":1,"connectEnd":1,"domInteractive":1,"redirectStart":97743,"domComplete":78711,"domainLookupEnd":1,"domLoading":3987}},{"schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1","data":{"userId":"0a4d7fb4-1b41-4377-b777-43c84df3b432","sessionId":"c57ab828-5186-48f6-880a-2b00d1d68d12","sessionIndex":1299327581,"previousSessionId":"10f52998-686c-4186-83e6-87d4bf044360","storageMechanism":"SQLITE"}}]} {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-1","data":{"targetUrl":"http://www.W.ru/q"}}} Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.1.56 (KHTML, like Gecko) Version/9.0 Safari/601.1.56 {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.mparticle.snowplow/pushregistration_event/jsonschema/1-0-0","data":{"name":"s","registrationToken":"x"}},{"schema":"iglu:com.segment/screen/jsonschema/1-0-0","data":{"name":"nn5y1uOtbmT54qhatagqg6pjpfydxdzv"}},{"schema":"iglu:com.snowplowanalytics.snowplow/consent_withdrawn/jsonschema/1-0-0","data":{"all":false}},{"schema":"iglu:com.mparticle.snowplow/session_context/jsonschema/1-0-0","data":{"id":"w"}},{"schema":"iglu:com.optimizely.optimizelyx/summary/jsonschema/1-0-0","data":{"experimentId":86409,"variationName":"z0gdkswjpyckuA","variation":94199,"visitorId":"guHo9jXgVkkqgvttdbx"}},{"schema":"iglu:com.optimizely/variation/jsonschema/1-0-0","data":{"id":"aygp70c7plcgfFkletcxePqzjsrfg","name":"b","code":"cm"}},{"schema":"iglu:com.optimizely/state/jsonschema/1-0-0","data":{"experimentId":"y","isActive":false,"variationIndex":15901,"variationId":null,"variationName":"uxhiptya1afmklYxovdeWkyym"}},{"schema":"iglu:com.optimizely/visitor/jsonschema/1-0-0","data":{"browser":"H","browserVersion":null,"device":"dhqxkb2xhcxif6kb","deviceType":"bwajz3uPbvgkolzu","mobile":true}},{"schema":"iglu:com.google.analytics/private/jsonschema/1-0-0","data":{"v":null,"s":561759,"u":"4SRvufkxnwvdvacwEedknvvtihtv7t4thdwffzbkt9nGpxhLcvliwCKpasetbbylw9hoxdajvbsci00ujotb2ntK3kvgrjqwoT7chiyvoxviqawkgdmmZe8shxiuplspCgki8kliptqnjsjpasFmdkhTzfmnlMGspowzbNawTlfegkfezEadqlmnbvv3qjtrEueqsagjbrucamlmwndnw2skrabwwT7hvreyckyvwgpchjAgzuml4rfxji7je233chSsmeutdxlbZonaFtoywafl1gyaeZl77odhhd9xretxiVndvrqgcxusmelrio6xowtkqfoyuwmeasls4DzmqesVt6igsesvxRRjyu6YqymoPpwfyf3idankobecpm5nndrhyiwc37p2oqku1yirYxqawehvsv3nlr0pzizcR9vorhdbwfbe2nqhi8wvwd","gid":"n7yse4eCgtm","r":103569}},{"schema":"iglu:com.google.analytics/cookies/jsonschema/1-0-0","data":{"__utma":"E","__utmb":"lWcbfgEoyr","__utmc":"rqqjwkixOpg","__utmv":"l","__utmz":"k4Kn","_ga":"tvjkHopok"}},{"schema":"iglu:org.ietf/http_cookie/jsonschema/1-0-0","data":{"name":"NoygypmvyarvZbsknVdbskuwoBaU3qcL","value":"jlhaofcMybVj33qrdumlnt5qoktdabaw"}},{"schema":"iglu:org.ietf/http_header/jsonschema/1-0-0","data":{"name":"7r72a6d4","value":"o5cVtuyjtorn4vfo"}},{"schema":"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0","data":{"useragentMinor":"6","useragentFamily":"Firefox","useragentMajor":"7","osFamily":"Mac OS X","deviceFamily":"Mac"}},{"schema":"iglu:com.snowplowanalytics.snowplow/desktop_context/jsonschema/1-0-0","data":{"osType":"Linux","osVersion":"49365250426432071725495","osIs64Bit":false,"osServicePack":"c","deviceManufacturer":"176003457107290991384576784569789099","deviceModel":"203936335876967844347234142039077985","deviceProcessorCount":15}},{"schema":"iglu:com.snowplowanalytics.snowplow/consent_document/jsonschema/1-0-0","data":{"id":"pfprxpoi","version":"b","description":"c"}},{"schema":"iglu:com.snowplowanalytics.snowplow/client_session/jsonschema/1-0-1","data":{"userId":"7707e20a-7ddd-4535-b6be-b27bcd2a9557","sessionId":"ec98ef04-d521-4c1c-adb4-9a5878ac6ca9","sessionIndex":203173097,"previousSessionId":"bf76d087-4890-4434-b8fb-86a7ef5c6def","storageMechanism":"SQLITE"}},{"schema":"iglu:com.snowplowanalytics.snowplow/change_form/jsonschema/1-0-0","data":{"formId":"mxzy","elementId":"dtcfxsmqgwv","nodeName":"TEXTAREA","type":"text","value":"f"}}]} 90ea775c-1aa0-4d40-a473-0b5796b44509 2021-09-17 09:05:28.590000001 com.snowplowanalytics.snowplow link_click jsonschema 1-0-1 2d3de1febeeaf6bdcfcbbdfddd0e2b9b """ + ) + ) + + override def description = "Asserting details of a single PARQUET transformed event" + override def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET) + override def inputBatches = List(InputBatch(goodEvent)) + override def countExpectations = CountExpectations(good = 1, bad = 0) + + override def customDataAssertion = Some { outputData => + outputData.good.head must beEqualTo(expectedOutputParquetDataAsJson) + } + + lazy val expectedOutputParquetDataAsJson = parser + .parse( + """ + |{ + | "app_id" : "spirit-walk", + | "platform" : "web", + | "etl_tstamp" : "2021-10-05T11:04:24.773Z", + | "collector_tstamp" : "2021-10-05T11:04:06.799Z", + | "event" : "unstruct", + | "event_id" : "a111920f-a2a3-45dc-8ae6-d8d86a383f5b", + | "name_tracker" : "datacap", + | "v_tracker" : "js-2.5.3-m1", + | "v_collector" : "ssc-2.2.1-pubsub", + | "v_etl" : "beam-enrich-1.2.0-common-1.1.0", + | "user_id" : "ada.blackjack@iglu.com", + | "user_ipaddress" : "0:7fff:9ad9:7fff:cd64:8000:af61:eb6d", + | "domain_userid" : "a0925757-3894-4631-9e6e-e572820bde76", + | "domain_sessionidx" : 9462, + | "network_userid" : "0791c60d-3c62-49c5-87e4-42581e909a85", + | "page_url" : "http://e.net/lfek", + | "page_title" : "Elit do sit consectetur ipsum adipiscing lorem dolor sed amet", + | "page_referrer" : "https://google.fr/morwmjx", + | "page_urlscheme" : "http", + | "page_urlhost" : "e.net", + | "page_urlpath" : "lfek", + | "refr_urlscheme" : "https", + | "refr_urlhost" : "google.fr", + | "refr_urlpath" : "morwmjx", + | "refr_medium" : "search", + | "refr_source" : "Google", + | "mkt_medium" : "email", + | "mkt_source" : "openemail", + | "mkt_campaign" : "igloosforall", + | "useragent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/601.1.56 (KHTML, like Gecko) Version/9.0 Safari/601.1.56", + | "domain_sessionid" : "90ea775c-1aa0-4d40-a473-0b5796b44509", + | "derived_tstamp" : "2021-09-17T09:05:28.590Z", + | "event_vendor" : "com.snowplowanalytics.snowplow", + | "event_name" : "link_click", + | "event_format" : "jsonschema", + | "event_version" : "1-0-1", + | "event_fingerprint" : "2d3de1febeeaf6bdcfcbbdfddd0e2b9b", + | "contexts_com_google_analytics_cookies_1" : [ + | { + | "__utma" : "E", + | "__utmb" : "lWcbfgEoyr", + | "__utmc" : "rqqjwkixOpg", + | "__utmv" : "l", + | "__utmz" : "k4Kn", + | "_ga" : "tvjkHopok" + | } + | ], + | "contexts_com_google_analytics_private_1" : [ + | { + | "gid" : "n7yse4eCgtm", + | "r" : 103569, + | "s" : 561759, + | "u" : "4SRvufkxnwvdvacwEedknvvtihtv7t4thdwffzbkt9nGpxhLcvliwCKpasetbbylw9hoxdajvbsci00ujotb2ntK3kvgrjqwoT7chiyvoxviqawkgdmmZe8shxiuplspCgki8kliptqnjsjpasFmdkhTzfmnlMGspowzbNawTlfegkfezEadqlmnbvv3qjtrEueqsagjbrucamlmwndnw2skrabwwT7hvreyckyvwgpchjAgzuml4rfxji7je233chSsmeutdxlbZonaFtoywafl1gyaeZl77odhhd9xretxiVndvrqgcxusmelrio6xowtkqfoyuwmeasls4DzmqesVt6igsesvxRRjyu6YqymoPpwfyf3idankobecpm5nndrhyiwc37p2oqku1yirYxqawehvsv3nlr0pzizcR9vorhdbwfbe2nqhi8wvwd" + | } + | ], + | "contexts_com_mparticle_snowplow_pushregistration_event_1" : [ + | { + | "name" : "s", + | "registration_token" : "x" + | } + | ], + | "contexts_com_mparticle_snowplow_session_context_1" : [ + | { + | "id" : "w" + | } + | ], + | "contexts_com_optimizely_state_1" : [ + | { + | "experiment_id" : "y", + | "is_active" : false, + | "variation_index" : 15901, + | "variation_name" : "uxhiptya1afmklYxovdeWkyym" + | } + | ], + | "contexts_com_optimizely_variation_1" : [ + | { + | "code" : "cm", + | "id" : "aygp70c7plcgfFkletcxePqzjsrfg", + | "name" : "b" + | } + | ], + | "contexts_com_optimizely_visitor_1" : [ + | { + | "browser" : "H", + | "device" : "dhqxkb2xhcxif6kb", + | "device_type" : "bwajz3uPbvgkolzu", + | "mobile" : true + | } + | ], + | "contexts_com_optimizely_optimizelyx_summary_1" : [ + | { + | "experiment_id" : 86409, + | "variation" : 94199, + | "variation_name" : "z0gdkswjpyckuA", + | "visitor_id" : "guHo9jXgVkkqgvttdbx" + | } + | ], + | "contexts_com_segment_screen_1" : [ + | { + | "name" : "nn5y1uOtbmT54qhatagqg6pjpfydxdzv" + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_change_form_1" : [ + | { + | "element_id" : "dtcfxsmqgwv", + | "form_id" : "mxzy", + | "node_name" : "TEXTAREA", + | "type" : "text", + | "value" : "f" + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_client_session_1" : [ + | { + | "previous_session_id" : "10f52998-686c-4186-83e6-87d4bf044360", + | "session_id" : "c57ab828-5186-48f6-880a-2b00d1d68d12", + | "session_index" : 1299327581, + | "storage_mechanism" : "SQLITE", + | "user_id" : "0a4d7fb4-1b41-4377-b777-43c84df3b432" + | }, + | { + | "previous_session_id" : "bf76d087-4890-4434-b8fb-86a7ef5c6def", + | "session_id" : "ec98ef04-d521-4c1c-adb4-9a5878ac6ca9", + | "session_index" : 203173097, + | "storage_mechanism" : "SQLITE", + | "user_id" : "7707e20a-7ddd-4535-b6be-b27bcd2a9557" + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_consent_document_1" : [ + | { + | "description" : "c", + | "id" : "pfprxpoi", + | "version" : "b" + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_consent_withdrawn_1" : [ + | { + | "all" : false + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_desktop_context_1" : [ + | { + | "device_manufacturer" : "176003457107290991384576784569789099", + | "device_model" : "203936335876967844347234142039077985", + | "device_processor_count" : 15.0, + | "os_is64_bit" : false, + | "os_service_pack" : "c", + | "os_type" : "Linux", + | "os_version" : "49365250426432071725495" + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_geolocation_context_1" : [ + | { + | "latitude" : 13.47832218152621, + | "longitude" : 180.0 + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_ua_parser_context_1" : [ + | { + | "device_family" : "Mac", + | "os_family" : "Mac OS X", + | "useragent_family" : "Firefox", + | "useragent_major" : "7", + | "useragent_minor" : "6" + | } + | ], + | "contexts_com_snowplowanalytics_snowplow_web_page_1" : [ + | { + | "id" : "15108107-7c6f-4df1-94cf-9d8d1229095c" + | } + | ], + | "contexts_org_ietf_http_cookie_1" : [ + | { + | "name" : "NoygypmvyarvZbsknVdbskuwoBaU3qcL", + | "value" : "jlhaofcMybVj33qrdumlnt5qoktdabaw" + | } + | ], + | "contexts_org_ietf_http_header_1" : [ + | { + | "name" : "7r72a6d4", + | "value" : "o5cVtuyjtorn4vfo" + | } + | ], + | "contexts_org_w3_performance_timing_1" : [ + | { + | "chrome_first_paint" : 4561, + | "connect_end" : 1, + | "connect_start" : 53541, + | "dom_complete" : 78711, + | "dom_content_loaded_event_end" : 95815, + | "dom_content_loaded_event_start" : 96121, + | "dom_interactive" : 1, + | "dom_loading" : 3987, + | "domain_lookup_end" : 1, + | "domain_lookup_start" : 1, + | "fetch_start" : 1, + | "load_event_end" : 12163, + | "load_event_start" : 7718, + | "ms_first_paint" : 100000, + | "navigation_start" : 12431, + | "proxy_end" : 1, + | "proxy_start" : 100000, + | "redirect_end" : 100000, + | "redirect_start" : 97743, + | "request_end" : 1, + | "request_start" : 100000, + | "response_end" : 1, + | "response_start" : 50588, + | "secure_connection_start" : 1, + | "unload_event_end" : 100000, + | "unload_event_start" : 79959 + | } + | ], + | "unstruct_event_com_snowplowanalytics_snowplow_link_click_1" : { + | "target_url" : "http://www.W.ru/q" + | } + |} + |""".stripMargin + ) + .right + .get +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5d1102ce9..d9eb74ebd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -72,13 +72,15 @@ object Dependencies { val catsTesting = "1.5.0" val catsEffectTestkit = "3.4.5" val scalaCheck = "1.17.0" + val eventGenerator = "0.4.0" val betterMonadicFor = "0.3.1" } val resolutionRepos = Seq( // Redshift native driver - ("redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release").withAllowInsecureProtocol(true) + ("redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release").withAllowInsecureProtocol(true), + ("Snowplow Analytics Maven repo" at "http://maven.snplow.com/releases/").withAllowInsecureProtocol(true) ) // Scala (Common) @@ -184,14 +186,14 @@ object Dependencies { val nettyCodec = "io.netty" % "netty-codec" % V.nettyCodec // Scala (test only) - val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test - val specs2ScalaCheck = "org.specs2" %% "specs2-scalacheck" % V.specs2 % Test - val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % Test - val catsTesting = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsTesting % Test - val catsEffectTestkit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffectTestkit % Test - val catsEffectLaws = "org.typelevel" %% "cats-effect-laws" % V.catsEffect % Test - val fs2BlobstoreCore = "com.github.fs2-blobstore" %% "core" % V.fs2Blobstore % Test - + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val specs2ScalaCheck = "org.specs2" %% "specs2-scalacheck" % V.specs2 % Test + val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % Test + val catsTesting = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsTesting % Test + val catsEffectTestkit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffectTestkit % Test + val catsEffectLaws = "org.typelevel" %% "cats-effect-laws" % V.catsEffect % Test + val fs2BlobstoreCore = "com.github.fs2-blobstore" %% "core" % V.fs2Blobstore % Test + val eventGenerator = "com.snowplowanalytics" %% "snowplow-event-generator-core" % V.eventGenerator % Test // compiler plugins val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor @@ -342,7 +344,8 @@ object Dependencies { ) val transformerKafkaDependencies = Seq( - hadoopAzure + hadoopAzure, + eventGenerator ) val commonStreamTransformerExclusions =