gfc-aws-kinesis
Scala wrapper around AWS Kinesis Client Library. Part of the Gilt Foundation Classes.
Getting gfc-aws-kinesis
The latest version is 0.17.0, which is cross-built against Scala 2.11.x and 2.12.x.
SBT dependency:
libraryDependencies += "com.gilt" %% "gfc-aws-kinesis" % "0.17.0"
SBT Akka stream (2.5.x) dependency:
libraryDependencies += "com.gilt" %% "gfc-aws-kinesis-akka" % "0.17.0"
Basic usage
Consume events:
implicit object StringRecordReader extends KinesisRecordReader[String]{
override def apply(r: Record) : String = new String(r.getData.array(), "UTF-8")
}
val config = KCLConfiguration("consumer-name", "kinesis-stream-name")
KCLWorkerRunner(config).runAsyncSingleRecordProcessor[String](1 minute) { a: String =>
// .. do something with A
Future.successful(())
}
Publish events:
implicit object StringRecordWriter extends KinesisRecordWriter[String] {
override def toKinesisRecord(a: String) : KinesisRecord = {
KinesisRecord("partition-key", a.getBytes("UTF-8"))
}
}
val publisher = KinesisPublisher()
val messages = Seq("Hello World!", "foo bar", "baz bam")
val result: Future[KinesisPublisherBatchResult] = publisher.publishBatch("kinesis-stream-name", messages)
DynamoDB streaming
Create the adapter client
val streamAdapterClient: AmazonDynamoDBStreamsAdapterClient =
new AmazonDynamoDBStreamsAdapterClient()
Pass the adapter client in the configuration
val streamSource = {
val streamConfig = KinesisStreamConsumerConfig[Option[A]](
applicationName,
config.stream,
regionName = Some(config.region),
checkPointInterval = config.checkpointInterval,
initialPositionInStream = config.streamPosition,
dynamoDBKinesisAdapterClient = streamAdapterClient
)
KinesisStreamSource(streamConfig).mapMaterializedValue(_ => NotUsed)
}
Pass an implicit kinesis record reader suitable for dynamodb events
implicit val kinesisRecordReader
: KinesisRecordReader[Option[A]] =
new KinesisRecordReader[Option[A]] {
override def apply(record: Record): Option[A] = {
record match {
case recordAdapter: RecordAdapter =>
val dynamoRecord: DynamoRecord =
recordAdapter.getInternalObject
dynamoRecord.getEventName match {
case "INSERT" =>
ScanamoFree
.read[A](
dynamoRecord.getDynamodb.getNewImage)
.toOption
case _ => None
}
case _ => None
}
}
}
Consume e.g. using a sink
val targetSink = Sink.actorRefWithAck(target, startMsg, ackMsg, Done)
streamSource
.filter(!_.isEmpty)
.map(_.get)
.log(applicationName)(log)
.runWith(targetSink)