Skip to content

Commit

Permalink
pubsub transformer: Increase default value of minDurationPerAckExtens…
Browse files Browse the repository at this point in the history
…ion (close #1326)
  • Loading branch information
istreeter committed Dec 4, 2023
1 parent ca3c660 commit 42dfb60
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 73 deletions.
71 changes: 0 additions & 71 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,8 @@ on:
pull_request:

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
- name: Check Scala formatting
if: ${{ always() }}
run: sbt scalafmtCheckAll
- name: Run tests
run: sbt coverage test coverageReport
- name: Aggregate coverage data
if: ${{ always() }}
run: sbt coverageAggregate
- name: Submit coveralls data
if: ${{ always() }}
run: sbt coveralls
env:
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}

publish_docker:
needs: test
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -139,52 +117,3 @@ jobs:
command: monitor
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}

create_release:
needs: test
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
steps:
- name: Checkout Github
uses: actions/checkout@v2
- uses: coursier/cache-action@v6
- name: Set up JDK 11 for loader and streaming transformer
uses: actions/setup-java@v1
with:
java-version: 11
- name: Get the latest Databricks JDBC driver
run: |
curl https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.33/DatabricksJDBC42-2.6.33.1055.zip --output DatabricksJDBC42.jar.zip
unzip DatabricksJDBC42.jar.zip
- name: Build jar files
env:
SKIP_TEST: true
run: |
sbt \
'project redshiftLoader; assembly' \
'project snowflakeLoader; assembly' \
'project databricksLoader; assembly' \
'project transformerBatch; assembly' \
'project transformerKinesis; assembly' \
'project transformerPubsub; assembly' \
'project transformerKafka; assembly'
- name: Get current version
id: ver
run: echo "::set-output name=project_version::${GITHUB_REF#refs/tags/}"
- name: Create GitHub release and attach jars
uses: softprops/action-gh-release@v1
with:
draft: true
prerelease: true
name: ${{ steps.ver.outputs.project_version }}
tag_name: ${{ steps.ver.outputs.project_version }}
files: |
modules/redshift-loader/target/scala-2.12/snowplow-redshift-loader-${{ steps.ver.outputs.project_version }}.jar
modules/snowflake-loader/target/scala-2.12/snowplow-snowflake-loader-${{ steps.ver.outputs.project_version }}.jar
modules/databricks-loader/target/scala-2.12/snowplow-databricks-loader-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-batch/target/scala-2.12/snowplow-transformer-batch-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-kinesis/target/scala-2.12/snowplow-transformer-kinesis-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-pubsub/target/scala-2.12/snowplow-transformer-pubsub-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-kafka/target/scala-2.12/snowplow-transformer-kafka-${{ steps.ver.outputs.project_version }}.jar
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

# Optional. Default value '1 hour'. The maximum period a message ack deadline will be extended.
"maxAckExtensionPeriod": "1 hour"

# Optional. Sets a lower-bound on how the pubsub Subscriber extends ack deadlines. Most relevant after the app
# first starts up, until the underlying Subscriber has metrics on how long we take to process an event.
"minDurationPerAckExtension": "60 seconds"
}

# Path to transformed archive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ object Config {
parallelPullCount: Int,
bufferSize: Int,
maxAckExtensionPeriod: FiniteDuration,
maxOutstandingMessagesSize: Option[Long]
maxOutstandingMessagesSize: Option[Long],
minDurationPerAckExtension: FiniteDuration
) extends StreamInput {
val (projectId, subscriptionId) =
subscription.split("/").toList match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"parallelPullCount": 1
"bufferSize": 500
"maxAckExtensionPeriod": "1 hours"
"minDurationPerAckExtension": "60 seconds"
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.typelevel.log4cats.Logger
import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.core.ExecutorProvider
import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors}
import org.threeten.bp.{Duration => ThreetenDuration}

import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
Expand Down Expand Up @@ -80,6 +81,7 @@ object Main extends IOApp {
def getExecutor: ScheduledExecutorService = scheduledExecutorService
}
}
s.setMinDurationPerAckExtension(ThreetenDuration.ofMillis(conf.minDurationPerAckExtension.toMillis))
}
)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ object ConfigSpec {
parallelPullCount = 1,
bufferSize = 500,
maxAckExtensionPeriod = 1.hour,
maxOutstandingMessagesSize = None
maxOutstandingMessagesSize = None,
minDurationPerAckExtension = 60.seconds
)
val exampleWindowPeriod = 5.minutes
val exampleOutput = Config.Output.GCS(
Expand Down
3 changes: 3 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object Dependencies {
val enumeratum = "1.7.0"
val aws = "1.12.261"
val aws2 = "2.21.33"
val pubsub = "1.125.13"
val jSch = "0.2.1"
val sentry = "1.7.30"
val protobuf = "3.21.7" // Fix CVE
Expand Down Expand Up @@ -193,6 +194,7 @@ object Dependencies {
val aws2kinesis = "software.amazon.awssdk" % "kinesis" % V.aws2
val aws2regions = "software.amazon.awssdk" % "regions" % V.aws2
val aws2sts = "software.amazon.awssdk" % "sts" % V.aws2 % Runtime
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf
val nettyCodec = "io.netty" % "netty-codec" % V.nettyCodec
val zookeeper = "org.apache.zookeeper" % "zookeeper" % V.zookeeper
Expand Down Expand Up @@ -227,6 +229,7 @@ object Dependencies {
val gcpDependencies = Seq(
fs2BlobstoreGCS,
fs2PubSub,
pubsub,
secretManager,
gcpStorage
)
Expand Down

0 comments on commit 42dfb60

Please sign in to comment.