Showing posts with label spark. Show all posts
Showing posts with label spark. Show all posts

Monday, January 6, 2025

Spark API Categorization

A way to categorize Spark API features:

  • Flow of data is generally across the category swim lanes, from creation of a New Spark Context to reading data using I/O to Filter, Map/ Transform, Reduce/ Agg etc Action.
  • Lazy processing upto Transformation.
  • Steps only get executed once an Action is invoke.
  • Post Actions (Reduce, Collect, etc) there could again be I/O, thus the reverse flow from Action
  • Partition is a cross cutting concern across all layers. For I/O, Transformations, Actions could be across all or a few Partitions.
  • forEach on the Stream could be at either at Transform or Action levels.

The diagram is based on code within various Spark test suites

Thursday, January 2, 2025

Mocked Kinesis (Localstack) with PySpark Streaming

Continuing with the same PySpark (ver 2.1.0, Python3.5, etc.) setup explained in an earlier post. In order to connect to the mocked Kinesis stream on Localstack from PySpark use the kinesis_wordcount_asl.py script located in Spark external/ (connector/) folder.

(a) Update value of master in kinesis_wordcount_asl.py

Update value of master(local[n], spark://localhost:7077, etc) in SparkContext in kinesis_wordcount_asl.py:
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl",master="local[2]")

(b) Add aSpark compiled jars to Spark Driver/ Executor Classpath

As explained in step (III) of an earlier post, to work with Localstack a few changes were done to the KinesisReceiver.scala onStart() to explicitly set endPoint on kinesis, dynamoDb, cloudWatch clients. Accordingly the compiled aSpark jars with the modifications need to be added to Spark Driver/ Executor classpath.

     export aSPARK_PROJ_HOME="/Downlaod/Location/aSpark"
    export SPARK_CLASSPATH="${aSPARK_PROJ_HOME}/target/original-aSpark_1.0-2.1.0.jar:${aSPARK_PROJ_HOME}/target/scala-2.11/classes:${aSPARK_PROJ_HOME}/target/scala-2.11/jars/*"

  •  For Spark Standalone mode: "spark.executor.extraClassPath" needs to be set in either spark-defaults.conf or added as a SparkConf to SparkContext (see (II)(a))

(c) Ensure SPARK_HOME, PYSPARK_PYTHON & PYTHONPATH variables are exported.

(d) Run kinesis_wordcount_asl

    python3.5 ${SPARK_HOME}/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py SampleKinesisApplication myFirstStream http://localhost:4566/ us-east-1

    aws  --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"

  • Count of the words streamed (put) will show up on the kinesis_wordcount_asl console
 

Wednesday, January 1, 2025

Spark Streaming with Kinesis mocked on Localstack

In this post we get a Spark streaming application working with AWS Kinesis stream, a mocked version of Kinesis running locally on Localstack. In earlier posts we have explained how to get Localstack running and various AWS services up on Localstack. The client connections to AWS services (Localstack) is done using AWS cli and AWS Java-Sdk v1.

Environment: This set-up continues on a Ubuntu20.04, with Java-8, Maven-3.6x, Docker-24.0x, Python3.5, PySpark/ Spark-2.1.0, Localstack-3.8.1, AWS Java-Sdk-v1 (ver.1.12.778),

Once the Localstack installation is done, steps to follow are:

(I) Start Localstack
    # Start locally
    localstack start

    That should get Localstack should be running on: http://localhost:4566

(II) Check Kinesis services from CLI on Localstack

    # List Streams
    aws --endpoint-url=http://localhost:4566 kinesis list-streams

    # Create Stream
    aws --endpoint-url=http://localhost:4566 kinesis create-stream --stream-name myFirstStream --shard-count 1

    # List Streams
    aws --endpoint-url=http://localhost:4566 kinesis list-streams

    # describe-stream-summary
    aws --endpoint-url=http://localhost:4566 kinesis describe-stream-summary --stream-name myFirstStream

    # Put Record
    aws  --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
    aws  --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata efgh"

(III) Connect to Kinesis from Spark Streaming

    # Build
    mvn install -DskipTests=true -Dcheckstyle.skip

    # Run JavaKinesisWordCountASL with Localstack

  • JavaKinesisWordCountASL SampleKinesisApplication myFirstStream http://localhost:4566/

(IV) Add Data to Localstack Kinesis & View Counts on Console
    a) Put Record from cli
    aws  --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
    aws  --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata efgh"

    b) Alternatively Put records from Java Kinesis application
    Download, build & run AmazonKinesisRecordProducerSample.java
    
    c) Now check the output console of JavaKinesisWordCountASL run in step (III) above. Counts of the words streamed from Localstack Kinesis will be displayed on the console.

