amendment 1: throttle acks per second
istreeter committed Nov 30, 2023
1 parent 5593fd6 commit 414e4b5
Showing 7 changed files with 652 additions and 80 deletions.
71 changes: 0 additions & 71 deletions .github/workflows/ci.yml
@@ -10,30 +10,8 @@ on:
   pull_request:
 
 jobs:
-  test:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v2
-      - name: Set up JDK 11
-        uses: actions/setup-java@v1
-        with:
-          java-version: 11
-      - name: Check Scala formatting
-        if: ${{ always() }}
-        run: sbt scalafmtCheckAll
-      - name: Run tests
-        run: sbt coverage test coverageReport
-      - name: Aggregate coverage data
-        if: ${{ always() }}
-        run: sbt coverageAggregate
-      - name: Submit coveralls data
-        if: ${{ always() }}
-        run: sbt coveralls
-        env:
-          COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
-
   publish_docker:
     needs: test
     if: startsWith(github.ref, 'refs/tags/')
     runs-on: ubuntu-latest
     strategy:
@@ -138,52 +116,3 @@ jobs:
           command: monitor
         env:
           SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
-
-  create_release:
-    needs: test
-    if: startsWith(github.ref, 'refs/tags/')
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout Github
-        uses: actions/checkout@v2
-      - uses: coursier/cache-action@v6
-      - name: Set up JDK 11 for loader and streaming transformer
-        uses: actions/setup-java@v1
-        with:
-          java-version: 11
-      - name: Get the latest Databricks JDBC driver
-        run: |
-          curl https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.33/DatabricksJDBC42-2.6.33.1055.zip --output DatabricksJDBC42.jar.zip
-          unzip DatabricksJDBC42.jar.zip
-      - name: Build jar files
-        env:
-          SKIP_TEST: true
-        run: |
-          sbt \
-            'project redshiftLoader; assembly' \
-            'project snowflakeLoader; assembly' \
-            'project databricksLoader; assembly' \
-            'project transformerBatch; assembly' \
-            'project transformerKinesis; assembly' \
-            'project transformerPubsub; assembly' \
-            'project transformerKafka; assembly'
-      - name: Get current version
-        id: ver
-        run: echo "::set-output name=project_version::${GITHUB_REF#refs/tags/}"
-      - name: Create GitHub release and attach jars
-        uses: softprops/action-gh-release@v1
-        with:
-          draft: true
-          prerelease: true
-          name: ${{ steps.ver.outputs.project_version }}
-          tag_name: ${{ steps.ver.outputs.project_version }}
-          files: |
-            modules/redshift-loader/target/scala-2.12/snowplow-redshift-loader-${{ steps.ver.outputs.project_version }}.jar
-            modules/snowflake-loader/target/scala-2.12/snowplow-snowflake-loader-${{ steps.ver.outputs.project_version }}.jar
-            modules/databricks-loader/target/scala-2.12/snowplow-databricks-loader-${{ steps.ver.outputs.project_version }}.jar
-            modules/transformer-batch/target/scala-2.12/snowplow-transformer-batch-${{ steps.ver.outputs.project_version }}.jar
-            modules/transformer-kinesis/target/scala-2.12/snowplow-transformer-kinesis-${{ steps.ver.outputs.project_version }}.jar
-            modules/transformer-pubsub/target/scala-2.12/snowplow-transformer-pubsub-${{ steps.ver.outputs.project_version }}.jar
-            modules/transformer-kafka/target/scala-2.12/snowplow-transformer-kafka-${{ steps.ver.outputs.project_version }}.jar
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -11,7 +11,8 @@ import java.net.URI
 
 import cats.implicits._
 
-import cats.effect.Sync
+import cats.effect.implicits._
+import cats.effect.{Async, Sync}
 
 import io.circe.syntax.EncoderOps
 
@@ -56,7 +57,7 @@ object Completion {
    * @param state
    *   all metadata shredder extracted from a batch
    */
-  def seal[F[_]: Sync, C: Checkpointer[F, *]](
+  def seal[F[_]: Async, C: Checkpointer[F, *]](
     blobStorage: BlobStorage[F],
     compression: Compression,
     getTypes: Set[Data.ShreddedType] => TypesInfo,
@@ -84,8 +85,9 @@
       )
       body = message.selfDescribingData(legacyMessageFormat).asJson.noSpaces
       _ <- writeFile(blobStorage, shreddingCompletePath, body)
-      _ <- Checkpointer[F, C].checkpoint(state.checkpointer)
+      fiber <- Checkpointer[F, C].checkpoint(state.checkpointer).start
       _ <- producer.send(body)
+      _ <- fiber.join
     } yield ()
 
   def writeFile[F[_]: Sync](
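
This hunk reorders the tail of Completion.seal: it used to checkpoint and only then publish the shredding_complete message, one after the other. It now starts the checkpoint on a background fiber, publishes while the checkpoint runs, and joins the fiber so the window is not sealed until the checkpoint has finished. That is also why the constraint widens from Sync to Async and cats.effect.implicits._ is imported: the .start fiber syntax needs spawn capability that Sync alone does not provide. Below is a minimal runnable sketch of the same start/send/join shape, with hypothetical stand-in effects in place of the real checkpointer and producer:

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}

object StartJoinSketch extends IOApp.Simple {

  // Hypothetical stand-ins for the real checkpoint and producer effects.
  val checkpoint: IO[Unit]         = IO.sleep(100.millis) *> IO.println("acks checkpointed")
  def send(body: String): IO[Unit] = IO.println(s"sent: $body")

  val run: IO[Unit] =
    for {
      fiber <- checkpoint.start           // begin checkpointing on its own fiber
      _ <- send("shredding_complete")     // publish concurrently with the checkpoint
      _ <- fiber.join                     // do not return until the checkpoint has finished
    } yield ()
}

The join is what makes this an overlap rather than fire-and-forget: acknowledging and publishing proceed concurrently, but seal still completes only after both are done.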
@@ -104,7 +104,7 @@ object Processing {
       .through(windowing)
 
     val sink: Pipe[F, Record[Window, List[(SinkPath, Transformed.Data)], State[C]], Unit] =
-      _.through(getSink(resources, config.output, config.formats))
+      _.through(getSink(resources, config.output, config.formats)).prefetch
        .evalMap(onComplete)
 
     Shutdown
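
prefetch here is fs2's one-chunk prefetch: it runs everything upstream of it on a separate fiber with a buffer of one chunk, the likely intent being that the sink can begin writing the next window while onComplete (which now ends with the fiber join above) is still sealing the previous one. A small self-contained sketch of the operator, with illustrative sleeps standing in for the real write and completion work:

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.Stream

object PrefetchSketch extends IOApp.Simple {

  // Hypothetical slow producer (writing a window) and slow consumer
  // (sealing it); prefetch lets the two stages overlap window-to-window.
  val windows: Stream[IO, Int] =
    Stream
      .range(0, 3)
      .covary[IO]
      .evalTap(w => IO.sleep(50.millis) *> IO.println(s"wrote window $w"))

  val run: IO[Unit] =
    windows
      .prefetch // run the upstream one chunk ahead, on its own fiber
      .evalMap(w => IO.sleep(50.millis) *> IO.println(s"completed window $w"))
      .compile
      .drain
}

Without prefetch the two stages alternate and each window costs the sum of the sleeps; with it the writes and completions pipeline, so steady-state throughput is governed by the slower stage rather than by the two in series.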