In this post we get a Spark Streaming application working with an AWS Kinesis stream, using the mocked version of Kinesis that runs locally on Localstack. Earlier posts explained how to get Localstack running and how to bring up various AWS services on it. Client connections to the AWS services (on Localstack) are made with the AWS CLI and the AWS Java-Sdk v1.
Environment: This setup continues on Ubuntu 20.04, with Java 8, Maven 3.6.x, Docker 24.0.x, Python 3.5, PySpark/Spark 2.1.0, Localstack 3.8.1 and AWS Java-Sdk v1 (ver. 1.12.778).
Once Localstack is installed, the steps to follow are:
(I) Start Localstack
# Start locally
localstack start
Localstack should now be running on http://localhost:4566
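Before running the CLI or Spark steps, it can help to wait until the Localstack edge port actually accepts connections. The sketch below is a plain-JDK TCP probe. It is not part of Localstack or of the aSpark project. `LocalstackProbe`, `isListening` and `waitFor` are hypothetical names, and the host, port and retry values are assumptions taken from this post's setup. Localstack 3.x also reports per-service status at http://localhost:4566/_localstack/health, which is a finer-grained check than an open port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical readiness probe: is something accepting TCP connections on host:port? */
final class LocalstackProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Polls isListening until it succeeds or maxAttempts is exhausted.
    static boolean waitFor(String host, int port, int maxAttempts, long sleepMillis)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (isListening(host, port, 1000)) {
                return true;
            }
            Thread.sleep(sleepMillis);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // ASSUMPTION: 4566 is the Localstack edge port used throughout this post.
        boolean up = waitFor("localhost", 4566, 10, 1000);
        System.out.println(up ? "Localstack edge port is open" : "Localstack not reachable");
    }
}
```

A successful probe only shows that the port is open. Individual services such as Kinesis may still be starting, which is why the health endpoint is the better final check.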
(II) Check the Kinesis service on Localstack from the CLI
# List Streams
aws --endpoint-url=http://localhost:4566 kinesis list-streams
# Create Stream
aws --endpoint-url=http://localhost:4566 kinesis create-stream --stream-name myFirstStream --shard-count 1
# List Streams
aws --endpoint-url=http://localhost:4566 kinesis list-streams
# describe-stream-summary
aws --endpoint-url=http://localhost:4566 kinesis describe-stream-summary --stream-name myFirstStream
# Put Record
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata efgh"
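Two details of these `put-record` calls are worth knowing. First, the Kinesis API carries a record's Data as a base64-encoded blob. AWS CLI v1 base64-encodes `--data` for you. AWS CLI v2 by default expects `--data` to already be base64, so on v2 the commands above may need `--cli-binary-format raw-in-base64-out`. Second, Kinesis maps each partition key to a shard by taking the MD5 hash of the key as a 128-bit integer. With `--shard-count 1`, every key lands on the single shard.

The plain-JDK sketch below illustrates both points. `PutRecordAnatomy` and its methods are hypothetical. The JSON body is assembled by hand only for illustration, since the CLI and SDK build requests themselves. `shardIndex` assumes the shards split the hash space evenly, whereas the real boundaries are the `HashKeyRange` values reported for each shard.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical illustration of what a Kinesis PutRecord call carries; not an SDK or CLI API. */
final class PutRecordAnatomy {

    // Size of the Kinesis hash key space: 2^128.
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // Minimal JSON string escaper (quotes, backslashes, control characters).
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // JSON body of a PutRecord request; the Data blob travels base64-encoded.
    static String putRecordBody(String streamName, String partitionKey, String data) {
        String b64 = Base64.getEncoder()
                .encodeToString(data.getBytes(StandardCharsets.UTF_8));
        return "{\"StreamName\":" + jsonString(streamName)
                + ",\"PartitionKey\":" + jsonString(partitionKey)
                + ",\"Data\":" + jsonString(b64) + "}";
    }

    // MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // ASSUMPTION: shardCount shards split the hash space into equal ranges.
    static int shardIndex(String partitionKey, int shardCount) {
        if (shardCount < 1) {
            throw new IllegalArgumentException("shardCount must be >= 1");
        }
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        System.out.println(putRecordBody("myFirstStream", "123", "testdata abcd"));
        System.out.println("shard (1 shard): " + shardIndex("123", 1));
    }
}
```

Because both records above use partition key `123`, they would hash to the same shard even on a multi-shard stream.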
(III) Connect to Kinesis from Spark Streaming
- Download and build the sample aSpark Java Kinesis application.
- The code is similar to Spark's kinesis-asl connector from the external module. The difference is a few changes to the onStart() method of KinesisReceiver.scala that explicitly set the endpoint on the Kinesis, DynamoDB and CloudWatch clients. The Kinesis Client Library used by the receiver keeps its leases and checkpoints in DynamoDB and publishes metrics to CloudWatch, so all three clients must be able to point at the Localstack endpoint URL.
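The effect of that change can be pictured as a per-service endpoint table. Against real AWS, each client talks to its own regional host, and CloudWatch's API host is `monitoring`. Against Localstack, all three resolve to the single edge URL. The sketch below is illustrative only. `ServiceEndpoints` is a hypothetical helper, not the code in KinesisReceiver.scala. In the AWS Java SDK v1, the override itself is usually applied with a client's `setEndpoint(...)` or with `AwsClientBuilder.EndpointConfiguration` on a client builder.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: which endpoint each client built by the receiver would use. */
final class ServiceEndpoints {

    // {client name, public AWS endpoint prefix}; CloudWatch's API host is "monitoring".
    private static final String[][] SERVICES = {
        {"kinesis", "kinesis"},
        {"dynamodb", "dynamodb"},
        {"cloudwatch", "monitoring"}
    };

    // Regional public endpoints, e.g. https://kinesis.us-east-1.amazonaws.com
    static Map<String, String> forRegion(String region) {
        Map<String, String> endpoints = new LinkedHashMap<>();
        for (String[] service : SERVICES) {
            endpoints.put(service[0], "https://" + service[1] + "." + region + ".amazonaws.com");
        }
        return Collections.unmodifiableMap(endpoints);
    }

    // Endpoint override (e.g. Localstack): every client shares one URL.
    static Map<String, String> forOverride(String endpointUrl) {
        Map<String, String> endpoints = new LinkedHashMap<>();
        for (String[] service : SERVICES) {
            endpoints.put(service[0], endpointUrl);
        }
        return Collections.unmodifiableMap(endpoints);
    }

    public static void main(String[] args) {
        System.out.println(forRegion("us-east-1"));
        System.out.println(forOverride("http://localhost:4566/"));
    }
}
```

If only the Kinesis client were overridden, the receiver would still try to reach real DynamoDB and CloudWatch hosts for leases and metrics. That is why the override covers all three clients.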
# Build
mvn install -DskipTests=true -Dcheckstyle.skip
# Run JavaKinesisWordCountASL with Localstack
- Run JavaKinesisWordCountASL with the arguments SampleKinesisApplication myFirstStream http://localhost:4566/ (application name, stream name, Localstack endpoint URL).
- The runJavaKinesisWordCountASL.sh script in the sbin/ folder of the aSpark project can be used to run JavaKinesisWordCountASL from the shell.
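The application name is more than a label. The Kinesis Client Library uses it as the name of its DynamoDB lease table, so once the job is consuming, a table called `SampleKinesisApplication` should appear in Localstack's DynamoDB. Below is a small plain-JDK sketch of validating the three arguments up front. `WordCountArgs`, its usage message and `leaseTableName` are hypothetical and are not part of Spark or aSpark.

```java
import java.net.URI;

/** Hypothetical holder for the three JavaKinesisWordCountASL arguments. */
final class WordCountArgs {
    final String appName;
    final String streamName;
    final URI endpoint;

    private WordCountArgs(String appName, String streamName, URI endpoint) {
        this.appName = appName;
        this.streamName = streamName;
        this.endpoint = endpoint;
    }

    // Expects: <app-name> <stream-name> <endpoint-url>
    static WordCountArgs parse(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                "Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url>");
        }
        URI endpoint = URI.create(args[2]); // throws IllegalArgumentException on bad syntax
        if (endpoint.getScheme() == null || endpoint.getHost() == null) {
            throw new IllegalArgumentException("endpoint-url must be absolute: " + args[2]);
        }
        return new WordCountArgs(args[0], args[1], endpoint);
    }

    // The KCL names its DynamoDB lease table after the application.
    String leaseTableName() {
        return appName;
    }

    public static void main(String[] args) {
        WordCountArgs parsed = parse(new String[] {
            "SampleKinesisApplication", "myFirstStream", "http://localhost:4566/"});
        System.out.println(parsed.streamName + " via " + parsed.endpoint
                + ", lease table " + parsed.leaseTableName());
    }
}
```

Because the lease table also stores checkpoints, reusing the same application name across runs lets the consumer resume from where it stopped. Choosing a new name starts with a fresh table.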
(IV) Add Data to Localstack Kinesis & View Counts on Console
a) Put records from the CLI
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata efgh"
b) Alternatively, put records from a Java Kinesis application
Download, build & run AmazonKinesisRecordProducerSample.java
c) Now check the console output of the JavaKinesisWordCountASL job started in step (III) above. It displays the counts of the words streamed from Localstack Kinesis.
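For each batch, the job essentially decodes the record payloads, splits them into words and counts them. Stripped of Spark, that step can be sketched in plain Java as below. Each payload is decoded as UTF-8, split on spaces and tallied. `BatchWordCount` is a hypothetical name, and this sketch also drops the empty tokens that repeated spaces would produce.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

/** Hypothetical, Spark-free sketch of one batch of the word count. */
final class BatchWordCount {

    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");

    // Decode each record payload as UTF-8, split on spaces, and tally words.
    static Map<String, Integer> count(List<byte[]> records) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for stable output
        for (byte[] payload : records) {
            String text = new String(payload, StandardCharsets.UTF_8);
            for (String word : WORD_SEPARATOR.split(text)) {
                if (!word.isEmpty()) { // skip empty tokens from repeated spaces
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<byte[]> batch = Arrays.asList(
            "testdata abcd".getBytes(StandardCharsets.UTF_8),
            "testdata efgh".getBytes(StandardCharsets.UTF_8));
        System.out.println(count(batch)); // {abcd=1, efgh=1, testdata=2}
    }
}
```

A streaming job reports counts per batch interval. If the two records above arrive in different batches, the console shows `testdata` with a count of 1 in each batch rather than 2 in one.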