Saturday, December 28, 2024

Debugging Spark Scala/ Java components

In continuation to the earlier post regarding debugging Pyspark, here we show how to debug the Spark Scala/ Java side. Spark is a distributed processing environment and has Scala Api's for connecting from different languages like Python & Java. The high level Pyspark Architecture is shown here.

For debugging the Spark Scala/ Java components as these run within the JVM, it's easy to make use of Java Tooling Options for remote debugging from any compatible IDE such as Idea (Eclipse longer supports Scala). A few points to remember:

  • Multiple JVMs in Spark: Since Spark is a distributed application, it involves several components like the Master/ Driver, Slave/ Worker, Executor. In a real world truly distributed setting, each of the components runs in its own separate JVM on separated Physical machines. So be clear about which component you are exactly wanting to debug & set up the Tooling options accordingly targetting the specific JVM instance.

  • Two-way connectivity between IDE & JVM: At the same time there should be a two-way network connectivity between the IDE (debugger) & the running JVM instance

  • Debugging Locally: Debugging is mostly a dev stage activity & done locally. So it may be better to debug on a a Spark cluster running locally. This could be either on a Spark Spark cluster or a Spark run locally (master=local[n]/ local[*]).

Steps:

Environment: Ubuntu-20.04 having Java-8, Spark/Pyspark (ver 2.1.0), Python3.5, Idea-Intelli (ver 2024.3), Maven3.6

(I) Idea Remote JVM Debugger
In Idea > Run/ Debug Config > Edit > Remote JVM Debug.

  • Start Debugger in Listen to Remote JVM Mode
  • Enable Auto Restart

(II)(a) Debug Spark Standlone cluster
Key features of the Spark Standalone cluster are:

  • Separate JVMs for Master, Slave/ Worker, Executor
  • All could run on a single dev box, provided enough resources (Mem, CPU) are available
  • Scripts inside SPARK_HOME/sbin folder like start-master.sh, start-slave.sh (start-worker.sh), etc to start the services

In order to Debug lets say some Executor, a Spark Standalone cluster could be started off with 1 Master, 1 Worker, 1 Executor.   

    # Start Master (Check http://localhost:8080/ to get Master URL/ PORT)
    ./sbin/start-master.sh 

    # Start Slave/ Worker
    ./sbin/start-slave.sh spark://MASTER_URL:<MASTER_PORT>

    # Add Jvm tooling to extraJavaOption to spark-defaults.conf
    spark.executor.extraJavaOptions  -agentlib:jdwp=transport=dt_socket,server=n,address=localhost:5005,suspend=n

    # The value could instead be passed as a conf to SparkContext in Python script:
    from pyspark.conf import SparkConf
    confVals = SparkConf()
    confVals.set("spark.executor.extraJavaOptions","-agentlib:jdwp=transport=dt_socket,server=n,address=localhost:5005,suspend=y")
    sc = SparkContext(master="spark://localhost:7077",appName="PythonStreamingStatefulNetworkWordCount1",conf=confVals)

(II)(b) Debug locally with master="local[n]"

  • In this case a local Spark cluster is spun up via scripts like spark-shell, spark-submit, etc. located inside the bin/ folder
  • The different components Master, Worker, Executor all run within one JVM as threads, where the value n is the no of threads, (set n=2)
  • Export JAVA_TOOL_OPTIONS before in the terminal from which the Pyspark script will be run

        export JAVA_TOOL_OPTIONS="-agentlib:jdwp=transport=dt_socket,server=n,suspend=n,address=5005"

(III) Execute PySpark Python script
    python3.5 ${SPARK_HOME}/examples/src/main/python/streaming/network_wordcount.py localhost 9999

This should start off the Pyspark & connect the Executor JVM to the waiting Idea Remote debugger instance for debugging.

Thursday, December 26, 2024

Debugging Pyspark in Eclipse with PyDev

An earlier post shows how to run Pyspark (Spark 2.1.0) in Eclipse (ver 2024-06 (4.32)) using the PyDev (ver 12.1) plugin. The OS is Ubuntu-20.04 with Java-8, & an older version of Python3.5 compatible with PySpark (2.1.0).

While the Pyspark code runs fine within Eclipse, when trying to Debug an error is thrown:

    Pydev: Unexpected error setting up the debugger: Socket closed". 

This is due to a higher Python requirement (>3.6) for pydevd debugger module within PyDev. Details from the PyDev installations page clearly state that Python3.5 is compatible only with PyDev9.3.0. So it's back to square one.

