diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/InputBatch.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/integrationtestutils/InputBatch.scala similarity index 65% rename from modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/InputBatch.scala rename to modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/integrationtestutils/InputBatch.scala index 3d1c6cfa2..cc01b38dc 100644 --- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/InputBatch.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/integrationtestutils/InputBatch.scala @@ -1,14 +1,31 @@ -package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental -import com.snowplowanalytics.snowplow.analytics.scalasdk.Event -import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => EventGenerator} -import com.snowplowanalytics.snowplow.eventgen.protocol.event.{EventFrequencies, UnstructEventFrequencies} -import org.scalacheck.Gen -import org.scalacheck.rng.Seed +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils import java.time.Instant + import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.Random +import org.scalacheck.Gen +import org.scalacheck.rng.Seed + +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => EventGenerator} +import com.snowplowanalytics.snowplow.eventgen.protocol.event.{EventFrequencies, UnstructEventFrequencies} + final case class InputBatch(content: InputBatch.Content, delay: FiniteDuration = 0.minutes) { def delayed(value: FiniteDuration) = this.copy(delay = value) } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/integrationtestutils/ItUtils.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/integrationtestutils/ItUtils.scala new file mode 100644 index 000000000..4af66ca58 --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.rdbloader.common/integrationtestutils/ItUtils.scala @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils + +import scala.concurrent.duration.DurationInt + +import cats.implicits._ +import cats.effect.IO + +import retry._ + +import fs2.Stream +import fs2.concurrent.{Signal, SignallingRef} + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue + +object ItUtils { + + private val timeout = 15.minutes + + def produceInput(batches: List[InputBatch], producer: Queue.Producer[IO]): Stream[IO, Unit] = + Stream.eval { + IO.sleep(10.seconds) *> batches.traverse_ { batch => + IO.sleep(batch.delay) *> InputBatch + .asTextLines(batch.content) + .parTraverse_(producer.send) + } + } + + def consumeAllIncomingWindows[A <: GetShreddingComplete]( + queueConsumer: Queue.Consumer[IO], + countExpectations: CountExpectations, + windowsAccumulator: SignallingRef[IO, WindowsAccumulator[A]], + getWindowOutput: LoaderMessage.ShreddingComplete => IO[A] + ): Stream[IO, Unit] = + queueConsumer.read + .map(_.content) + .map(parseShreddingCompleteMessage) + .evalMap(getWindowOutput(_)) + .evalMap { windowOutput => + windowsAccumulator.update(_.addWindow(windowOutput)) + } + .interruptWhen(allEventsProcessed(windowsAccumulator, countExpectations)) + .interruptAfter(timeout) + + private def allEventsProcessed[A <: GetShreddingComplete]( + windowsAccumulator: SignallingRef[IO, WindowsAccumulator[A]], + countExpectations: CountExpectations + ): Signal[IO, Boolean] = + windowsAccumulator + .map(_.getTotalNumberOfEvents >= countExpectations.total) + + def parseShreddingCompleteMessage(message: String): LoaderMessage.ShreddingComplete = + LoaderMessage + .fromString(message) + .right + .get + .asInstanceOf[LoaderMessage.ShreddingComplete] + + def retryUntilNonEmpty[A](io: IO[List[A]]): IO[List[A]] = + retryingOnFailures[List[A]]( + policy = RetryPolicies.capDelay[IO](15.minutes, RetryPolicies.constantDelay[IO](30.seconds)), + wasSuccessful = items => IO.delay(items.nonEmpty), + onFailure = (r, d) => IO.delay(println(s"$r - $d")) + )(io) + + final case class CountExpectations(good: Int, bad: Int) { + def total = good + bad + } + + final case class WindowsAccumulator[A <: GetShreddingComplete](value: List[A]) { + def addWindow(window: A): WindowsAccumulator[A] = + WindowsAccumulator(value :+ window) + + def getTotalNumberOfEvents: Long = + value.map { window => + val good = window.shredding_complete.count.map(_.good).getOrElse(0L) + val bad = window.shredding_complete.count.flatMap(_.bad).getOrElse(0L) + good + bad + }.sum + } + + trait GetShreddingComplete { + def shredding_complete: LoaderMessage.ShreddingComplete + } + + implicit class ManifestItemListUtils(items: List[LoaderMessage.ManifestItem]) { + def totalGood: Long = + items.foldLeft(0L)((acc, i) => acc + i.count.map(_.good).getOrElse(0L)) + } +} diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/AzureTestResources.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/AzureTestResources.scala new file mode 100644 index 000000000..37666e02b --- /dev/null +++ 
b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/AzureTestResources.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.experimental + +import java.util.UUID + +import cats.effect.{IO, Resource} + +import com.snowplowanalytics.snowplow.rdbloader.azure.{AzureKeyVault, KafkaConsumer, KafkaProducer} +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore} + +trait AzureTestResources extends CloudResources { + val eventHubsUrlEnv = "TEST_LOADER_EVENTHUBS_URL" + val inputHubNameEnv = "TEST_LOADER_INPUT_HUB_NAME" + val outputHubNameEnv = "TEST_LOADER_OUTPUT_HUB_NAME" + val inputHubKeyEnv = "TEST_LOADER_INPUT_HUB_KEY" + val outputHubKeyEnv = "TEST_LOADER_OUTPUT_HUB_KEY" + val azureKeyVaultNameEnv = "TEST_LOADER_AZURE_KEY_VAULT_NAME" + + def createConsumer: Resource[IO, Queue.Consumer[IO]] = + KafkaConsumer + .consumer[IO]( + bootstrapServers = System.getenv(eventHubsUrlEnv), + topicName = System.getenv(outputHubNameEnv), + consumerConf = Map( + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "PLAIN", + "sasl.jaas.config" -> + s"""org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$$ConnectionString\" password=\"${System.getenv( + outputHubKeyEnv + )}\";""", + "group.id" -> s"test-consumer-${UUID.randomUUID()}", + "enable.auto.commit" -> "true", + "auto.offset.reset" -> "latest" + ) + ) + + def createProducer: Resource[IO, Queue.Producer[IO]] = + KafkaProducer + .producer[IO]( + bootstrapServers = System.getenv(eventHubsUrlEnv), + topicName = System.getenv(inputHubNameEnv), + producerConf = Map( + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "PLAIN", + "sasl.jaas.config" -> + s"""org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$$ConnectionString\" password=\"${System.getenv( + inputHubKeyEnv + )}\";""" + ) + ) + + def createSecretStore: Resource[IO, SecretStore[IO]] = + AzureKeyVault.create[IO](Some(System.getenv(azureKeyVaultNameEnv))) + + override def getCloudResourcesEnvVars: List[String] = List( + eventHubsUrlEnv, + inputHubNameEnv, + outputHubNameEnv, + inputHubKeyEnv, + outputHubKeyEnv, + azureKeyVaultNameEnv + ) +} diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/CloudResources.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/CloudResources.scala new file mode 100644 index 000000000..ad841ca33 --- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/CloudResources.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.experimental + +import cats.effect.{IO, Resource} + +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore} + +trait CloudResources { + + def createConsumer: Resource[IO, Queue.Consumer[IO]] + def createProducer: Resource[IO, Queue.Producer[IO]] + def createSecretStore: Resource[IO, SecretStore[IO]] + def getCloudResourcesEnvVars: List[String] +} diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/LoaderSpecification.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/LoaderSpecification.scala new file mode 100644 index 000000000..426a345dc --- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/LoaderSpecification.scala @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader.experimental + +import scala.concurrent.duration.DurationInt + +import doobie.ConnectionIO + +import cats.effect.{IO, Resource} +import cats.effect.std.Dispatcher +import cats.effect.unsafe.implicits.global + +import fs2.concurrent.SignallingRef + +import org.http4s.blaze.client.BlazeClientBuilder + +import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} +import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics +import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, Monitoring, Transaction} +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore} +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._ +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch + +import org.specs2.mutable.Specification + +abstract class LoaderSpecification extends Specification with TestDAO.Provider with StorageTargetProvider with AzureTestResources { + skipAllIf(anyEnvironmentVariableMissing()) + import LoaderSpecification._ + + def run[A]( + inputBatches: List[InputBatch], + countExpectations: CountExpectations, + dbActions: TestDAO => IO[A] + ): IO[(WindowsAccumulator[WindowOutput], A)] = + createResources + .use { resources => + for { + _ <- resources.testDAO.cleanDb + windowsAccumulator <- SignallingRef.of[IO, WindowsAccumulator[WindowOutput]](WindowsAccumulator(List.empty)) + consuming = consumeAllIncomingWindows[WindowOutput]( + resources.queueConsumer, + countExpectations, + windowsAccumulator, + getWindowOutput = sc => IO.pure(WindowOutput(sc)) + ) + producing = produceInput(inputBatches, resources.producer) + _ <- consuming.concurrently(producing).compile.drain + collectedWindows <- windowsAccumulator.get + dbActionResult <- dbActions(resources.testDAO) + } yield (collectedWindows, dbActionResult) + } + + def createResources: Resource[IO, TestResources] = + for { + consumer <- createConsumer + producer <- createProducer + implicit0(secretStore: SecretStore[IO]) <- createSecretStore + transaction <- createDbTransaction + testDAO = createDAO(transaction) + } yield TestResources(queueConsumer = consumer, producer = producer, testDAO = testDAO) + + def createDbTransaction(implicit secretStore: SecretStore[IO]): Resource[IO, Transaction[IO, ConnectionIO]] = { + val storage: StorageTarget = createStorageTarget + val timeouts: Config.Timeouts = Config.Timeouts( + loading = 15.minutes, + nonLoading = 15.minutes, + sqsVisibility = 15.minutes, + rollbackCommit = 15.minutes, + connectionIsValid = 15.minutes + ) + val readyCheck: Config.Retries = Config.Retries( + strategy = Config.Strategy.Constant, + attempts = Some(3), + backoff = 30.seconds, + cumulativeBound = None + ) + for { + implicit0(dispatcher: Dispatcher[IO]) <- Dispatcher.parallel[IO] + httpClient <- BlazeClientBuilder[IO].withExecutionContext(global.compute).resource + implicit0(logging: Logging[IO]) = Logging.loggingInterpreter[IO](List()) + periodicMetrics <- Resource.eval(Metrics.PeriodicMetrics.init[IO](List.empty, 1.minutes)) + implicit0(monitoring: Monitoring[IO]) = + Monitoring.monitoringInterpreter[IO](None, None, List.empty, None, httpClient, periodicMetrics) + transaction <- Transaction.interpreter[IO](storage, timeouts, readyCheck) + } yield transaction + } + + def anyEnvironmentVariableMissing(): Boolean = + (getCloudResourcesEnvVars ::: getStorageTargetEnvVars).exists(varName => 
System.getenv(varName) == null) +} + +object LoaderSpecification { + + final case class TestResources( + queueConsumer: Queue.Consumer[IO], + producer: Queue.Producer[IO], + testDAO: TestDAO + ) + + final case class WindowOutput( + shredding_complete: LoaderMessage.ShreddingComplete + ) extends GetShreddingComplete +} diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/StorageTargetProvider.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/StorageTargetProvider.scala new file mode 100644 index 000000000..c11c64c8a --- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/StorageTargetProvider.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.experimental + +import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget + +trait StorageTargetProvider { + def createStorageTarget: StorageTarget + def getStorageTargetEnvVars: List[String] +} diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/TestDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/TestDAO.scala new file mode 100644 index 000000000..85cdff147 --- /dev/null +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/experimental/TestDAO.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.rdbloader.experimental + +import cats.effect.IO + +import doobie.ConnectionIO + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage +import com.snowplowanalytics.snowplow.rdbloader.dsl.Transaction + +trait TestDAO { + def cleanDb: IO[Unit] + def queryManifest: IO[List[LoaderMessage.ManifestItem]] + def queryEventIds: IO[List[String]] +} + +object TestDAO { + + trait Provider { + def createDAO(transaction: Transaction[IO, ConnectionIO]): TestDAO + } +} diff --git a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/it/AzureSnowflakeLoaderSpecs.scala b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/it/AzureSnowflakeLoaderSpecs.scala new file mode 100644 index 000000000..d020a09fc --- /dev/null +++ b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/it/AzureSnowflakeLoaderSpecs.scala @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. + */ +package com.snowplowanalytics.snowplow.loader.snowflake.it + +import cats.effect.unsafe.implicits.global + +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._ +import com.snowplowanalytics.snowplow.rdbloader.experimental.{AzureTestResources, LoaderSpecification} + +import org.specs2.matcher.MatchResult + +class AzureSnowflakeLoaderSpecs extends LoaderSpecification with SnowflakeTestResources with AzureTestResources { + sequential + + "Scenario 1" in { + val goodEvent = InputBatch.Content.TextLines( + List( + """snowplowweb web 2014-06-01 14:04:11.639 2014-05-29 18:16:35.000 2014-05-29 18:16:35.967 unstruct 2b1b25a4-c0df-4859-8201-cf21492ad61b 114221 clojure js-2.0.0-M2 clj-0.6.0-tom-0.0.4 hadoop-0.5.0-common-0.4.0 68.42.204.218 1242058182 58df65c46e1ac937 11 437ad25b-2006-455e-b5d8-d664b74df8f3 US MI Holland 49423 42.742294 -86.0661 http://snowplowanalytics.com/blog/ https://www.google.com/ http snowplowanalytics.com 80 /blog/ https www.google.com 80 / search Google {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/WebPage/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]} {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-0","data":{"targetUrl":"http://snowplowanalytics.com/blog/page2","elementClasses":["next"]}}} Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 
(KHTML, like Gecko) Chrome/35.0.1916.114 Safari/537.36 Chrome Chrome Browser WEBKIT en-US 1 1 1 0 1 0 0 0 1 1 24 1241 806 Mac OS Mac OS Apple Inc. America/New_York Computer 0 1440 900 UTF-8 """ + ) + ) + val inputBatches = List(InputBatch(goodEvent)) + val countExpectations = CountExpectations(good = 1, bad = 0) + eventCountCheck(inputBatches, countExpectations) + } + + "Scenario 2" in { + val countExpectations = CountExpectations(good = 100, bad = 0) + val inputBatches = List(InputBatch.good(countExpectations.good)) + eventCountCheck(inputBatches, countExpectations) + } + + def eventCountCheck(inputBatches: List[InputBatch], countExpectations: CountExpectations): MatchResult[Any] = { + case class DbActionResult(manifestItems: List[LoaderMessage.ManifestItem], eventIds: List[String]) + val res = for { + (windowAcc, dbActionResult) <- run[DbActionResult]( + inputBatches, + countExpectations, + dbActions = testDAO => + for { + manifestItems <- retryUntilNonEmpty(testDAO.queryManifest) + eventIds <- testDAO.queryEventIds + } yield DbActionResult(manifestItems, eventIds) + ) + } yield { + windowAcc.value.size must beEqualTo(dbActionResult.manifestItems.size) + dbActionResult.eventIds.size must beEqualTo(countExpectations.good) + dbActionResult.manifestItems.totalGood must beEqualTo(countExpectations.good) + } + res.unsafeRunSync() + } + +} diff --git a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/it/SnowflakeTestResources.scala b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/it/SnowflakeTestResources.scala new file mode 100644 index 000000000..baefb37fd --- /dev/null +++ b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/it/SnowflakeTestResources.scala @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.loader.snowflake.it
+
+import scala.concurrent.duration.DurationInt
+
+import cats.effect.IO
+
+import doobie.ConnectionIO
+import doobie.implicits._
+
+import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
+import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget
+import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget.LoadAuthMethod
+import com.snowplowanalytics.snowplow.rdbloader.dsl.Transaction
+import com.snowplowanalytics.snowplow.rdbloader.experimental.{StorageTargetProvider, TestDAO}
+
+trait SnowflakeTestResources extends TestDAO.Provider with StorageTargetProvider {
+
+  val secretStoreParameterNameEnv = "TEST_LOADER_SECRET_STORE_PARAMETER"
+  val snowflakeRegionEnv = "TEST_LOADER_SNOWFLAKE_REGION"
+  val snowflakeUsernameEnv = "TEST_LOADER_SNOWFLAKE_USERNAME"
+  val snowflakeRoleEnv = "TEST_LOADER_SNOWFLAKE_ROLE"
+  val snowflakeAccountEnv = "TEST_LOADER_SNOWFLAKE_ACCOUNT"
+  val snowflakeWarehouseEnv = "TEST_LOADER_SNOWFLAKE_WAREHOUSE"
+  val snowflakeDatabaseEnv = "TEST_LOADER_SNOWFLAKE_DATABASE"
+  val snowflakeSchemaEnv = "TEST_LOADER_SNOWFLAKE_SCHEMA"
+
+  override def createDAO(transaction: Transaction[IO, ConnectionIO]): TestDAO = new TestDAO {
+    override def cleanDb: IO[Unit] =
+      for {
+        _ <- transaction.run(sql"delete from atomic.events".update.run)
+        _ <- transaction.run(sql"delete from atomic.manifest".update.run)
+      } yield ()
+
+    override def queryManifest: IO[List[LoaderMessage.ManifestItem]] =
+      transaction.run(
+        sql"""select base, types, shredding_started, shredding_completed,
+              min_collector_tstamp, max_collector_tstamp,
+              compression, processor_artifact, processor_version, count_good
+              FROM atomic.manifest""".query[LoaderMessage.ManifestItem].stream.compile.toList
+      )
+
+    override def queryEventIds: IO[List[String]] =
+      for {
+        res <- transaction.run(sql"select event_id from atomic.events".query[String].stream.compile.toList)
+      } yield res
+  }
+
+  override def createStorageTarget: StorageTarget =
+    StorageTarget.Snowflake(
+      snowflakeRegion = Some(System.getenv(snowflakeRegionEnv)),
+      username = System.getenv(snowflakeUsernameEnv),
+      role = Some(System.getenv(snowflakeRoleEnv)),
+      password = StorageTarget.PasswordConfig.EncryptedKey(StorageTarget.EncryptedConfig(System.getenv(secretStoreParameterNameEnv))),
+      account = Some(System.getenv(snowflakeAccountEnv)),
+      warehouse = System.getenv(snowflakeWarehouseEnv),
+      database = System.getenv(snowflakeDatabaseEnv),
+      schema = System.getenv(snowflakeSchemaEnv),
+      transformedStage = None,
+      appName = "loader-test",
+      folderMonitoringStage = None,
+      jdbcHost = None,
+      loadAuthMethod = LoadAuthMethod.TempCreds.AzureTempCreds(15.minutes),
+      readyCheck = StorageTarget.Snowflake.ResumeWarehouse
+    )
+
+  // For some reason, referencing the variables defined above doesn't work here,
+  // so the env var names are repeated as string literals instead.
+ override def getStorageTargetEnvVars: List[String] = List( + "TEST_LOADER_SECRET_STORE_PARAMETER", + "TEST_LOADER_SNOWFLAKE_REGION", + "TEST_LOADER_SNOWFLAKE_USERNAME", + "TEST_LOADER_SNOWFLAKE_ROLE", + "TEST_LOADER_SNOWFLAKE_ACCOUNT", + "TEST_LOADER_SNOWFLAKE_WAREHOUSE", + "TEST_LOADER_SNOWFLAKE_DATABASE", + "TEST_LOADER_SNOWFLAKE_SCHEMA" + ) +} diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala index 5810c7029..3fc4a0c0c 100644 --- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala @@ -2,18 +2,18 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experi import cats.effect.IO import cats.effect.unsafe.implicits.global -import cats.implicits._ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.{BlobObject, Folder} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification._ import fs2.Stream import fs2.compression.{Compression => FS2Compression} -import fs2.concurrent.Signal.SignalOps -import fs2.concurrent.{Signal, SignallingRef} +import fs2.concurrent.SignallingRef import io.circe.Json import org.specs2.matcher.MatchResult import org.specs2.mutable.Specification @@ -21,7 +21,6 @@ import org.specs2.mutable.Specification import java.time.LocalDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit -import scala.concurrent.duration.DurationInt // format: off /** @@ -51,8 +50,6 @@ import scala.concurrent.duration.DurationInt abstract class TransformerSpecification extends Specification with AppDependencies.Provider { - private val timeout = 15.minutes - protected def description: String protected def inputBatches: List[InputBatch] protected def countExpectations: CountExpectations @@ -67,9 +64,14 @@ abstract class TransformerSpecification extends Specification with AppDependenci createDependencies() .use { dependencies => for { - windowsAccumulator <- SignallingRef.of[IO, WindowsAccumulator](WindowsAccumulator(List.empty)) - consuming = consumeAllIncomingWindows(dependencies, countExpectations, windowsAccumulator) - producing = produceInput(inputBatches, dependencies) + windowsAccumulator <- SignallingRef.of[IO, WindowsAccumulator[WindowOutput]](WindowsAccumulator(List.empty)) + consuming = consumeAllIncomingWindows[WindowOutput]( + dependencies.queueConsumer, + countExpectations, + windowsAccumulator, + getWindowOutput = readWindowOutput(dependencies.blobClient) + ) 
+ producing = produceInput(inputBatches, dependencies.producer) _ <- consuming.concurrently(producing).compile.drain collectedWindows <- windowsAccumulator.get } yield { @@ -82,35 +84,11 @@ abstract class TransformerSpecification extends Specification with AppDependenci } } - private def produceInput(batches: List[InputBatch], dependencies: AppDependencies): Stream[IO, Unit] = - Stream.eval { - IO.sleep(10.seconds) *> batches.traverse_ { batch => - IO.sleep(batch.delay) *> InputBatch - .asTextLines(batch.content) - .parTraverse_(dependencies.producer.send) - } - } - - private def consumeAllIncomingWindows( - dependencies: AppDependencies, - countExpectations: CountExpectations, - windowsAccumulator: SignallingRef[IO, WindowsAccumulator] - ): Stream[IO, Unit] = - dependencies.queueConsumer.read - .map(_.content) - .map(parseShreddingCompleteMessage) - .evalMap(readWindowOutput(dependencies.blobClient)) - .evalMap { windowOutput => - windowsAccumulator.update(_.addWindow(windowOutput)) - } - .interruptWhen(allEventsProcessed(windowsAccumulator, countExpectations)) - .interruptAfter(timeout) - private def assertSingleWindow(output: WindowOutput): MatchResult[Any] = { - output.`shredding_complete.json`.compression must beEqualTo(requiredAppConfig.compression) - output.`shredding_complete.json`.count.get.good must beEqualTo(output.goodEvents.size) - output.`shredding_complete.json`.count.get.bad.get must beEqualTo(output.badEvents.size) - output.`shredding_complete.json`.typesInfo must beLike { case TypesInfo.WideRow(fileFormat, _) => + output.shredding_complete.compression must beEqualTo(requiredAppConfig.compression) + output.shredding_complete.count.get.good must beEqualTo(output.goodEvents.size) + output.shredding_complete.count.get.bad.get must beEqualTo(output.badEvents.size) + output.shredding_complete.typesInfo must beLike { case TypesInfo.WideRow(fileFormat, _) => fileFormat must beEqualTo(requiredAppConfig.fileFormat) } } @@ -129,7 +107,7 @@ abstract class TransformerSpecification extends Specification with AppDependenci private def aggregateDataFromAllWindows(windows: List[WindowOutput]): AggregatedData = windows.foldLeft(AggregatedData.empty) { case (aggregated, window) => - val windowTypes = window.`shredding_complete.json`.typesInfo.asInstanceOf[TypesInfo.WideRow].types + val windowTypes = window.shredding_complete.typesInfo.asInstanceOf[TypesInfo.WideRow].types AggregatedData( good = window.goodEvents ::: aggregated.good, bad = window.badEvents ::: aggregated.bad, @@ -137,13 +115,6 @@ abstract class TransformerSpecification extends Specification with AppDependenci ) } - private def allEventsProcessed( - windowsAccumulator: SignallingRef[IO, WindowsAccumulator], - countExpectations: CountExpectations - ): Signal[IO, Boolean] = - windowsAccumulator - .map(_.getTotalNumberOfEvents >= countExpectations.total) - private def readWindowOutput(blobClient: BlobStorage[IO])(message: LoaderMessage.ShreddingComplete): IO[WindowOutput] = for { scMessageInStorage <- readSCMessageFromBlobStorage(message, blobClient) @@ -228,13 +199,6 @@ abstract class TransformerSpecification extends Specification with AppDependenci .get(message.base.withKey("shredding_complete.json")) .map(value => parseShreddingCompleteMessage(value.right.get)) - private def parseShreddingCompleteMessage(message: String): LoaderMessage.ShreddingComplete = - LoaderMessage - .fromString(message) - .right - .get - .asInstanceOf[LoaderMessage.ShreddingComplete] - } object TransformerSpecification { @@ -243,29 +207,13 @@ object 
TransformerSpecification { type DataRow = Json type DataAssertion = AggregatedData => MatchResult[Any] - final case class CountExpectations(good: Int, bad: Int) { - def total = good + bad - } - - final case class WindowsAccumulator(value: List[WindowOutput]) { - def addWindow(window: WindowOutput): WindowsAccumulator = - WindowsAccumulator(value :+ window) - - def getTotalNumberOfEvents: Long = - value.map { window => - val good = window.`shredding_complete.json`.count.map(_.good).getOrElse(0L) - val bad = window.`shredding_complete.json`.count.flatMap(_.bad).getOrElse(0L) - good + bad - }.sum - } - final case class WindowOutput( appId: String, producedAt: LocalDateTime, - `shredding_complete.json`: LoaderMessage.ShreddingComplete, + shredding_complete: LoaderMessage.ShreddingComplete, goodEvents: List[DataRow], badEvents: List[DataRow] - ) + ) extends GetShreddingComplete final case class AggregatedData( good: List[DataRow], diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala index a4f5b696d..2f7289265 100644 --- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/BadDetailsScenario.scala @@ -1,12 +1,9 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.Content -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{ - AppConfiguration, - AzureTransformerSpecification, - InputBatch -} +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch.Content +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._ +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{AppConfiguration, AzureTransformerSpecification} class BadDetailsScenario extends AzureTransformerSpecification { diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala index fd248d9a2..9bd30602c 100644 --- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala @@ -1,12 +1,10 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios + import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good} 
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{ - AppConfiguration, - AzureTransformerSpecification, - InputBatch -} +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch.{Content, bad, good} +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._ +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{AppConfiguration, AzureTransformerSpecification} import io.circe.parser import scala.concurrent.duration.DurationInt diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala index 3b45153f8..481266629 100644 --- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala +++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala @@ -1,14 +1,11 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios -import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good} -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations -import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{ - AppConfiguration, - AzureTransformerSpecification, - InputBatch -} +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch.{Content, bad, good} +import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._ +import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{AppConfiguration, AzureTransformerSpecification} import io.circe.parser import scala.concurrent.duration.DurationInt diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 921d141e2..ac6332130 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -253,7 +253,8 @@ object Dependencies { scalaTrackerEmit, http4sClient, slf4jApi, - sentry + sentry, + eventGenerator ) val loaderDependencies = Seq( @@ -339,8 +340,7 @@ object Dependencies { ) val transformerKafkaDependencies = Seq( - hadoopAzure, - eventGenerator + hadoopAzure ) val commonStreamTransformerExclusions =
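A note on how the new test scaffolding composes: `LoaderSpecification` owns the end-to-end flow (produce the input batches, consume `shredding_complete` messages until `CountExpectations` is met, then hand the `TestDAO` to the DB assertions), while `CloudResources`, `StorageTargetProvider` and `TestDAO.Provider` are the extension points a concrete spec mixes in. Below is a minimal sketch of a spec for a hypothetical additional warehouse, assuming only the traits introduced above; `MyWarehouseTestResources`, `AzureMyWarehouseLoaderSpecs` and every `???` body are illustrative placeholders, not part of this change:

import cats.effect.IO
import doobie.ConnectionIO

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget
import com.snowplowanalytics.snowplow.rdbloader.dsl.Transaction
import com.snowplowanalytics.snowplow.rdbloader.experimental.{AzureTestResources, LoaderSpecification, StorageTargetProvider, TestDAO}

// Hypothetical warehouse bindings: supply the three DB queries plus the
// StorageTarget config; the shared traits provide everything else.
trait MyWarehouseTestResources extends TestDAO.Provider with StorageTargetProvider {

  override def createDAO(transaction: Transaction[IO, ConnectionIO]): TestDAO = new TestDAO {
    override def cleanDb: IO[Unit] = ???                                   // truncate atomic.events and atomic.manifest
    override def queryManifest: IO[List[LoaderMessage.ManifestItem]] = ??? // read back atomic.manifest
    override def queryEventIds: IO[List[String]] = ???                     // read back the loaded event ids
  }

  // Built from environment variables, mirroring SnowflakeTestResources above.
  override def createStorageTarget: StorageTarget = ???
  override def getStorageTargetEnvVars: List[String] = List.empty
}

// The concrete spec just mixes the traits together; `run` (inherited from
// LoaderSpecification) drives producing, consuming and asserting.
class AzureMyWarehouseLoaderSpecs extends LoaderSpecification with MyWarehouseTestResources with AzureTestResources

Because `LoaderSpecification` calls `skipAllIf(anyEnvironmentVariableMissing())`, such a spec is skipped rather than failed when any variable reported by `getCloudResourcesEnvVars` or `getStorageTargetEnvVars` is unset, so it only runs against a fully configured environment.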