Showing posts with label Streaming Solutions. Show all posts

Thursday, January 2, 2025

Mocked Kinesis (Localstack) with PySpark Streaming

This post continues with the same PySpark setup (ver 2.1.0, Python 3.5, etc.) explained in an earlier post. To connect to the mocked Kinesis stream on Localstack from PySpark, use the kinesis_wordcount_asl.py script located in the Spark external/ (connector/) folder.

(a) Update value of master in kinesis_wordcount_asl.py

Update the value of master (local[n], spark://localhost:7077, etc.) passed to SparkContext in kinesis_wordcount_asl.py:
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl", master="local[2]")

(b) Add aSpark compiled jars to Spark Driver/ Executor Classpath

As explained in step (III) of an earlier post, working with Localstack required a few changes to onStart() in KinesisReceiver.scala to explicitly set the endpoint on the Kinesis, DynamoDB & CloudWatch clients. Accordingly, the compiled aSpark jars containing these modifications need to be added to the Spark Driver/ Executor classpath.

    export aSPARK_PROJ_HOME="/Download/Location/aSpark"
    export SPARK_CLASSPATH="${aSPARK_PROJ_HOME}/target/original-aSpark_1.0-2.1.0.jar:${aSPARK_PROJ_HOME}/target/scala-2.11/classes:${aSPARK_PROJ_HOME}/target/scala-2.11/jars/*"

  •  For Spark Standalone mode: "spark.executor.extraClassPath" needs to be set in either spark-defaults.conf or added as a SparkConf to SparkContext (see (II)(a))
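For the spark-defaults.conf route, the entries can be generated rather than typed by hand. The following is only a minimal sketch: the helper names are hypothetical, the jar paths simply mirror the SPARK_CLASSPATH export above, and adding "spark.driver.extraClassPath" alongside the executor setting is an assumption that the driver needs the same jars.

```python
import os


def aspark_classpath(proj_home):
    """Build the same colon-separated classpath as the SPARK_CLASSPATH export."""
    entries = [
        "{}/target/original-aSpark_1.0-2.1.0.jar".format(proj_home),
        "{}/target/scala-2.11/classes".format(proj_home),
        "{}/target/scala-2.11/jars/*".format(proj_home),
    ]
    return ":".join(entries)


def spark_defaults_lines(proj_home):
    """Return lines suitable for appending to spark-defaults.conf."""
    classpath = aspark_classpath(proj_home)
    return [
        "spark.executor.extraClassPath " + classpath,
        # Assumption: the driver needs the modified jars as well.
        "spark.driver.extraClassPath " + classpath,
    ]


if __name__ == "__main__":
    # The fallback path is a placeholder, not a real location.
    home = os.environ.get("aSPARK_PROJ_HOME", "/path/to/aSpark")
    for line in spark_defaults_lines(home):
        print(line)
```

The same classpath string could instead be set on a SparkConf (key "spark.executor.extraClassPath") before the SparkContext is created.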

(c) Ensure SPARK_HOME, PYSPARK_PYTHON & PYTHONPATH variables are exported.
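A quick way to confirm this before launching the script is a small check like the one below. It is a sketch only: the function name is hypothetical, and it checks just that the three variables are set and non-empty, not that their values are correct.

```python
import os

REQUIRED_VARS = ("SPARK_HOME", "PYSPARK_PYTHON", "PYTHONPATH")


def missing_vars(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_vars()
    if missing:
        print("Not exported: " + ", ".join(missing))
    else:
        print("All required variables are exported.")
```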

(d) Run kinesis_wordcount_asl

    python3.5 ${SPARK_HOME}/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py SampleKinesisApplication myFirstStream http://localhost:4566/ us-east-1

Then, from a second terminal, put a record on the stream:

    aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"

  • The count of the words streamed (put) will show up on the kinesis_wordcount_asl console.
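To see what counts to expect for a given record, the counting step can be mimicked in plain Python. This is a stand-in, not the Spark job itself: it assumes the script splits each record on single spaces and sums the occurrences per word, and the function name is hypothetical.

```python
from collections import Counter


def word_counts(records):
    """Count words across records, splitting each record on single spaces."""
    counts = Counter()
    for record in records:
        counts.update(record.split(" "))
    return dict(counts)


# The record put with the aws command above.
print(sorted(word_counts(["testdata abcd"]).items()))
# [('abcd', 1), ('testdata', 1)]
```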
 

Thursday, December 26, 2024

Debugging PySpark in Eclipse with PyDev

An earlier post shows how to run PySpark (Spark 2.1.0) in Eclipse (ver 2024-06 (4.32)) using the PyDev (ver 12.1) plugin. The OS is Ubuntu-20.04 with Java-8, & an older Python (3.5) that is compatible with PySpark (2.1.0).

While the PySpark code runs fine within Eclipse, trying to debug it throws an error:

    "Pydev: Unexpected error setting up the debugger: Socket closed"

This is due to a higher Python requirement (>3.6) for the pydevd debugger module within PyDev. The PyDev installation page clearly states that Python 3.5 is compatible only with PyDev 9.3.0. So it's back to square one.

Install/ replace PyDev 12.1 with PyDev 9.3 in Eclipse

  • Uninstall PyDev 12.1 (Help > About > Installation details > Installed software > Uninstall PyDev plugin)
  • Also manually remove all PyDev folders from the eclipse/plugins folder (com.python.pydev.* & org.python.pydev.*)
  • Download the PyDev 9.3.0 zip & unzip it to the eclipse/dropins folder
  • Restart Eclipse & check that PyDev 9.3 is listed (Help > About > Installation details > Installed software)

Test debugging PySpark
Refer to the steps to run PySpark on PyDev in Eclipse, & ensure that the PyDev interpreter is python3.5 and that the PYSPARK_PYTHON & PYTHONPATH variables are correctly set up.

Finally, right click on network_wordcount.py > Debug as > Python run
(Set up Debug Configurations > Arguments & provide the program arguments, e.g. "localhost 9999", & set any breakpoints in the Python code to test).
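With the arguments "localhost 9999", the script needs something listening on that port to read lines from. A tool like netcat is the usual choice; the sketch below is a small Python stand-in, assuming a single client and a fixed list of lines. The function name and the sample lines are hypothetical.

```python
import socket
import threading


def serve_lines(lines, host="localhost", port=9999):
    """Listen on host:port, send each line to the first client, then close.

    Returns (thread, bound_port); bound_port is useful when port=0 is passed.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def run():
        conn, _ = server.accept()  # blocks until a client connects
        try:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        finally:
            conn.close()
            server.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread, bound_port


# Example use, started before launching the debug session:
#   thread, _ = serve_lines(["hello world", "hello spark"])
#   thread.join()
```

Since the server closes after sending its lines, it suits a short debugging run with a breakpoint in the word-splitting code rather than a long-lived stream.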

 

Monday, July 22, 2024

Cloudera - Streaming Data Platform

Cloudera has a significantly mature streaming offering on their Data Platform. Data from varied sources such as rich media, text, chat, message queues, etc. is brought in to their unified DataFlow platform using NiFi or other ETL/ ELT tools. After processing, it can be directed to one or more of the Op./ App DB, Data Lake (Iceberg), Vector DB post embedding (for AI/ ML), etc.

Streaming helps AI/ ML apps by providing real-time context that the apps can leverage. Feedback mechanisms, grounding of outputs, avoiding hallucinations, model evolution, etc. all require real-time data to be available. So with a better, faster data & MLOps platform, Cloudera is looking to improve the quality of the ML apps running on it.

Cloudera has also made it easy to get started with ML through their cloud based Accelerators (AMPs). AMPs support not just Cloudera built modules, but also those from others like Pinecone, AWS, Hugging Face, etc. & the ML community. Apps for Chats, Text summarization, Image analysis, Time series, LLMs, etc. are available for use off the shelf. As always, Cloudera continues to offer all deployment options like on-premise, cloud & hybrid as per the customer's needs.

 

Monday, January 22, 2018

Streaming Solutions

In the streaming solutions space, it all begins with the event driven architecture. This basically includes events (what triggers everything), the handlers (responsible for taking action) & the event loop (for coordinating). When things get more involved & complicated, with multiple event streams/ sources, etc., solutions move into the complex event processing (CEP) space.
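The three parts named above (events, handlers & the event loop) can be shown in a few lines of Python. This is a toy sketch under simple assumptions: a single-threaded loop, an in-memory queue, and hypothetical event names; it is not how any particular framework implements it.

```python
from collections import deque


class EventLoop:
    """Coordinates queued events and the handlers registered for them."""

    def __init__(self):
        self._handlers = {}
        self._queue = deque()

    def on(self, event_type, handler):
        """Register a handler for an event type."""
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event_type, payload=None):
        """Queue an event; handlers run later, inside run()."""
        self._queue.append((event_type, payload))

    def run(self):
        """Dispatch events in arrival order until the queue is empty."""
        while self._queue:
            event_type, payload = self._queue.popleft()
            for handler in self._handlers.get(event_type, []):
                handler(payload)


loop = EventLoop()
seen = []
loop.on("order_placed", lambda order: seen.append("charge " + order))
# A handler may emit further events, which the loop picks up in turn.
loop.on("order_placed", lambda order: loop.emit("order_charged", order))
loop.on("order_charged", lambda order: seen.append("ship " + order))

loop.emit("order_placed", "A-1")
loop.run()
print(seen)  # ['charge A-1', 'ship A-1']
```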

Another very popular programming methodology in recent times is Reactive programming. In some sense this is a special case of event driven programming, with the focus on a data change (as the event) & the reactive steps that make other downstream data changes (as the handlers).
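A small sketch of that idea: a value whose changes are the events, with derived values updating as the handlers. The Cell class and derive helper are hypothetical names made up for illustration, not the API of any reactive library.

```python
class Cell:
    """A value that notifies its subscribers whenever it changes."""

    def __init__(self, value):
        self._value = value
        self._subscribers = []

    @property
    def value(self):
        return self._value

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def set(self, new_value):
        if new_value == self._value:
            return  # no data change, so no event
        self._value = new_value
        for callback in list(self._subscribers):
            callback(new_value)


def derive(source, fn):
    """Create a cell that is recomputed whenever the source cell changes."""
    derived = Cell(fn(source.value))
    source.subscribe(lambda v: derived.set(fn(v)))
    return derived


price = Cell(100)
with_tax = derive(price, lambda p: p + p // 10)  # illustrative 10% tax

log = []
with_tax.subscribe(log.append)

price.set(200)  # propagates downstream to with_tax
price.set(200)  # same value again, nothing propagates
print(with_tax.value, log)  # 220 [220]
```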

A whole bunch of frameworks for streaming solutions have emerged from the Big Data ecosystem, such as Storm, Spark Streaming, Flink, etc. These allow for quick development of streaming solutions using high level abstractions. Even Solr now supports streaming expressions for building distributed streaming search solutions.

Outside of these frameworks, Akka Streams seems promising. It's built on top of Akka's robust Actor model & the Reactive Streams API. Solutions such as Gearpump can provide a sense of the ground up solutions possible with Akka Streams.