Akka Actor System – Parallelizing Bulk Data Upload

Parallel Programming

Parallel programming is a form of computation in which many calculations are carried out simultaneously. It operates on the principle that a large problem can be divided into smaller subtasks, which are then solved concurrently (in parallel).
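
The divide-and-combine idea can be pictured with a small plain-JDK sketch. It is only an illustration: the class name ParallelSumSketch and the method sumUpTo are made up for this example and are not part of the sample application. The range 1..n is split into chunks, each chunk is summed as its own task on a thread pool, and the partial sums are added together at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class; not part of the akka-bulkimport sample.
final class ParallelSumSketch {

    /** Splits 1..n into at most `parts` ranges, sums each range as a separate task, then combines. */
    static long sumUpTo(long n, int parts) {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            List<Future<Long>> partials = new ArrayList<>();
            long chunk = (n + parts - 1) / parts; // ceiling division
            for (long start = 1; start <= n; start += chunk) {
                final long from = start;
                final long to = Math.min(n, start + chunk - 1);
                // Each subtask works on its own range and shares no mutable state.
                partials.add(pool.submit(() -> {
                    long sum = 0;
                    for (long i = from; i <= to; i++) {
                        sum += i;
                    }
                    return sum;
                }));
            }
            long total = 0;
            for (Future<Long> partial : partials) {
                total += partial.get(); // blocks until that subtask has finished
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumUpTo(1_000_000L, 4)); // 500000500000
    }
}
```

The subtasks here are independent, which is what makes the split easy; the same independence is assumed for the data rows in the bulk upload use case.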

Parallel programs are typically more difficult to write than their sequential counterparts, mainly because of concurrency issues such as race conditions, deadlock and livelock. Communication and synchronization between the subtasks are the most difficult aspects of writing a correct parallel program.

Map-Reduce Model

MapReduce is a programming model for processing large data sets in parallel. The model is inspired by the map and reduce functions commonly used in functional programming languages such as Haskell, Erlang, Lisp, Scala and Clojure.

A MapReduce program is composed of a Map function that performs filtering and sorting and a Reduce function that performs a summary or aggregate operation. The MapReduce framework orchestrates the processing by running the various tasks in parallel, managing all communication and data transfer between the parts of the system, and providing redundancy and fault tolerance.
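
To make the two phases concrete, here is a word count written with Java streams. This is a single-JVM sketch of the map and reduce steps, not a distributed MapReduce framework, and the class name WordCountSketch is invented for the example. The map step turns each line into words; the reduce step counts the occurrences of each word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical example class; illustrates the model only.
final class WordCountSketch {

    /** Map: split every line into lower-case words. Reduce: count occurrences per word. */
    static Map<String, Long> countWords(List<String> lines) {
        return lines.parallelStream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\s+")))
                .filter(word -> !word.isEmpty())
                // TreeMap keeps the result sorted by word, so the output is stable.
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("akka actor system", "actor mailbox", "akka actor");
        System.out.println(countWords(lines)); // {actor=3, akka=2, mailbox=1, system=1}
    }
}
```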

Introducing Akka

Akka is a toolkit and runtime for building highly concurrent, distributed and resilient message-driven applications on the JVM. Akka’s Actor system provides an abstraction over low-level primitives such as threads and locks, which makes it a lot easier to write, test and maintain concurrent and distributed systems. It processes messages asynchronously using an event-driven receive loop. Along with its concurrency features, Akka also provides remoting and fault tolerance for Actors.

What is an Actor?

Actors are lightweight entities that perform small tasks. Each Actor maintains its state internally and interacts with other Actors only via messages. All messages sent to an Actor are stored in its mailbox, and Actors generally process messages in the order in which they arrive there. An Actor decides what to do based on the type of message it receives. Intricate business functionality can be implemented by creating child Actors and delegating sub-tasks to them.

Akka runs a set of Actors on a set of threads. Successive invocations of an Actor may be processed by different threads, but at any given moment an Actor runs on a single thread. As long as an Actor’s state is touched only from inside the Actor, its code can be written without locks or other explicit synchronization.
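
The mailbox idea can be imitated with nothing but the JDK, which may help to see why no locks are needed. The sketch below is not Akka's implementation and uses none of its API: the class CounterActorSketch and its methods are hypothetical, and a single-thread executor stands in for the mailbox (it pins the "actor" to one thread, whereas Akka may move an Actor between threads). Many threads send messages, but the state is only ever touched by the one thread that drains the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-JDK imitation of an actor; not Akka code.
final class CounterActorSketch {
    // The executor's internal queue plays the role of the mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int count = 0; // only read or written by the mailbox thread

    /** Asynchronous send: enqueue the message and return immediately. */
    void tell(String message) {
        mailbox.execute(() -> onReceive(message));
    }

    private void onReceive(String message) {
        if ("increment".equals(message)) {
            count++; // no lock needed: messages are processed one at a time
        }
    }

    /** Reads the state through the mailbox, after all earlier messages have been processed. */
    int askCount() {
        try {
            return mailbox.submit(() -> count).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void stop() {
        mailbox.shutdown();
    }

    /** Lets several sender threads flood one actor and returns the final count. */
    static int demo(int senders, int messagesPerSender) {
        CounterActorSketch actor = new CounterActorSketch();
        List<Thread> threads = new ArrayList<>();
        for (int s = 0; s < senders; s++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < messagesPerSender; i++) {
                    actor.tell("increment");
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            return actor.askCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            actor.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // 4000
    }
}
```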

Actors Communication

What is an Actor System?

An actor system is a hierarchical group of actors which share common configuration, e.g. dispatchers, deployments, remote capabilities and addresses.

Use Case – Bulk Data Upload

The bulk data upload use case can be solved with a parallel programming approach. The only assumption is that the rows of the dataset to be uploaded do not depend on one another.

In a typical bulk data upload, the file containing the data is first uploaded to the server and is then handed to the program that inserts the data into the backend datastore.

Parallelized Bulk Data Insertion

There are two main sub-tasks in the bulk data insertion task:

  • File Reading
  • Bulk Data Insertion

File Reading

This sub-task does the following:

  • reads a fixed number of lines from the file at a time, until the file is exhausted
  • passes each set of lines to the Bulk Data Insertion sub-task
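
A minimal sketch of such a batching reader is shown below. It is written against the plain JDK and is not the sample's FileReaderActor; the class BatchReaderSketch, the method readInBatches and the sink callback are names assumed for illustration. In the real application the sink would be the step that sends a batch message to an insertion Actor.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical example class; not the sample's FileReaderActor.
final class BatchReaderSketch {

    /** Reads lines sequentially, hands them on in batches of at most batchSize, returns the batch count. */
    static int readInBatches(BufferedReader reader, int batchSize, Consumer<List<String>> sink) {
        int batches = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() == batchSize) {
                    sink.accept(batch);
                    batches++;
                    batch = new ArrayList<>(batchSize); // fresh list; the sink now owns the old one
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!batch.isEmpty()) { // last, possibly smaller, batch
            sink.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
        int batches = readInBatches(reader, 2, batch -> System.out.println(batch));
        System.out.println(batches + " batches"); // 3 batches
    }
}
```

Creating a new list for every batch matters once batches are handed to other threads: the reader must not keep modifying a list that a consumer is still working on.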

Bulk Data Insertion

This sub-task takes the data passed from the File Reading sub-task and does the following:

  • creates a data object for each line of data read
  • uses the bulk insertion technique supported by the underlying datastore to insert the objects into the backend datastore
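
The first of these steps, turning a line into a data object, might look like the sketch below. The column layout (name, then city) is an assumption made for the example, since the exact format of the input file is not described here, and CsvLineSketch is a hypothetical class. A plain map stands in for the datastore's document type, and the naive split on commas would not handle quoted fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical example class; the "name,city" column order is an assumption.
final class CsvLineSketch {

    /** Parses "name,city" into a field map; returns empty for malformed lines. */
    static Optional<Map<String, String>> parse(String line) {
        String[] cols = line.split(",", -1); // -1 keeps trailing empty columns
        if (cols.length != 2 || cols[0].trim().isEmpty() || cols[1].trim().isEmpty()) {
            return Optional.empty();
        }
        Map<String, String> person = new LinkedHashMap<>();
        person.put("name", cols[0].trim());
        person.put("city", cols[1].trim());
        return Optional.of(person);
    }

    public static void main(String[] args) {
        System.out.println(parse("Asha,Pune").isPresent());   // true
        System.out.println(parse("no-city-here").isPresent()); // false
    }
}
```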

What can be made parallel?

Ideally, if the file were plain text with a suitably constrained format, the large file could be split into multiple smaller files that are read in parallel. In this case the file format didn’t allow me to do that, so file reading was kept as a sequential line-by-line read with no parallelism. The subsequent activity, bulk data insertion, was made parallel.

Bulk Data Insert With Akka Actors

As the file is read, every ‘n’ data records are passed as one batch to the bulk data insertion sub-task. Multiple instances of the bulk data insertion Actor run in parallel, each inserting the batches it receives.

Solution

The sample application reads a file named NameList.csv containing sample data about people’s names and the cities they live in. This data is inserted into a Mongo collection called ‘persons’ in the ‘akka-bulkimport’ DB.

The important components of the application are as follows:

  • CSVRecordBatchMsg
  • BatchCompleteMsg
  • EndOfFileMsg
  • FileReaderActor
  • MongoInsertionActor

CSVRecordBatchMsg

The com.xoriant.akka.message.CSVRecordBatchMsg class is the message that the com.xoriant.akka.actor.FileReaderActor sends to the com.xoriant.akka.actor.MongoInsertionActor, carrying a list of CSV records to validate and insert.

BatchCompleteMsg

The com.xoriant.akka.message.BatchCompleteMsg class is a message sent by the MongoInsertionActor back to the FileReaderActor saying it is done with insertion of the batch of CSV records sent earlier.

EndOfFileMsg

The com.xoriant.akka.message.EndOfFileMsg class is a message sent by the FileReaderActor notifying all the MongoInsertionActors that file reading is complete.
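
The three messages can be pictured as small immutable classes. The sketch below is a guess at their shape, not the code from the repository: the accessor names getBatchNo() and getRecords() and the two-argument BatchCompleteMsg constructor follow the snippets in this article, while the field types, the getInsertedCount() accessor, the use of plain maps instead of BasicDBObject, and the wrapper class MessageSketch are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical shapes for the three messages; field types are assumptions.
final class MessageSketch {

    /** Batch of parsed rows sent from the reader to an insertion actor. */
    static final class CSVRecordBatchMsg {
        private final long batchNo;
        private final List<Map<String, Object>> records;

        CSVRecordBatchMsg(long batchNo, List<Map<String, Object>> records) {
            this.batchNo = batchNo;
            // Defensive, read-only copy: a message must not change after it is sent.
            this.records = Collections.unmodifiableList(new ArrayList<>(records));
        }

        long getBatchNo() { return batchNo; }
        List<Map<String, Object>> getRecords() { return records; }
    }

    /** Acknowledgement sent back to the reader once a batch has been inserted. */
    static final class BatchCompleteMsg {
        private final long batchNo;
        private final int insertedCount;

        BatchCompleteMsg(long batchNo, int insertedCount) {
            this.batchNo = batchNo;
            this.insertedCount = insertedCount;
        }

        long getBatchNo() { return batchNo; }
        int getInsertedCount() { return insertedCount; }
    }

    /** Marker message: it carries no data, its type is the whole signal. */
    static final class EndOfFileMsg { }
}
```

Keeping messages immutable is what lets them cross thread boundaries safely: the sender cannot alter a batch while an insertion Actor is still processing it.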

FileReaderActor

The com.xoriant.akka.actor.FileReaderActor class extends Akka’s akka.actor.UntypedActor and implements the onReceive(Object message) method to process messages.

The following code snippet shows how the actor sends a batch of CSV records using CSVRecordBatchMsg:

mongoInsertionActor.tell(csvRecordBatch, getSelf());

FileReaderActor maintains counters for the batches sent for insertion and the batches completed. Every time it receives a BatchCompleteMsg, the actor checks whether this was the last outstanding batch. If so, the actor stops itself:

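if (message instanceof BatchCompleteMsg) {
      // One more batch has been acknowledged by a MongoInsertionActor.
      batchCompleteCounter++;
      BatchCompleteMsg batchComplete = (BatchCompleteMsg) message;
      // Every batch that was sent has now been reported as complete.
      if (batchSentCounter == batchCompleteCounter) {
            System.out.println("All batches completed successfully !!");
            getContext().stop(getSelf());
      }
}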
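
The bookkeeping behind this check can be isolated into a small plain-Java sketch. The class BatchTrackerSketch and its method names are hypothetical. It also adds an end-of-file flag, which is an assumption on my part rather than something shown in the snippet above: if acknowledgements could be processed while batches are still being sent, comparing the two counters alone might report completion too early.

```java
// Hypothetical helper; meant to be used from inside a single actor, so it is not thread-safe.
final class BatchTrackerSketch {
    private int batchSentCounter = 0;
    private int batchCompleteCounter = 0;
    private boolean endOfFile = false; // assumption: set once the reader has sent its last batch

    void batchSent() {
        batchSentCounter++;
    }

    void endOfFileReached() {
        endOfFile = true;
    }

    /** Records one acknowledged batch and reports whether all work is finished. */
    boolean batchCompleted() {
        batchCompleteCounter++;
        return isDone();
    }

    boolean isDone() {
        return endOfFile && batchSentCounter == batchCompleteCounter;
    }

    public static void main(String[] args) {
        BatchTrackerSketch tracker = new BatchTrackerSketch();
        tracker.batchSent();
        System.out.println(tracker.batchCompleted()); // false: the file is not fully read yet
        tracker.batchSent();
        tracker.endOfFileReached();
        System.out.println(tracker.batchCompleted()); // true
    }
}
```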

MongoInsertionActor

com.xoriant.akka.actor.MongoInsertionActor uses the Mongo Java Driver’s bulk APIs to insert all valid records of a batch into the ‘persons’ collection. After the insertion it sends a completion message back to the FileReaderActor.

// Unordered bulk operation: the server may apply the inserts in any order.
BulkWriteOperation builder = personColl.initializeUnorderedBulkOperation();
List<BasicDBObject> persons = csvRecordBatch.getRecords();
for (BasicDBObject personDBObject : persons) {
      // Only records that pass validation are queued for insertion.
      if (validate(personDBObject)) {
            builder.insert(personDBObject);
      }
}
BulkWriteResult result = builder.execute();
// Report the batch number and the number of inserted records to the sender.
BatchCompleteMsg batchComplete = new BatchCompleteMsg(csvRecordBatch.getBatchNo(), result.getInsertedCount());
getSender().tell(batchComplete, getSelf());

The actor system should be shut down after all the actors have completed their work. In our sample we keep it simple: the main application thread sleeps for a few seconds and then shuts down the system.
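
Sleeping for a fixed time is a guess about how long the work takes. One alternative, sketched here with plain JDK classes rather than Akka, is to let the main thread wait on an explicit completion signal and shut down only once it arrives. The class AwaitCompletionSketch, the 30-second timeout and the fake insert step are all assumptions made for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; a thread pool stands in for the insertion actors.
final class AwaitCompletionSketch {

    /** Runs `batches` fake insert tasks and blocks until every one has reported completion. */
    static int runAndAwait(int batches, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch allDone = new CountDownLatch(batches);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < batches; i++) {
                pool.execute(() -> {
                    completed.incrementAndGet(); // stand-in for the real bulk insert
                    allDone.countDown();         // plays the role of a completion message
                });
            }
            // Wait for the signal instead of sleeping for a guessed duration.
            if (!allDone.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for batches");
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // safe now: all work has been acknowledged
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(10, 3)); // 10
    }
}
```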

Source code:

https://github.com/xoriantcorporation/akka-bulkimport