Install/ replace Pydev 12.1 with PyDev 9.3 in Eclipse

  • Uninstall Pydev 12.1 (Help > About > Installation details > Installed software > Uninstall PyDev plugin)
  • Also manually remove all Pydev folders from eclipse/plugins folder (com.python.pydev.* & org.python.pydev.*)
  • Unzip to eclipse/dropins folder
  • Restart eclipse & check (Help > About > Installation details > Installed software)

Test debugging Pyspark
Refer to the steps to Run Pyspark on PyDev in Eclipse, & ensure the PyDev Interpreter is python3.5, PYSPARK_PYTHON variable and PYTHONPATH are correctly setup.

Finally, right click on network_wordcount.py > Debug as > Python run
(Set up Debug Configurations > Arguments & provide program arguments, e.g. "localhost 9999", & any breakpoints in the python code to test).

 

Wednesday, December 25, 2024

Pyspark in Eclipse with PyDev

This post captures the steps to get Spark (ver 2.1) working within Eclipse (ver 2024-06 (4.32)) using the PyDev (ver 12.1) plugin. The OS is Ubuntu-20.04 with Java-8, Python 3.x & Maven 3.6.

(I) Compile Spark code

The Spark code is downloaded & compiled from a location "SPARK_HOME".

    export SPARK_HOME="/SPARK/DOWNLOAD/LOCATION"

    cd ${SPARK_HOME}

    mvn install -DskipTests=true -Dcheckstyle.skip -o

(Issue: For a "Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.8.0:check":
Copy scalastyle-config.xml to the sub-project (next to pom.xml) having the error.

(II) Compile Pyspark
    (a) Install Pyspark dependencies

  • Install Pandoc

        sudo apt-get install pandoc

  • Install a compatible older Pypandoc (ver 1.5)

        pip3 install pypandoc==1.5

        sudo add-apt-repository ppa:deadsnakes/ppa

        sudo apt-get install python3.5
    
    (b) Build Pyspark

        cd ${SPARK_HOME}/python

        export PYSPARK_PYTHON=python3.5

        # Build - creates ${SPARK_HOME}/python/build
        python3.5 setup.py

        # Dist - creates ${SPARK_HOME}/python/dist
        python3.5 setup.py sdist

    (c) export PYTHON_PATH

    export PYTHONPATH=$PYTHONPATH:${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${SPARK_HOME}/python/pyspark/shell.py;
    
(III) Run Pyspark from console
    Pyspark setup is done & stanalone examples code should run. Ensure variables ${SPARK_HOME}, ${PYSPARK_PYTHON} & ${PYTHONPATH} are all correctly exported (steps (I), (II)(b) & (II)(c) above):

    python3.5 ${SPARK_HOME} /python/build/lib/pyspark/examples/src/main/python/streaming/network_wordcount.py localhost 9999

(IV) Run Pyspark on PyDev in Eclipse

    (a) Eclipse with PyDev plugin installed:
    Set-up tested on Eclipse (ver 2024-06 (4.32.0)) and PyDev plugin (ver 12.1x).
 
    (b) Import the spark project in Eclipse
    There would be compilation errors due to missing Spark Scala classes.

    (c) Add Target jars for Spark Scala classes
    Eclipse no longer has support for Scala so the corresponding Spark Scala classes are missing. A work around is to add the Scala target jars compiled using mvn (in step (I) above) manually to: 

        spark-example > Properties > Java Build Path > Libraries
   

Eclipse Build Path Add Libraries

    (d) Add PyDev Interpreter for Python3.5
    Go to: spark-example > Properties > PyDev - Interpreter/ Grammar > Click to confure an Interpreter not listed > Open Interpreter Preferences Page > New > Choose from List:  

    & Select /usr/bin/python3.5

 Eclipse - Pydev Interpreter Python3.5

 On the same page, under the Environment tab add a variable named "PYSPARK_PYTHON" having value "python3.5"

Eclipse - Pydev Interpreter Python3.5 variable

    (e) Set up PYTHONPATH for PyDev

    spark-example > Properties > PyDev - PYTHONPATH

  • Under String Substitution Variables add a variable with name "SPARK_HOME" & value "/SPARK/DOWNLOAD/LOCATION" (same location added in Step (I)). 
 
Eclipse - Pydev PYTHONPATH variable

  • Under External Libraries, Choose Add based on variable, add 3 entries:

                 ${SPARK_HOME}/python/

                 ${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip

                 ${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip

   With that Pyspark should be properly set-up within PyDev.

    (f) Run Pyspark from Eclipse

    Right click on network_wordcount.py > Run as > Python run
    (You can further change Run Configurations > Arguments & provide program arguments, e.g. "localhost 9999")