FirehoseEcho.scala
package alpakka.kinesis

import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Attributes
import org.apache.pekko.stream.connectors.kinesisfirehose.scaladsl.KinesisFirehoseFlow
import org.apache.pekko.stream.connectors.s3.AccessStyle.PathAccessStyle
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.connectors.s3.{ListBucketResultContents, S3Attributes, S3Settings}
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.util.ByteString
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient
import software.amazon.awssdk.services.firehose.model.{PutRecordBatchResponseEntry, Record as RecordFirehose}

import java.net.URI
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContextExecutor, Future}

/**
  * Shows the possibilities of a "Firehose pipeline", e.g.
  * producerClientFirehose()
  *   --> Elasticsearch -> Check entries manually via browserClient()
  *   +-> S3            -> Check via countFilesBucket()
  *
  * Run via [[alpakka.firehose.FirehoseEchoIT]] against the localStack docker container.
  * It is also possible to run against AWS, after all the resources have been set up via the console.
  *
  * Doc:
  * https://docs.localstack.cloud/user-guide/aws/kinesis-firehose
  * https://doc.akka.io/docs/alpakka/current/kinesis.html
  */
class FirehoseEcho(
    urlWithMappedPort: URI = new URI("http://localhost:4566"),
    accessKey: String = "accessKey",
    secretKey: String = "secretKey",
    region: String = "us-east-1") {
  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  implicit val system: ActorSystem = ActorSystem("FirehoseEcho")
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val firehoseStreamName = "activity-to-elasticsearch-local"
  val s3BucketName = "kinesis-activity-backup-local"
  val batchSize = 10

  val credentialsProvider: StaticCredentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))

  private val s3Settings: S3Settings =
    S3Settings()
      .withAccessStyle(PathAccessStyle)
      .withEndpointUrl(urlWithMappedPort.toString)
      .withCredentialsProvider(credentialsProvider)

  implicit private val s3attributes: Attributes = S3Attributes.settings(s3Settings)

  implicit val awsFirehoseClient: FirehoseAsyncClient = {
    FirehoseAsyncClient
      .builder()
      .endpointOverride(urlWithMappedPort)
      .credentialsProvider(credentialsProvider)
      .region(Region.of(region))
      .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
      .build()
  }
  system.registerOnTermination(awsFirehoseClient.close())

  def run(): Int = {
    val done = for {
      _ <- producerClientFirehose()
      filesFut <- countFilesBucket()
    } yield filesFut

    val result = Await.result(done, 80.seconds)
    result.size
  }

  private def producerClientFirehose() = {
    logger.info("About to start Firehose upload...")
    val firehoseFlow: Flow[RecordFirehose, PutRecordBatchResponseEntry, NotUsed] = KinesisFirehoseFlow(firehoseStreamName)

    val done = Source(1 to batchSize)
      .map(each => convertToBatchRecord(each))
      .via(firehoseFlow)
      .runWith(Sink.seq)

    done.onComplete(result => logger.info(s"Successfully uploaded: ${result.get.size} records"))
    done
  }
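
  // Hedged sketch, not part of the original pipeline: PutRecordBatchResponseEntry carries
  // recordId/errorCode/errorMessage per record, so partial failures of a batch could be surfaced
  // like this. The log wording and the decision to only log (rather than retry) are assumptions.
  // Usage idea: done.foreach(logFailedRecords) inside producerClientFirehose().
  private def logFailedRecords(entries: Seq[PutRecordBatchResponseEntry]): Unit = {
    val failed = entries.filter(entry => Option(entry.errorCode()).isDefined)
    if (failed.nonEmpty) {
      failed.foreach(entry =>
        logger.warn(s"Record ${entry.recordId()} failed: ${entry.errorCode()} - ${entry.errorMessage()}"))
    } else {
      logger.info("No failed records in this batch")
    }
  }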

  private def convertToBatchRecord(each: Int): RecordFirehose = {
    val payload = s"{ \"target\": \"myTarget_$each\" }"
    RecordFirehose.builder().data(SdkBytes.fromByteBuffer(ByteString(payload).asByteBuffer)).build()
  }

  private def countFilesBucket() = {
    val resultFut: Future[Seq[ListBucketResultContents]] = S3
      .listBucket(s3BucketName, None)
      .withAttributes(s3attributes)
      .runWith(Sink.seq)

    resultFut.onComplete(result => logger.info(s"Number of files in bucket: ${result.get.size}"))
    resultFut
  }
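
  // Hedged sketch: one way to peek at the payload of a delivered S3 object, assuming the
  // pekko-connectors S3.getObject(bucket, key) API. The key parameter is a placeholder here;
  // real keys come from the ListBucketResultContents returned by countFilesBucket().
  private def readObjectAsString(key: String): Future[String] = {
    S3.getObject(s3BucketName, key)
      .withAttributes(s3attributes)
      .map(_.utf8String)
      .runWith(Sink.fold("")(_ + _))
  }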
}

object FirehoseEcho extends App {
  // Use to connect to localStack with default params, e.g. when the localStack image is run via Cockpit
  val echo = new FirehoseEcho()
  echo.run()
}
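
// Hedged sketch: wiring for a non-default endpoint, e.g. when a test container (as in FirehoseEchoIT)
// maps port 4566 to a random host port. The port value and credentials below are placeholders.
object FirehoseEchoCustomEndpoint extends App {
  val mappedPort = 4566 // replace with the port reported by the container runtime
  val echo = new FirehoseEcho(new URI(s"http://localhost:$mappedPort"), "accessKey", "secretKey", "us-east-1")
  echo.run()
}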