diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e5fa28ba..66d4bf23f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,30 +10,8 @@ on: pull_request: jobs: - test: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: Set up JDK 11 - uses: actions/setup-java@v1 - with: - java-version: 11 - - name: Check Scala formatting - if: ${{ always() }} - run: sbt scalafmtCheckAll - - name: Run tests - run: sbt coverage test coverageReport - - name: Aggregate coverage data - if: ${{ always() }} - run: sbt coverageAggregate - - name: Submit coveralls data - if: ${{ always() }} - run: sbt coveralls - env: - COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} publish_docker: - needs: test if: startsWith(github.ref, 'refs/tags/') runs-on: ubuntu-latest strategy: @@ -139,52 +117,3 @@ jobs: command: monitor env: SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} - - create_release: - needs: test - if: startsWith(github.ref, 'refs/tags/') - runs-on: ubuntu-latest - steps: - - name: Checkout Github - uses: actions/checkout@v2 - - uses: coursier/cache-action@v6 - - name: Set up JDK 11 for loader and streaming transformer - uses: actions/setup-java@v1 - with: - java-version: 11 - - name: Get the latest Databricks JDBC driver - run: | - curl https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.33/DatabricksJDBC42-2.6.33.1055.zip --output DatabricksJDBC42.jar.zip - unzip DatabricksJDBC42.jar.zip - - name: Build jar files - env: - SKIP_TEST: true - run: | - sbt \ - 'project redshiftLoader; assembly' \ - 'project snowflakeLoader; assembly' \ - 'project databricksLoader; assembly' \ - 'project transformerBatch; assembly' \ - 'project transformerKinesis; assembly' \ - 'project transformerPubsub; assembly' \ - 'project transformerKafka; assembly' - - name: Get current version - id: ver - run: echo "::set-output name=project_version::${GITHUB_REF#refs/tags/}" - - name: Create GitHub release and attach 
jars - uses: softprops/action-gh-release@v1 - with: - draft: true - prerelease: true - name: ${{ steps.ver.outputs.project_version }} - tag_name: ${{ steps.ver.outputs.project_version }} - files: | - modules/redshift-loader/target/scala-2.12/snowplow-redshift-loader-${{ steps.ver.outputs.project_version }}.jar - modules/snowflake-loader/target/scala-2.12/snowplow-snowflake-loader-${{ steps.ver.outputs.project_version }}.jar - modules/databricks-loader/target/scala-2.12/snowplow-databricks-loader-${{ steps.ver.outputs.project_version }}.jar - modules/transformer-batch/target/scala-2.12/snowplow-transformer-batch-${{ steps.ver.outputs.project_version }}.jar - modules/transformer-kinesis/target/scala-2.12/snowplow-transformer-kinesis-${{ steps.ver.outputs.project_version }}.jar - modules/transformer-pubsub/target/scala-2.12/snowplow-transformer-pubsub-${{ steps.ver.outputs.project_version }}.jar - modules/transformer-kafka/target/scala-2.12/snowplow-transformer-kafka-${{ steps.ver.outputs.project_version }}.jar - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/config/transformer/gcp/transformer.pubsub.config.reference.hocon b/config/transformer/gcp/transformer.pubsub.config.reference.hocon index 3819e532f..97943b574 100644 --- a/config/transformer/gcp/transformer.pubsub.config.reference.hocon +++ b/config/transformer/gcp/transformer.pubsub.config.reference.hocon @@ -12,6 +12,10 @@ # Optional. Default value '1 hour'. The maximum period a message ack deadline will be extended. "maxAckExtensionPeriod": "1 hour" + + # Optional. Default value '60 seconds'. Sets a lower bound on how long the pubsub Subscriber extends each ack deadline. + # Most relevant just after the app starts up, until the underlying Subscriber has metrics on how long we take to process an event. 
+ "minDurationPerAckExtension": "60 seconds" } # Path to transformed archive diff --git a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala index d8120595c..4e9c86725 100644 --- a/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala +++ b/modules/common-transformer-stream/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/Config.scala @@ -69,7 +69,8 @@ object Config { parallelPullCount: Int, bufferSize: Int, maxAckExtensionPeriod: FiniteDuration, - maxOutstandingMessagesSize: Option[Long] + maxOutstandingMessagesSize: Option[Long], + minDurationPerAckExtension: FiniteDuration ) extends StreamInput { val (projectId, subscriptionId) = subscription.split("/").toList match { diff --git a/modules/transformer-pubsub/src/main/resources/application.conf b/modules/transformer-pubsub/src/main/resources/application.conf index 94e1b653d..0d67e34c6 100644 --- a/modules/transformer-pubsub/src/main/resources/application.conf +++ b/modules/transformer-pubsub/src/main/resources/application.conf @@ -5,6 +5,7 @@ "parallelPullCount": 1 "bufferSize": 500 "maxAckExtensionPeriod": "1 hours" + "minDurationPerAckExtension": "60 seconds" } "output": { diff --git a/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala b/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala index 5273431e6..620a62ca8 100644 --- a/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala +++ 
b/modules/transformer-pubsub/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/Main.scala @@ -15,6 +15,7 @@ import org.typelevel.log4cats.Logger import com.google.api.gax.batching.FlowControlSettings import com.google.api.gax.core.ExecutorProvider import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors} +import org.threeten.bp.{Duration => ThreetenDuration} import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} @@ -80,6 +81,7 @@ object Main extends IOApp { def getExecutor: ScheduledExecutorService = scheduledExecutorService } } + s.setMinDurationPerAckExtension(ThreetenDuration.ofMillis(conf.minDurationPerAckExtension.toMillis)) } ) case _ => diff --git a/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala b/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala index 7cfef07b3..67115d118 100644 --- a/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala +++ b/modules/transformer-pubsub/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/pubsub/ConfigSpec.scala @@ -71,7 +71,8 @@ object ConfigSpec { parallelPullCount = 1, bufferSize = 500, maxAckExtensionPeriod = 1.hour, - maxOutstandingMessagesSize = None + maxOutstandingMessagesSize = None, + minDurationPerAckExtension = 60.seconds ) val exampleWindowPeriod = 5.minutes val exampleOutput = Config.Output.GCS( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ff2577c93..baefe306d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -62,6 +62,7 @@ object Dependencies { val enumeratum = "1.7.0" val aws = "1.12.261" val aws2 
= "2.21.33" + val pubsub = "1.125.13" val jSch = "0.2.1" val sentry = "1.7.30" val protobuf = "3.21.7" // Fix CVE @@ -193,6 +194,7 @@ object Dependencies { val aws2kinesis = "software.amazon.awssdk" % "kinesis" % V.aws2 val aws2regions = "software.amazon.awssdk" % "regions" % V.aws2 val aws2sts = "software.amazon.awssdk" % "sts" % V.aws2 % Runtime + val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf val nettyCodec = "io.netty" % "netty-codec" % V.nettyCodec val zookeeper = "org.apache.zookeeper" % "zookeeper" % V.zookeeper @@ -227,6 +229,7 @@ object Dependencies { val gcpDependencies = Seq( fs2BlobstoreGCS, fs2PubSub, + pubsub, secretManager, gcpStorage )