gfc-aws-kinesis
Scala wrapper around AWS Kinesis Client Library.
A fork and new home of the now unmaintained Gilt Foundation Classes (com.gilt.gfc
), now called the GFC Collective, maintained by some of the original authors.
Getting gfc-aws-kinesis
The latest version is 1.0.0, released on 21/Jan/2020 and cross-built against Scala 2.12.x and 2.13.x.
If you're using SBT, add the following line to your build file:
libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis" % "1.0.0"
SBT Akka stream (2.5.x) dependency:
libraryDependencies += "org.gfccollective" %% "gfc-aws-kinesis-akka" % "1.0.0"
For Maven and other build tools, you can visit search.maven.org. (This search will also list other available libraries from the GFC Collective.)
Basic usage
Consume events:
implicit object StringRecordReader extends KinesisRecordReader[String]{
override def apply(r: Record) : String = new String(r.getData.array(), "UTF-8")
}
val config = KCLConfiguration("consumer-name", "kinesis-stream-name")
KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1 minute) { a: String =>
// .. do something with A
Future.successful(())
}
Publish events:
implicit object StringRecordWriter extends KinesisRecordWriter[String] {
override def toKinesisRecord(a: String) : KinesisRecord = {
KinesisRecord("partition-key", a.getBytes("UTF-8"))
}
}
val publisher = KinesisPublisher()
val messages = Seq("Hello World!", "foo bar", "baz bam")
val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)
DynamoDB streaming
Create the adapter client
val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
new AmazonDynamoDBStreamsAdapterClient()
Pass the adapter client in the configuration
val streamSource = {
val streamConfig = KinesisStreamConsumerConfig[Option[A]](
applicationName,
config.stream,
regionName = Some(config.region),
checkPointInterval = config.checkpointInterval,
initialPositionInStream = config.streamPosition,
dynamoDBKinesisAdapterClient = streamAdapterClient
)
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}
Pass an implicit kinesis record reader suitable for dynamodb events
implicit val kinesisRecordReader
: KinesisRecordReader[Option[A]] =
new KinesisRecordReader[Option[A]] {
override def apply(record: Record): Option[A] = {
record match {
case recordAdapter: RecordAdapter =>
val dynamoRecord: DynamoRecord =
recordAdapter.getInternalObject
dynamoRecord.getEventName match {
case "INSERT" =>
ScanamoFree
.read[A](
dynamoRecord.getDynamodb.getNewImage)
.toOption
case _ => None
}
case _ => None
}
}
}
Consume e.g. using a sink
val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)
streamSource
.filter(!_.isEmpty)
.map(_.get)
.log(applicationName)(log)
.runWith(targetSink)
License
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0