Skip to content

Commit

Permalink
pubsub transformer: increase subscriber's awaitTermiantePeriod (close #…
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 4, 2023
1 parent 2a24091 commit 1233f7b
Show file tree
Hide file tree
Showing 15 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# Optional. Sets a lower-bound on how the pubsub Subscriber extends ack deadlines. Most relevant after the app
# first starts up, until the underlying Subscriber has metrics on how long we take to process an event.
"minDurationPerAckExtension": "60 seconds"

# Optional. How long we allow the pubsub Subscriber to ack any outstanding events during clean shutdown
"awaitTerminatePeriod": "60 seconds"
}

# Path to transformed archive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ object Config {
bufferSize: Int,
maxAckExtensionPeriod: FiniteDuration,
maxOutstandingMessagesSize: Option[Long],
minDurationPerAckExtension: FiniteDuration
minDurationPerAckExtension: FiniteDuration,
awaitTerminatePeriod: FiniteDuration
) extends StreamInput {
val (projectId, subscriptionId) =
subscription.split("/").toList match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ object QueueBadSinkSpec {
| "parallelPullCount": 1
| "bufferSize": 500
| "maxAckExtensionPeriod": "1 hours"
| "minDurationPerAckExtension": "60 seconds"
| "awaitTerminatePeriod": "5 seconds"
| }
| "output": {
| "path": "${outputPath.toNioPath.toUri.toString}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ object ShredTsvProcessingSpec {
| "parallelPullCount": 1
| "bufferSize": 500
| "maxAckExtensionPeriod": "1 hours"
| "minDurationPerAckExtension": "60 seconds"
| "awaitTerminatePeriod": "5 seconds"
| }
| "output": {
| "path": "${outputPath.toNioPath.toUri.toString}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ object WiderowJsonProcessingSpec {
| "parallelPullCount": 1
| "bufferSize": 500
| "maxAckExtensionPeriod": "1 hours"
| "minDurationPerAckExtension": "60 seconds"
| "awaitTerminatePeriod": "5 seconds"
| }
| "output": {
| "path": "${outputPath.toNioPath.toUri.toString}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ object ConfigSpec {
subscription = "projects/project-id/subscriptions/subscription-id",
customPubsubEndpoint = None,
parallelPullCount = 1,
awaitTerminatePeriod = 30.seconds,
bufferSize = 10
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ object Pubsub {
parallelPullCount: Int,
bufferSize: Int,
maxAckExtensionPeriod: FiniteDuration,
awaitTerminatePeriod: FiniteDuration,
customPubsubEndpoint: Option[String] = None,
customizeSubscriber: Subscriber.Builder => Subscriber.Builder = identity,
postProcess: Option[Queue.Consumer.PostProcess[F]] = None
Expand All @@ -111,6 +112,7 @@ object Pubsub {
parallelPullCount = parallelPullCount,
maxQueueSize = bufferSize,
maxAckExtensionPeriod = maxAckExtensionPeriod,
awaitTerminatePeriod = awaitTerminatePeriod,
customizeSubscriber = {
val customChannel: Subscriber.Builder => Subscriber.Builder = channelProvider
.map { c => b: Subscriber.Builder =>
Expand Down
1 change: 1 addition & 0 deletions modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"cloud": "dummy"
"messageQueue": {
"parallelPullCount": 1,
"awaitTerminatePeriod": "30 seconds"
"bufferSize": 10
"consumerConf": {
"enable.auto.commit": "false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ object Config {
subscription: String,
customPubsubEndpoint: Option[String],
parallelPullCount: Int,
awaitTerminatePeriod: FiniteDuration,
bufferSize: Int
) {
val (projectId, subscriptionId) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ object Environment {
parallelPullCount = c.messageQueue.parallelPullCount,
bufferSize = c.messageQueue.bufferSize,
maxAckExtensionPeriod = config.timeouts.loading,
awaitTerminatePeriod = c.messageQueue.awaitTerminatePeriod,
customPubsubEndpoint = c.messageQueue.customPubsubEndpoint,
postProcess = Some(postProcess)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ConfigSpec extends Specification {
)
val awsConfig = exampleCloud
val gcpConfig = Config.Cloud.GCP(
messageQueue = Config.Cloud.GCP.Pubsub("projects/project-id/subscriptions/subscription-id", None, 1, 1)
messageQueue = Config.Cloud.GCP.Pubsub("projects/project-id/subscriptions/subscription-id", None, 1, 30.seconds, 1)
)
val azureConfig = Config.Cloud.Azure(
URI.create("https://test.blob.core.windows.net/test-container/"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ConfigSpec extends Specification {
subscription = "projects/project-id/subscriptions/subscription-id",
customPubsubEndpoint = None,
parallelPullCount = 1,
awaitTerminatePeriod = 30.seconds,
bufferSize = 10
)
)
Expand Down Expand Up @@ -165,6 +166,7 @@ class ConfigSpec extends Specification {
subscription = "projects/project-id/subscriptions/subscription-id",
customPubsubEndpoint = None,
parallelPullCount = 1,
awaitTerminatePeriod = 30.seconds,
bufferSize = 10
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"bufferSize": 500
"maxAckExtensionPeriod": "1 hours"
"minDurationPerAckExtension": "60 seconds"
"awaitTerminatePeriod": "60 seconds"
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object Main extends IOApp {
parallelPullCount = conf.parallelPullCount,
bufferSize = conf.bufferSize,
maxAckExtensionPeriod = conf.maxAckExtensionPeriod,
awaitTerminatePeriod = conf.awaitTerminatePeriod,
customPubsubEndpoint = conf.customPubsubEndpoint,
customizeSubscriber = { s =>
s.setFlowControlSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ object ConfigSpec {
bufferSize = 500,
maxAckExtensionPeriod = 1.hour,
maxOutstandingMessagesSize = None,
minDurationPerAckExtension = 60.seconds
minDurationPerAckExtension = 60.seconds,
awaitTerminatePeriod = 60.seconds
)
val exampleWindowPeriod = 5.minutes
val exampleOutput = Config.Output.GCS(
Expand Down

0 comments on commit 1233f7b

Please sign in to comment.