Continuing with the same PySpark setup (ver 2.1.0, Python 3.5, etc.) explained in an earlier post: to connect to the mocked Kinesis stream on Localstack from PySpark, use the kinesis_wordcount_asl.py script located in Spark's external/ (connector/) folder.
(a) Update value of master in kinesis_wordcount_asl.py
Update the value of master (local[n], spark://localhost:7077, etc.) passed to the SparkContext in kinesis_wordcount_asl.py:
sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl", master="local[2]")
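Alternatively, the master can be supplied at submit time instead of being hardcoded in the script; a sketch, assuming spark-submit from the same Spark 2.1.0 install is on PATH and SPARK_HOME is exported (see (c) below):

```shell
# Sketch: pass master on the command line rather than editing the script
# (app/stream names and endpoint follow this post's Localstack example)
${SPARK_HOME}/bin/spark-submit \
  --master "local[2]" \
  ${SPARK_HOME}/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
  SampleKinesisApplication myFirstStream http://localhost:4566/ us-east-1
```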
(b) Add aSpark compiled jars to the Spark Driver/Executor classpath
As explained in step (III) of an earlier post, to work with Localstack a few changes were made to onStart() in KinesisReceiver.scala to explicitly set the endpoint on the Kinesis, DynamoDB and CloudWatch clients. Accordingly, the compiled aSpark jars containing these modifications need to be added to the Spark Driver/Executor classpath.
- For Spark local mode (master="local[n]"): additions to classpath can be exported in the SPARK_CLASSPATH variable.
export aSPARK_PROJ_HOME="/Download/Location/aSpark"
export SPARK_CLASSPATH="${aSPARK_PROJ_HOME}/target/original-aSpark_1.0-2.1.0.jar:${aSPARK_PROJ_HOME}/target/scala-2.11/classes:${aSPARK_PROJ_HOME}/target/scala-2.11/jars/*"
- For Spark Standalone mode: "spark.executor.extraClassPath" needs to be set either in spark-defaults.conf or added as a SparkConf entry on the SparkContext (see (II)(a))
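For the standalone case, the entry in ${SPARK_HOME}/conf/spark-defaults.conf would look something like the following (a sketch only; the paths mirror the aSPARK_PROJ_HOME exports used for local mode and must match your actual download location):

```
spark.executor.extraClassPath  /Download/Location/aSpark/target/original-aSpark_1.0-2.1.0.jar:/Download/Location/aSpark/target/scala-2.11/classes:/Download/Location/aSpark/target/scala-2.11/jars/*
```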
(c) Ensure SPARK_HOME, PYSPARK_PYTHON & PYTHONPATH variables are exported.
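A minimal sketch of those exports, assuming Spark 2.1.0 was unpacked to /opt/spark-2.1.0 (the install path is an assumption; adjust to your layout, including the py4j zip version under ${SPARK_HOME}/python/lib):

```shell
# Hypothetical install location; adjust to where Spark 2.1.0 is unpacked
export SPARK_HOME="/opt/spark-2.1.0"
# Interpreter used for the PySpark driver and workers
export PYSPARK_PYTHON="python3.5"
# Make the pyspark package and the bundled py4j importable
# (py4j-0.10.4 ships with Spark 2.1.0; the version varies by release)
export PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}"
```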
(d) Run kinesis_wordcount_asl
python3.5 ${SPARK_HOME}/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py SampleKinesisApplication myFirstStream http://localhost:4566/ us-east-1
- In a separate terminal, put a record onto the stream:
aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
- The count of the words streamed (put) will show up on the kinesis_wordcount_asl console
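To drive more data through the word count, the put-record call can be looped; a sketch, assuming the AWS CLI is installed and Localstack is listening on port 4566 as above (partition keys and payloads are illustrative):

```shell
# Sketch: put several test records onto the mocked stream
# (endpoint and stream name follow this post's Localstack setup)
ENDPOINT="http://localhost:4566"
STREAM="myFirstStream"
for i in 1 2 3; do
  aws --endpoint-url="$ENDPOINT" kinesis put-record \
    --stream-name "$STREAM" \
    --partition-key "pk-$i" \
    --data "testdata record $i"
done
```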