Showing posts with label Hadoop. Show all posts

Saturday, December 28, 2024

Debugging Spark Scala/ Java components

In continuation of the earlier post on debugging Pyspark, here we show how to debug the Spark Scala/ Java side. Spark is a distributed processing environment that exposes APIs for other languages like Python & Java on top of its Scala core. The high level Pyspark Architecture is shown here.

Since the Spark Scala/ Java components run within the JVM, it's easy to make use of the Java tooling options for remote debugging from any compatible IDE such as Idea (Eclipse no longer supports Scala). A few points to remember:

  • Multiple JVMs in Spark: Since Spark is a distributed application, it involves several components like the Master/ Driver, Slave/ Worker, Executor. In a real world, truly distributed setting, each of the components runs in its own JVM on a separate physical machine. So be clear about exactly which component you want to debug & set up the tooling options accordingly, targeting the specific JVM instance.

  • Two-way connectivity between IDE & JVM: At the same time there should be two-way network connectivity between the IDE (debugger) & the running JVM instance.

  • Debugging Locally: Debugging is mostly a dev stage activity & done locally. So it may be better to debug on a Spark cluster running locally. This could be either a Spark Standalone cluster or Spark run locally (master=local[n]/ local[*]).
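Before attaching, it can help to confirm that the debugger port is actually reachable from the box running the JVM. Below is a minimal sketch in Python, assuming the IDE listens on localhost:5005 (the port used in the steps of this post); the function name can_connect is purely illustrative.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError when nothing listens or the host is unreachable
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 5005 is the debugger port assumed throughout this post
    print("Debugger reachable:", can_connect("localhost", 5005))
```

Run it from the machine hosting the JVM while the IDE debugger is in listen mode; a False result usually points to a firewall or a wrong host/ port rather than to Spark itself.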

Steps:

Environment: Ubuntu-20.04 with Java-8, Spark/Pyspark (ver 2.1.0), Python3.5, IntelliJ IDEA (ver 2024.3), Maven3.6

(I) Idea Remote JVM Debugger
In Idea > Run/ Debug Config > Edit > Remote JVM Debug.

  • Start Debugger in Listen to Remote JVM Mode
  • Enable Auto Restart

(II)(a) Debug Spark Standalone cluster
Key features of the Spark Standalone cluster are:

  • Separate JVMs for Master, Slave/ Worker, Executor
  • All could run on a single dev box, provided enough resources (Mem, CPU) are available
  • Scripts inside SPARK_HOME/sbin folder like start-master.sh, start-slave.sh (start-worker.sh), etc to start the services

In order to debug, let's say, some Executor, a Spark Standalone cluster could be started off with 1 Master, 1 Worker, 1 Executor.

    # Start Master (Check http://localhost:8080/ to get Master URL/ PORT)
    ./sbin/start-master.sh 

    # Start Slave/ Worker
    ./sbin/start-slave.sh spark://MASTER_URL:<MASTER_PORT>

    # Add the JVM tooling options as spark.executor.extraJavaOptions in spark-defaults.conf
    spark.executor.extraJavaOptions  -agentlib:jdwp=transport=dt_socket,server=n,address=localhost:5005,suspend=n

    # The value could instead be passed as a conf to SparkContext in the Python script:
    from pyspark import SparkContext
    from pyspark.conf import SparkConf
    confVals = SparkConf()
    confVals.set("spark.executor.extraJavaOptions","-agentlib:jdwp=transport=dt_socket,server=n,address=localhost:5005,suspend=y")
    sc = SparkContext(master="spark://localhost:7077",appName="PythonStreamingStatefulNetworkWordCount1",conf=confVals)

(II)(b) Debug locally with master="local[n]"

  • In this case a local Spark cluster is spun up via scripts like spark-shell, spark-submit, etc. located inside the bin/ folder
  • The different components (Master, Worker, Executor) all run within one JVM as threads, where n is the number of threads (e.g. set n=2)
  • Export JAVA_TOOL_OPTIONS beforehand in the terminal from which the Pyspark script will be run

        export JAVA_TOOL_OPTIONS="-agentlib:jdwp=transport=dt_socket,server=n,suspend=n,address=5005"
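The same option string can also be assembled inside the Python script instead of being typed by hand. This is only a sketch: the helper name jdwp_option is made up for illustration, and it assumes that the JVM launched by Pyspark inherits the environment of the Python process.

```python
import os


def jdwp_option(address, server=False, suspend=False):
    """Build a -agentlib:jdwp option string for the JVM."""
    def flag(value):
        return "y" if value else "n"

    return (
        "-agentlib:jdwp=transport=dt_socket,"
        "server={},suspend={},address={}".format(flag(server), flag(suspend), address)
    )


# server=n: the JVM connects out to the IDE, which is listening on port 5005
os.environ["JAVA_TOOL_OPTIONS"] = jdwp_option("5005")
print(os.environ["JAVA_TOOL_OPTIONS"])
# A SparkContext(master="local[2]", ...) created after this point starts its JVM
# from this process, so the JVM should pick up the variable (assumption, not verified here).
```

Passing suspend=True makes the JVM wait for the debugger before running, which helps when the breakpoint sits in start-up code.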

(III) Execute PySpark Python script
    python3.5 ${SPARK_HOME}/examples/src/main/python/streaming/network_wordcount.py localhost 9999

This should start the Pyspark application & connect the Executor JVM to the waiting Idea Remote debugger instance for debugging.

Thursday, December 26, 2024

Debugging Pyspark in Eclipse with PyDev

An earlier post shows how to run Pyspark (Spark 2.1.0) in Eclipse (ver 2024-06 (4.32)) using the PyDev (ver 12.1) plugin. The OS is Ubuntu-20.04 with Java-8, & an older version of Python3.5 compatible with PySpark (2.1.0).

While the Pyspark code runs fine within Eclipse, when trying to Debug an error is thrown:

    "Pydev: Unexpected error setting up the debugger: Socket closed".

This is due to a higher Python requirement (>3.6) for the pydevd debugger module within PyDev. Details from the PyDev installations page clearly state that Python3.5 is compatible only with PyDev 9.3.0. So it's back to square one.

Install/ replace Pydev 12.1 with PyDev 9.3 in Eclipse

  • Uninstall Pydev 12.1 (Help > About > Installation details > Installed software > Uninstall PyDev plugin)
  • Also manually remove all Pydev folders from eclipse/plugins folder (com.python.pydev.* & org.python.pydev.*)
  • Download the PyDev 9.3.0 zip & unzip it to the eclipse/dropins folder
  • Restart eclipse & check (Help > About > Installation details > Installed software)

Test debugging Pyspark
Refer to the steps to Run Pyspark on PyDev in Eclipse, & ensure the PyDev Interpreter is python3.5, and the PYSPARK_PYTHON variable and PYTHONPATH are correctly set up.

Finally, right click on network_wordcount.py > Debug as > Python run
(Set up Debug Configurations > Arguments & provide program arguments, e.g. "localhost 9999", & any breakpoints in the python code to test).

 

Wednesday, December 25, 2024

Pyspark in Eclipse with PyDev

This post captures the steps to get Spark (ver 2.1) working within Eclipse (ver 2024-06 (4.32)) using the PyDev (ver 12.1) plugin. The OS is Ubuntu-20.04 with Java-8, Python 3.x & Maven 3.6.

(I) Compile Spark code

The Spark code is downloaded & compiled from a location "SPARK_HOME".

    export SPARK_HOME="/SPARK/DOWNLOAD/LOCATION"

    cd ${SPARK_HOME}

    mvn install -DskipTests=true -Dcheckstyle.skip -o

(Issue: For a "Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.8.0:check" error,
copy scalastyle-config.xml to the sub-project (next to pom.xml) having the error.)

(II) Compile Pyspark
    (a) Install Pyspark dependencies

  • Install Pandoc

        sudo apt-get install pandoc

  • Install a compatible older Pypandoc (ver 1.5)

        pip3 install pypandoc==1.5

  • Install Python3.5 (from the deadsnakes PPA)

        sudo add-apt-repository ppa:deadsnakes/ppa

        sudo apt-get install python3.5
    
    (b) Build Pyspark

        cd ${SPARK_HOME}/python

        export PYSPARK_PYTHON=python3.5

        # Build - creates ${SPARK_HOME}/python/build
        python3.5 setup.py build

        # Dist - creates ${SPARK_HOME}/python/dist
        python3.5 setup.py sdist

    (c) export PYTHONPATH

    export PYTHONPATH=$PYTHONPATH:${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${SPARK_HOME}/python/pyspark/shell.py;
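A quick way to catch a wrong SPARK_HOME or a mismatched py4j version is to check that the library entries exist on disk. The sketch below covers only the python/ folder and the py4j zip from the export above; the function names are illustrative and the fallback path is the placeholder used in step (I).

```python
import os


def pyspark_path_entries(spark_home, py4j_zip="py4j-0.10.4-src.zip"):
    """Return the PYTHONPATH library entries expected under spark_home."""
    python_dir = os.path.join(spark_home, "python")
    return [python_dir, os.path.join(python_dir, "lib", py4j_zip)]


def missing_entries(spark_home):
    """List the expected entries that do not exist on disk."""
    return [p for p in pyspark_path_entries(spark_home) if not os.path.exists(p)]


if __name__ == "__main__":
    # Falls back to the placeholder location when SPARK_HOME is not exported
    home = os.environ.get("SPARK_HOME", "/SPARK/DOWNLOAD/LOCATION")
    for path in missing_entries(home):
        print("Missing:", path)
```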
    
(III) Run Pyspark from console
    Pyspark setup is done & the standalone examples code should run. Ensure variables ${SPARK_HOME}, ${PYSPARK_PYTHON} & ${PYTHONPATH} are all correctly exported (steps (I), (II)(b) & (II)(c) above):

    python3.5 ${SPARK_HOME}/python/build/lib/pyspark/examples/src/main/python/streaming/network_wordcount.py localhost 9999

(IV) Run Pyspark on PyDev in Eclipse

    (a) Eclipse with PyDev plugin installed:
    Set-up tested on Eclipse (ver 2024-06 (4.32.0)) and PyDev plugin (ver 12.1x).
 
    (b) Import the spark project in Eclipse
    There would be compilation errors due to missing Spark Scala classes.

    (c) Add Target jars for Spark Scala classes
    Eclipse no longer has support for Scala, so the corresponding Spark Scala classes are missing. A workaround is to manually add the Scala target jars compiled using mvn (in step (I) above) to:

        spark-example > Properties > Java Build Path > Libraries
   

Eclipse Build Path Add Libraries

    (d) Add PyDev Interpreter for Python3.5
    Go to: spark-example > Properties > PyDev - Interpreter/ Grammar > Click to configure an Interpreter not listed > Open Interpreter Preferences Page > New > Choose from List:

    & Select /usr/bin/python3.5

 Eclipse - Pydev Interpreter Python3.5

 On the same page, under the Environment tab add a variable named "PYSPARK_PYTHON" having value "python3.5"

Eclipse - Pydev Interpreter Python3.5 variable

    (e) Set up PYTHONPATH for PyDev

    spark-example > Properties > PyDev - PYTHONPATH

  • Under String Substitution Variables add a variable with name "SPARK_HOME" & value "/SPARK/DOWNLOAD/LOCATION" (same location added in Step (I)). 
 
Eclipse - Pydev PYTHONPATH variable

  • Under External Libraries, Choose Add based on variable, add 3 entries:

                 ${SPARK_HOME}/python/

                 ${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip

                 ${SPARK_HOME}/python/pyspark/shell.py

   With that Pyspark should be properly set-up within PyDev.

    (f) Run Pyspark from Eclipse

    Right click on network_wordcount.py > Run as > Python run
    (You can further change Run Configurations > Arguments & provide program arguments, e.g. "localhost 9999")


Monday, July 22, 2024

Cloudera - Streaming Data Platform

Cloudera has a significantly mature streaming offering on their Data Platform. Data from varied sources such as rich media, text, chat, message queues, etc. is brought in to their unified DataFlow platform using NiFi or other ETL/ ELT. After processing, it can be directed to one or more of the Op./ App DB, Data Lake (Iceberg), Vector DB post embedding (for AI/ ML), etc.

Streaming in AI/ ML apps helps to provide a real-time context that the apps can leverage. Things like feedback mechanisms, grounding of outputs, avoiding hallucinations, model evolution, etc. all require real-time data to be available. So with a better, faster data & MLOps platform, Cloudera is looking to improve the quality of the ML apps running on it.

Cloudera has also made it easy to get started with ML with their cloud based Accelerators (AMP). AMPs support not just Cloudera built modules, but also those from others like Pinecone, AWS, Hugging Face, etc. & the ML community. Apps for Chats, Text summarization, Image analysis, Time series, LLMs, etc. are available for use off the shelf. As always, Cloudera continues to offer all deployment options like on-premise, cloud & hybrid as per customer's needs.

 

Friday, March 20, 2015

Teradata

Teradata is busy getting a chunk of the BigData pie. The Teradata Parallel Transporter (TPT) and Adv. SQL combo makes querying Big Data sources fast and efficient using state of the art caching and other optimizations.

Saturday, June 21, 2014

Getting Table Information From Hive Metastore

The Hive Metastore holds various meta information about Hive tables such as schema names, table names, partitions, fields, permissions, and so on. The metastore is built in a schema within a relational database, outside of Hive, and accessed via services by Hive clients. An embedded Derby database is configured as the default database for experimentation purposes, and must be changed over to something like MySQL, PostgreSQL, etc. for production use.

The Hive Metastore ER diagram is fairly straightforward. Once familiar with the schema, it is easy to query the metastore for information about the Hive tables. Here's a sample query to identify all Partitioned tables from a given Hive database:
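One way such a query might look is sketched below in Python, run against an in-memory SQLite stand-in rather than a real metastore. The table and column names (DBS, TBLS, PARTITION_KEYS) follow the metastore schema as commonly documented, but the tables here are heavily simplified and the sample rows are invented, so treat this as an illustration only.

```python
import sqlite3

# Simplified stand-in for three metastore tables (the real ones have more columns)
SCHEMA = """
CREATE TABLE DBS (DB_ID INTEGER PRIMARY KEY, NAME TEXT);
CREATE TABLE TBLS (TBL_ID INTEGER PRIMARY KEY, DB_ID INTEGER, TBL_NAME TEXT);
CREATE TABLE PARTITION_KEYS (TBL_ID INTEGER, PKEY_NAME TEXT, INTEGER_IDX INTEGER);
"""

# A table is partitioned if it has at least one row in PARTITION_KEYS
PARTITIONED_TABLES_SQL = """
SELECT DISTINCT t.TBL_NAME
FROM TBLS t
JOIN DBS d ON d.DB_ID = t.DB_ID
JOIN PARTITION_KEYS p ON p.TBL_ID = t.TBL_ID
WHERE d.NAME = ?
ORDER BY t.TBL_NAME
"""


def partitioned_tables(conn, db_name):
    """Return names of tables in db_name that have at least one partition key."""
    return [row[0] for row in conn.execute(PARTITIONED_TABLES_SQL, (db_name,))]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
# Invented sample data: two databases, three tables, two of them partitioned
conn.executemany("INSERT INTO DBS VALUES (?, ?)", [(1, "default"), (2, "sales")])
conn.executemany(
    "INSERT INTO TBLS VALUES (?, ?, ?)",
    [(10, 2, "orders"), (11, 2, "customers"), (12, 1, "logs")],
)
conn.executemany(
    "INSERT INTO PARTITION_KEYS VALUES (?, ?, ?)",
    [(10, "order_date", 0), (10, "region", 1), (12, "dt", 0)],
)
print(partitioned_tables(conn, "sales"))
```

Against a MySQL or PostgreSQL metastore the SQL itself should carry over, with the placeholder syntax adjusted to the driver in use.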

Monday, April 21, 2014

Urlencode and Urldecode in Hive using the Reflect UDF

Hive doesn't offer an in-built UDF to perform Urlencode or Urldecode. One option could be to write a custom UDF to fill the void.

On the other hand, a rather straightforward alternative to get the same feature, as shown on the forum, is using the very generic Reflect UDF.
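As a rough illustration, the Reflect UDF can call the static methods of java.net.URLEncoder and java.net.URLDecoder. The Python sketch below only builds the HiveQL strings; the table name my_table and column name url are hypothetical, and the final lines use Python's own urllib just to show the kind of round trip to expect for a simple input.

```python
from urllib.parse import quote_plus, unquote_plus


def reflect_call(java_class, method, column, charset="UTF-8"):
    """Build a HiveQL reflect() expression calling a static Java method."""
    return 'reflect("{}", "{}", {}, "{}")'.format(java_class, method, column, charset)


# `url` and `my_table` are hypothetical column/ table names
encode_sql = "SELECT {} FROM my_table".format(
    reflect_call("java.net.URLEncoder", "encode", "url"))
decode_sql = "SELECT {} FROM my_table".format(
    reflect_call("java.net.URLDecoder", "decode", "url"))
print(encode_sql)
print(decode_sql)

# Local illustration of the round trip for a simple input
encoded = quote_plus("a b&c=d")
print(encoded, "->", unquote_plus(encoded))
```

Python's quote_plus and Java's URLEncoder differ on a few characters (for instance * and ~), so the local round trip is a guide and not an exact emulation.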

Thursday, April 10, 2014

Hive Query Plan Generation

A Hive query is passed through several built-in modules for the final plan to be generated.

The stages/ modules are:

Query 
     => (1) Parser 
               => (2) Semantic Analyzer 
                     => (3) Logical Plan Generation  
                           => (4) Optimizer 
                                => (5) Physical Plan Generation
                                         => Executor to run on Hadoop

Monday, March 24, 2014

Hive History File

Hive maintains a history of all commands executed via the hive cli. These commands are written to a file called .hivehistory, on the user's home folder.
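A small Python sketch for peeking at the most recent commands, assuming the file is plain text with one history entry per line; the function name is illustrative.

```python
import os


def last_hive_commands(count=10, path=None):
    """Return the last `count` lines of the Hive CLI history file."""
    if path is None:
        # Default location: .hivehistory in the user's home folder
        path = os.path.join(os.path.expanduser("~"), ".hivehistory")
    with open(path, encoding="utf-8", errors="replace") as history:
        lines = [line.rstrip("\n") for line in history]
    return lines[-count:]


if __name__ == "__main__":
    try:
        for command in last_hive_commands(5):
            print(command)
    except FileNotFoundError:
        print("No .hivehistory file found")
```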

Sunday, March 16, 2014

Hive Optimizations

Explain Plan

Mappers and Reducers Count

Map Joins

Sorting

Optimization step:
Between the logical & physical plan generation phases of Hive, the Hive optimizations get executed. The current set of optimizations includes:

  • Column pruning
  • Partition pruning
  • Sample pruning
  • Predicate push down
  • Map join processor
  • Union processor
  • Join reorder
More on each of these optimizations to follow.

Sunday, February 2, 2014

Build Hadoop from Source Code with Native Libraries and Snappy Compression

When running Hadoop using a pre-built Hadoop binary distribution (a downloaded hadoop-<Latest_Version>.tar.gz bundle), Hadoop may not be able to load certain native libraries. The following warning is also displayed at the time of starting up Hadoop:

"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable "

This issue comes up due to the difference in architecture between the machine on which Hadoop is being run now and the machine on which it was originally compiled. While most of Hadoop (written in Java) loads up fine, there are native libraries (compression, etc.) which do not get loaded (more details to follow).

The fix is to compile Hadoop locally & use it in place of the pre-built Hadoop binary (tar.gz). At a high level this requires:

Sunday, October 20, 2013

General Availability (GA) for Hadoop 2.x

The Hadoop 2.x GA is nothing less than a big leap forward. Most of the features released, such as YARN (a pluggable resource management framework), Name Node HA, HDFS Federation and so on, were long awaited. As per the official mail to the community, this release includes:

"To recap, this release has a number of significant highlights compared to Hadoop 1.x:
        • YARN - A general purpose resource management system for Hadoop to allow MapReduce and other other data processing frameworks and services
        • High Availability for HDFS
        • HDFS Federation
        • HDFS Snapshots
        • NFSv3 access to data in HDFS
        • Support for running Hadoop on Microsoft Windows
        • Binary Compatibility for MapReduce applications built on hadoop-1.x
        • Substantial amount of integration testing with rest of projects in the ecosystem

 Please see the Hadoop 2.2.0 Release Notes for details."


Also as per the official email to the community, users are encouraged to move forward to the 2.x branch which is more stable & backward compatible.

Monday, September 17, 2012

Workaround for Copy Command from WebHDFS

At the moment the WebHDFS API doesn't offer the Copy command. As a result, the client ends up having to download the file to the local disk and re-upload it via the Create command. Since this ends up being a lot of round trips all the way to the client (typically a non Java based client), the following workaround can be set up to partly alleviate the problem.

Set up an HDFS WebDAV server on one of the DN or NN boxes. Issue the Copy command to the WebDAV server via a REST call. This frees up the client application, while letting the WebDAV server, with much better connectivity & proximity to the HDFS, complete the Copy command request.
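A minimal sketch of what the client side of that call could look like, using the standard WebDAV COPY method with a Destination header. The host name, port and file paths are placeholders, and the request is only built here, not sent.

```python
import urllib.request


def build_copy_request(webdav_base, src_path, dst_path, overwrite=False):
    """Build (but do not send) a WebDAV COPY request."""
    return urllib.request.Request(
        webdav_base + src_path,
        method="COPY",
        headers={
            # WebDAV expects the target location in the Destination header
            "Destination": webdav_base + dst_path,
            "Overwrite": "T" if overwrite else "F",
        },
    )


# Hypothetical WebDAV endpoint and HDFS paths
request = build_copy_request(
    "http://webdav-host:8080", "/data/in/file.csv", "/data/out/file.csv")
print(request.get_method(), request.full_url, "->", request.get_header("Destination"))
# To actually send it: urllib.request.urlopen(request)
```

The client only issues this one small request; the bytes move between the WebDAV server and HDFS without passing through the client.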

Friday, August 10, 2012

REST based integration with HDFS via WebHDFS

WebHDFS offers a set of perfectly good REST APIs for any application to integrate with the HDFS. This can be particularly advantageous for applications written in languages other than Java such as Rails, Dot Net and so on.

Within our LAN of commodity desktop class boxes with 2.5 GHz processors & 8 GB RAM, and a replication factor of 2, we found read/ write speeds of about 27 Mbps via WebHDFS. This was only a shade slower than the 30 Mbps that we were getting via raw file transfers between the same Data Nodes (DN).

Another observation was that our best transfer rates were achieved by setting the buffer size to 22K. We played around with several other buffer size values, but found 22K to be the magic number. Hoping to find some logical explanation for this observation.
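For reference, a sketch of how the read and write URLs could be put together in Python. It assumes the buffer size above maps to the WebHDFS buffersize query parameter (in bytes); the host, port and file paths are placeholders, and the function name is illustrative.

```python
from urllib.parse import quote, urlencode


def webhdfs_url(host, port, hdfs_path, op, **params):
    """Build a WebHDFS REST URL of the form .../webhdfs/v1/<path>?op=<OP>."""
    query = [("op", op)] + sorted(params.items())
    return "http://{}:{}/webhdfs/v1{}?{}".format(
        host, port, quote(hdfs_path), urlencode(query))


BUFFER_SIZE = 22 * 1024  # the 22K buffer size reported above, in bytes

# Hypothetical NameNode host, port and file paths
read_url = webhdfs_url("namenode-host", 50070, "/user/demo/data.txt", "OPEN",
                       buffersize=BUFFER_SIZE)
write_url = webhdfs_url("namenode-host", 50070, "/user/demo/copy.txt", "CREATE",
                        buffersize=BUFFER_SIZE, replication=2)
print(read_url)
print(write_url)
```

OPEN is issued as an HTTP GET and CREATE as an HTTP PUT, which makes these URLs easy to call from any language with an HTTP client.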