Showing posts with label Mockito. Show all posts

Thursday, January 2, 2025

Mocked Kinesis (Localstack) with PySpark Streaming

This post continues with the same PySpark (ver 2.1.0, Python3.5, etc.) setup explained in an earlier post. To connect to the mocked Kinesis stream on Localstack from PySpark, use the kinesis_wordcount_asl.py script located in the Spark external/ (connector/) folder.

(a) Update value of master in kinesis_wordcount_asl.py

Set the master (local[n], spark://localhost:7077, etc.) on the SparkContext in kinesis_wordcount_asl.py:
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl", master="local[2]")

(b) Add aSpark compiled jars to Spark Driver/ Executor Classpath

As explained in step (III) of an earlier post, a few changes were made to onStart() in KinesisReceiver.scala to explicitly set the endpoint on the kinesis, dynamoDb and cloudWatch clients so that they work with Localstack. Accordingly, the compiled aSpark jars containing these modifications need to be added to the Spark Driver/ Executor classpath.

    export aSPARK_PROJ_HOME="/Download/Location/aSpark"
    export SPARK_CLASSPATH="${aSPARK_PROJ_HOME}/target/original-aSpark_1.0-2.1.0.jar:${aSPARK_PROJ_HOME}/target/scala-2.11/classes:${aSPARK_PROJ_HOME}/target/scala-2.11/jars/*"

  • For Spark Standalone mode: "spark.executor.extraClassPath" needs to be set either in spark-defaults.conf or on the SparkConf passed to SparkContext (see (II)(a))

(c) Ensure SPARK_HOME, PYSPARK_PYTHON & PYTHONPATH variables are exported.

(d) Run kinesis_wordcount_asl

    python3.5 ${SPARK_HOME}/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py SampleKinesisApplication myFirstStream http://localhost:4566/ us-east-1

    aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"

  • The counts of the words put on the stream will show up on the kinesis_wordcount_asl console
 

Wednesday, January 1, 2025

Spark Streaming with Kinesis mocked on Localstack

In this post we get a Spark Streaming application working with an AWS Kinesis stream, using a mocked version of Kinesis running locally on Localstack. Earlier posts explained how to get Localstack running and how to bring up various AWS services on it. Client connections to the AWS services (Localstack) are made using the AWS CLI and the AWS Java-Sdk v1.

Environment: This set-up continues on Ubuntu 20.04, with Java-8, Maven-3.6x, Docker-24.0x, Python3.5, PySpark/ Spark-2.1.0, Localstack-3.8.1, AWS Java-Sdk-v1 (ver.1.12.778).

Once the Localstack installation is done, steps to follow are:

(I) Start Localstack
    # Start locally
    localstack start

    Localstack should now be running on: http://localhost:4566

(II) Check Kinesis services from CLI on Localstack

    # List Streams
    aws --endpoint-url=http://localhost:4566 kinesis list-streams

    # Create Stream
    aws --endpoint-url=http://localhost:4566 kinesis create-stream --stream-name myFirstStream --shard-count 1

    # List Streams
    aws --endpoint-url=http://localhost:4566 kinesis list-streams

    # describe-stream-summary
    aws --endpoint-url=http://localhost:4566 kinesis describe-stream-summary --stream-name myFirstStream

    # Put Record
    aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
    aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata efgh"

(III) Connect to Kinesis from Spark Streaming

    # Build
    mvn install -DskipTests=true -Dcheckstyle.skip

    # Run JavaKinesisWordCountASL with Localstack

  • JavaKinesisWordCountASL SampleKinesisApplication myFirstStream http://localhost:4566/

(IV) Add Data to Localstack Kinesis & View Counts on Console
    a) Put records from the CLI
    aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata abcd"
    aws --endpoint-url=http://localhost:4566 kinesis put-record --stream-name myFirstStream --partition-key 123 --data "testdata efgh"

    b) Alternatively, put records from a Java Kinesis application
    Download, build & run AmazonKinesisRecordProducerSample.java
    
    c) Now check the output console of JavaKinesisWordCountASL run in step (III) above. Counts of the words streamed from Localstack Kinesis will be displayed on the console.
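
To make the expected console output concrete, here is a minimal, Spark-free sketch of the counting applied to the records put in step (IV)(a). It is an illustration only, not the Spark example itself: the class name WordCountSketch is hypothetical, it assumes words are separated by single spaces, and it assumes both records arrive in the same streaming batch (records that land in different batches would likely be counted separately).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free illustration of per-batch word counting.
public class WordCountSketch {

    // Splits each record on single spaces and tallies every non-empty word.
    public static Map<String, Integer> countWords(List<String> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String record : records) {
            for (String word : record.split(" ")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The two payloads used with put-record in step (IV)(a)
        List<String> batch = Arrays.asList("testdata abcd", "testdata efgh");
        System.out.println(countWords(batch)); // {testdata=2, abcd=1, efgh=1}
    }
}
```

With the two sample records in one batch, "testdata" is counted twice and "abcd" and "efgh" once each.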

Saturday, November 16, 2024

Mutable Argument Capture with Mockito

There are well-known scenarios, such as caching and pooling, in which object reuse is common. Testing these cases with a framework like Mockito can run into problems, especially when there is a need to verify the arguments sent by the Caller of a Service, where the Service is mocked.

ArgumentCaptor (Mockito) fails here because it keeps references to the argument objects, which, due to reuse by the caller, hold only the latest updated value by the time they are inspected.
The discussion here led to using a Void Answer as one possible way to solve the issue. The following (junit-3+, mockito-1.8+, commons-lang-2.5) code explains the details.

1. Service: 

    public class Service {

        public void serve(MutableInt value) {
            System.out.println("Service.serve(): " + value);
        }

    ....

 

2. Caller:

    public class Caller {

        public void callService(Service service) {
            // The same MutableInt instance is reused across both calls
            MutableInt value = new MutableInt();

            value.setValue(1);
            service.serve(value);

            value.setValue(2);
            service.serve(value);
        }

    ...

 

3. Tests:

    import static org.mockito.Mockito.*;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import junit.framework.TestCase;
    import org.apache.commons.lang.mutable.MutableInt;
    import org.mockito.ArgumentCaptor;
    import org.mockito.Mock;
    import org.mockito.MockitoAnnotations;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class MutableArgsTest extends TestCase {

        List<MutableInt> multiValuesWritten;

        @Mock
        Service service;

        /**
         * Initializes the @Mock field and the list that the Answer writes to
         */
        @Override
        protected void setUp() {
            MockitoAnnotations.initMocks(this);
            multiValuesWritten = new ArrayList<MutableInt>();
        }

        /**
         * Failure with ArgumentCaptor
         */
        public void testMutableArgsWithArgCaptorFail() {
            Caller caller = new Caller();
            ArgumentCaptor<MutableInt> valueCaptor =
                ArgumentCaptor.forClass(MutableInt.class);

            caller.callService(service);
            verify(service, times(2)).serve(valueCaptor.capture());

            // AssertionFailedError: expected:<[1, 2]> but was:<[2, 2]>
            assertEquals(Arrays.asList(new MutableInt(1), new MutableInt(2)),
                valueCaptor.getAllValues());
        }

        /**
         * Success with Answer
         */
        public void testMutableArgsWithDoAnswer() {
            Caller caller = new Caller();
            doAnswer(new CaptureArgumentsWrittenAsMutableInt<Void>()).
                when(service).serve(any(MutableInt.class));

            caller.callService(service);
            verify(service, times(2)).serve(any(MutableInt.class));

            // Works!
            assertEquals(new MutableInt(1), multiValuesWritten.get(0));
            assertEquals(new MutableInt(2), multiValuesWritten.get(1));
        }

        /**
         * Captures Arguments to the Service.serve() method:
         * - Multiple calls to serve() happen from the same caller
         * - Along with reuse of MutableInt argument objects by the caller
         * - Argument value is copied to a new MutableInt object & that's captured
         * @param <Void>
         */
        public class CaptureArgumentsWrittenAsMutableInt<Void> implements Answer<Void> {

            public Void answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                // Copy the current value so that later mutation by the caller is not seen
                multiValuesWritten.add(new MutableInt(args[0].toString()));
                return null;
            }
        }
    }
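
The difference between the two tests comes down to capturing a reference versus capturing a copy at call time. The following Mockito-free sketch reproduces just that effect with plain Java collections. The names CaptureAliasingDemo and Counter are hypothetical stand-ins (Counter plays the role of MutableInt), and the lists only approximate what a captor or an Answer would record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: storing a reused mutable object vs. storing a copy of it.
public class CaptureAliasingDemo {

    // Stand-in for a mutable argument type such as MutableInt.
    static final class Counter {
        int value;
        Counter(int value) { this.value = value; }
    }

    // Records the argument object itself on each "call"; both entries alias one object.
    public static List<Integer> captureByReference() {
        List<Counter> captured = new ArrayList<>();
        Counter reused = new Counter(0);
        reused.value = 1;
        captured.add(reused);
        reused.value = 2;
        captured.add(reused);
        return read(captured);
    }

    // Records a fresh copy of the current value on each "call".
    public static List<Integer> captureByCopy() {
        List<Counter> captured = new ArrayList<>();
        Counter reused = new Counter(0);
        reused.value = 1;
        captured.add(new Counter(reused.value));
        reused.value = 2;
        captured.add(new Counter(reused.value));
        return read(captured);
    }

    // Reads the captured values only after all "calls" have completed.
    private static List<Integer> read(List<Counter> captured) {
        List<Integer> values = new ArrayList<>();
        for (Counter c : captured) {
            values.add(c.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println("by reference: " + captureByReference()); // [2, 2]
        System.out.println("by copy:      " + captureByCopy());      // [1, 2]
    }
}
```

Capturing by reference yields [2, 2], matching the failure message in the first test, while copying at call time yields [1, 2], which is what the Answer-based test relies on.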