
How to do horizontally scalable data migration using Talend ETL and AKKA Cluster


Key Objectives:

  1. Migrate data from MS SQL Server to Elasticsearch and Couchbase.
  2. Migrate millions of records from MS SQL Server.
  3. Support highly scalable parallel processing.
  4. Support horizontal scaling.
  5. Complete the data migration within a predefined time window.

Solution:

The solution consists of two phases. In phase one you need a data migration job; here you can use Talend ETL for Big Data. This Talend job is parameterized with the migration details: which records to pull from MS SQL Server, plus connection details for MS SQL Server, Elasticsearch, Couchbase, and so on.

In phase two you need to run the Talend job in an environment that can scale and execute it in parallel, and you need to orchestrate the process by passing the required parameters or messages to the Talend job.

This is achieved with Akka's actor model running in cluster mode: a Master actor knows which records need to be migrated from the MS SQL Server database and assigns record IDs to Worker actors, which in turn run the Talend ETL job, as sketched below.
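
A minimal sketch of this master/worker arrangement, using Akka's classic Java API, is shown below. It is illustrative only: the class names, the message type, the batching of record IDs, and the launcher script run_migration_job.sh (standing in for a Talend job exported with its command-line launcher and --context_param options) are assumptions, not the original implementation. A production setup would add cluster-aware routing, acknowledgements, and retries.

    import akka.actor.AbstractActor;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import akka.routing.RoundRobinPool;

    public class MigrationApp {

        // Message carrying one batch of MSSQL record ids to migrate.
        public static final class MigrateBatch {
            public final long fromId;
            public final long toId;
            public MigrateBatch(long fromId, long toId) { this.fromId = fromId; this.toId = toId; }
        }

        // Master knows which records need migration and hands out batches.
        public static class Master extends AbstractActor {
            private final ActorRef workers =
                getContext().actorOf(new RoundRobinPool(4).props(Props.create(Worker.class)), "workers");

            @Override
            public Receive createReceive() {
                return receiveBuilder()
                    .matchEquals("start", msg -> {
                        // In a real job the id ranges would be read from MSSQL metadata.
                        for (long start = 0; start < 1_000_000; start += 10_000) {
                            workers.tell(new MigrateBatch(start, start + 9_999), getSelf());
                        }
                    })
                    .build();
            }
        }

        // Worker launches the parameterized Talend job for its batch.
        public static class Worker extends AbstractActor {
            @Override
            public Receive createReceive() {
                return receiveBuilder()
                    .match(MigrateBatch.class, batch -> {
                        // Assumes the Talend job was exported with a launcher script that
                        // accepts context parameters on the command line.
                        new ProcessBuilder("./run_migration_job.sh",
                                "--context_param", "fromId=" + batch.fromId,
                                "--context_param", "toId=" + batch.toId)
                            .inheritIO()
                            .start()
                            .waitFor();
                    })
                    .build();
            }
        }

        public static void main(String[] args) {
            ActorSystem system = ActorSystem.create("migration");
            ActorRef master = system.actorOf(Props.create(Master.class), "master");
            master.tell("start", ActorRef.noSender());
        }
    }

Because Akka is cluster-enabled, the same Worker actor can be deployed on additional nodes (for example behind a cluster-aware router) so the migration scales horizontally.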

Conclusion:

This approach is definitely useful for migrating large amounts of data in parallel from different data sources on horizontally scalable cloud infrastructure such as AWS, using one of the best open source stacks.


How to import data from MS SQL Server into Elasticsearch 1.6


Problem:

Analytics and visualization need to be provided for audit log data that is stored in a relational database.

Solution:

One solution to this problem is to visualize the data in an open source tool like Kibana. Kibana, however, uses Elasticsearch for search and storage.

So the selected records need to be imported from the relational database into Elasticsearch 1.6. A new index will be created in Elasticsearch for this data, and Kibana will use it.

Prior to Elasticsearch 1.6 the river plugin was available for this purpose, but it is now deprecated. To solve the same problem there is another standalone Java utility, known as elasticsearch-jdbc.

Here I am going to show you how to use this utility through Docker, so whenever you need it, it is only a three-step process: clone the repository, build the image, and start the container with the required parameters.

Prerequisites:

  1. Ubuntu 14.04
  2. Install Docker
  3. Install Elasticsearch
    docker run -d -p 9200:9200 -p 9300:9300 elasticsearch:1.6

  4. Install Kibana

Step 1: Clone the repository containing the Dockerfile: https://github.com/vinayakbhadage/data-importer

git clone https://github.com/vinayakbhadage/data-importer.git

Step 2: Change the required parameters in dataimport.sh as described below.

Step 3: Build the image from the Dockerfile:

docker build -t data-importer .

Step 4: Run data-importer with the following parameters:

1. LAST_EXECUTION_START="2014-06-06T09:08:00.948Z"

This timestamp controls which data is imported from the log table of your database: all records whose timestamp column value is greater than this will be imported into Elasticsearch.

2. INDEX_NAME=** Provide the value **

The name of the Elasticsearch index to create and populate.

3. CLUSTER=** Provide the value **

The Elasticsearch cluster name.

4. ES_HOST=** Provide the value **

The Elasticsearch host name or IP address.

5. ES_PORT="9300"

The Elasticsearch port number (9300 is the transport port).

6. SCHEDULE="0 0/10 * * * ?"

The default interval for data-importer is 10 minutes (every 10 minutes starting at minute 0); the value uses Quartz cron trigger syntax.

7. SQL_SERVER_HOST="Provide the value"

The SQL Server IP address or host name.

8. DB_NAME="Provide the value"

The SQL Server database name.

9. DB_USER_NAME="Provide the value"

The SQL Server user name (SQL Server authentication is required).

10. DB_PASSWORD="Provide the value"

The SQL Server user's password (SQL Server authentication is required).

Note: Change the environment variables as per your requirements.

docker run -d --name data-importer -e LAST_EXECUTION_START="2014-06-06T09:08:00.948Z" \
  -e INDEX_NAME="myindex"  -e CLUSTER="elasticsearch" -e ES_HOST="myeshost" \
  -e ES_PORT="9300" -e SCHEDULE="0 0/10 * * * ?" -e SQL_SERVER_HOST="mydb" \
  -e DB_NAME="mydb" -e DB_USER_NAME="myuser" -e DB_PASSWORD="find-out" data-importer

Lastly, check the status of the Elasticsearch index; you should find the imported data there. One quick check is shown below.
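
For instance, Elasticsearch's _count API reports how many documents an index holds. The tiny Java check below is only a sketch; substitute the ES_HOST and INDEX_NAME values you used above (here myeshost and myindex), and note that it calls the HTTP port 9200 rather than the transport port 9300.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class IndexCheck {
        public static void main(String[] args) throws Exception {
            // GET /<index>/_count returns the number of documents in the index.
            URL url = new URL("http://myeshost:9200/myindex/_count");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                System.out.println(in.readLine()); // e.g. {"count":12345,"_shards":{...}}
            }
        }
    }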

Distributed Transactions with NoSQL using 2-Phase Commit


Problem:

Distributed transactions need to be supported across MongoDB and Elasticsearch from an application, but neither MongoDB nor Elasticsearch natively supports distributed transactions. Here I am going to explain how this has been solved using the well-known two-phase commit protocol.

Solution:

The basic components/actors involved in the two-phase commit protocol for MongoDB and Elasticsearch are shown below:

[Diagram: components involved in the two-phase commit for MongoDB and Elasticsearch]

Resource managers for MongoDB and Elasticsearch register with the transaction coordinator to participate in the two-phase commit. In phase I the transaction coordinator sends a prepare notification to the registered resource managers, and each resource manager responds with either "ready to commit" or "rollback". During this first phase a transaction log entry is created in Redis; if an update or delete operation needs to be performed, the initial state of the data is also logged, and a lock is applied on the transactional entity in Redis only. The actual changes to the resources are also made in phase I.

If all participating resource managers voted success in phase I, the transaction coordinator sends a commit notification to all participants in phase II. If any participant failed in phase I, it sends a rollback notification to the remaining participating resource managers. In phase II the participating resource managers update the transaction log and release the lock on the transactional entity.

If any participant fails to commit or roll back in phase II, that transaction is logged as an in-doubt transaction and a watchdog eventually rolls back the corresponding changes in the participating resource managers.

If the transaction coordinator itself fails after or during phase I, or during phase II, the watchdog rolls back the transaction after the transaction timeout and releases the lock. In this case the data is in an inconsistent state until the watchdog performs the recovery.
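
To make the control flow concrete, here is a small Java sketch of the coordinator loop. The interface and class names are assumptions for illustration, not the actual POC code, and the Redis transaction log, entity locking, and watchdog described above are omitted.

    import java.util.List;

    // Each participant (e.g. the MongoDB or Elasticsearch resource manager)
    // implements prepare/commit/rollback against its own store.
    interface ResourceManager {
        boolean prepare(String txId);   // phase I: true = "ready to commit", false = "rollback"
        void commit(String txId);       // phase II
        void rollback(String txId);     // phase II
    }

    class TransactionCoordinator {
        private final List<ResourceManager> participants;

        TransactionCoordinator(List<ResourceManager> participants) {
            this.participants = participants;
        }

        void run(String txId) {
            // Phase I: ask every registered resource manager to prepare.
            boolean allReady = true;
            for (ResourceManager rm : participants) {
                if (!rm.prepare(txId)) {
                    allReady = false;
                    break;
                }
            }
            // Phase II: commit everywhere if all voted yes, otherwise roll back.
            for (ResourceManager rm : participants) {
                if (allReady) {
                    rm.commit(txId);
                } else {
                    rm.rollback(txId);
                }
            }
            // A failure inside phase II would be recorded as an in-doubt
            // transaction for the watchdog to resolve (see the scenarios below).
        }
    }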

Two-phase commit failure scenarios and their remedies:

Scenario 1: MongoDB fails in phase I (data is consistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Failed & rollback   | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Rollback not called | II
  t2                | dt1                        | ES          | Rollback            | II

Scenario 2: Elasticsearch fails in phase I (data is consistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Failed & rollback   | I
  t1                | dt1                        | Mongo       | Rollback            | II
  t2                | dt1                        | ES          | Rollback not called | II

Scenario 3: MongoDB fails during phase II commit (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Commit failed       | II
  t2                | dt1                        | ES          | Committed           | II
  t3                | dt1                        | TC          | In doubt            | End of phase II
  Watchdog action: retry the rollback of the transaction and release the lock.

Scenario 4: Elasticsearch fails during phase II commit (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Committed           | II
  t2                | dt1                        | ES          | Commit failed       | II
  t3                | dt1                        | TC          | In doubt            | End of phase II
  Watchdog action: retry the rollback of the transaction and release the lock.

Scenario 5: MongoDB fails during phase II rollback (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Rollback failed     | II
  t2                | dt1                        | ES          | Rollback            | II
  t3                | dt1                        | TC          | In doubt            | End of phase II
  Watchdog action: retry the rollback of the transaction and release the lock.

Scenario 6: Elasticsearch fails during phase II rollback (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Rollback            | II
  t2                | dt1                        | ES          | Rollback failed     | II
  t3                | dt1                        | TC          | In doubt            | End of phase II
  Watchdog action: retry the rollback of the transaction and release the lock.

Scenario 7: Transaction coordinator fails before phase I (data is consistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready               | 0
  t2                | dt1                        | ES          | Ready               | 0
  t1                | dt1                        | Mongo       | Waiting             | I
  t2                | dt1                        | ES          | Waiting             | I
  Watchdog action: no action needed, just delete the transaction.

Scenario 8: Transaction coordinator fails after phase I (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Waiting             | II
  t2                | dt1                        | ES          | Waiting             | II
  Watchdog action: retry the rollback of the transaction and release the lock.

Scenario 9: Transaction coordinator fails in the middle of phase II (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Ready to commit     | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Committed           | II
  t2                | dt1                        | ES          | Waiting             | II
  Watchdog action: retry the rollback of the transaction and release the lock.

Scenario 10: Transaction coordinator fails after phase I, with a phase I failure (data is inconsistent)

  TransactionLog ID | Distributed Transaction ID | Participant | State               | Phase
  t1                | dt1                        | Mongo       | Failed & rollback   | I
  t2                | dt1                        | ES          | Ready to commit     | I
  t1                | dt1                        | Mongo       | Waiting             | II
  t2                | dt1                        | ES          | Waiting             | II
  Watchdog action: retry the rollback of the transaction and release the lock.

Two-phase commit has been supported by the .NET Framework since version 2.0 through System.Transactions. It can be extended to any other resource, such as MongoDB, SQL, a message queue, and so on, using the IEnlistmentNotification interface.

The source code for the POC is available here.

Command Query Responsibility Segregation pattern with MongoDB, Redis and Elasticsearch


Problem:

In large-scale, data-centric enterprise applications the read-to-write ratio is very high, and we want fast read access with different search criteria against the enterprise database. This is a common requirement for any large-scale enterprise application.

Solution:

For most enterprise applications, data storage and retrieval is the fundamental concern. The Command Query Responsibility Segregation (CQRS) pattern is well known for this kind of requirement, for multiple reasons.

Here the pattern is implemented using MongoDB as the primary storage and Elasticsearch and Redis as secondary storage.

Every create, update, or delete action on a domain object goes through the following steps in the Command Service:

1. The create, update, or delete happens on MongoDB as the primary storage.
2. The Redis cache entry is invalidated based on the primary key of the domain object.
3. The Elasticsearch index entry for the domain object is created, updated, or deleted.

Every query or read of a domain object by search criteria goes through the following stages in the Query Service (a combined sketch of both services follows this list):

1. Get the matching primary keys from the Elasticsearch index based on the search criteria, unless the search is by primary key itself.
2. Using the primary keys from the previous step, try to get the domain objects from the Redis cache.
3. If a domain object is not in the cache, get it from MongoDB (retrieval by primary key is always efficient with MongoDB or any other SQL provider).
4. Update the Redis cache with the domain object and return the result.
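
A combined sketch of both services is shown below. The DomainObject type and the MongoStore, RedisCache, and SearchIndex interfaces are hypothetical stand-ins for the real MongoDB, Redis, and Elasticsearch client code; the sketch only illustrates the order of operations described above.

    import java.util.List;
    import java.util.Optional;
    import java.util.stream.Collectors;

    // Hypothetical domain object and storage abstractions.
    interface DomainObject { String getId(); }

    interface MongoStore {                          // primary storage
        void upsert(DomainObject obj);
        Optional<DomainObject> findById(String id);
    }

    interface RedisCache {                          // cache keyed by primary key
        Optional<DomainObject> get(String id);
        void put(DomainObject obj);
        void invalidate(String id);
    }

    interface SearchIndex {                         // search by criteria
        void index(DomainObject obj);
        List<String> findIds(String criteria);      // returns matching primary keys
    }

    class CommandService {
        private final MongoStore mongo;
        private final RedisCache cache;
        private final SearchIndex search;

        CommandService(MongoStore mongo, RedisCache cache, SearchIndex search) {
            this.mongo = mongo; this.cache = cache; this.search = search;
        }

        void save(DomainObject obj) {
            mongo.upsert(obj);                      // 1. write to primary storage
            cache.invalidate(obj.getId());          // 2. invalidate the cache entry
            search.index(obj);                      // 3. keep the search index in sync
        }
    }

    class QueryService {
        private final MongoStore mongo;
        private final RedisCache cache;
        private final SearchIndex search;

        QueryService(MongoStore mongo, RedisCache cache, SearchIndex search) {
            this.mongo = mongo; this.cache = cache; this.search = search;
        }

        List<DomainObject> findByCriteria(String criteria) {
            return search.findIds(criteria).stream()            // 1. resolve primary keys
                .map(id -> cache.get(id)                        // 2. try the cache first
                    .orElseGet(() -> {
                        DomainObject obj = mongo.findById(id)   // 3. fall back to MongoDB
                            .orElseThrow(() -> new IllegalStateException("Missing object: " + id));
                        cache.put(obj);                         // 4. refresh the cache
                        return obj;
                    }))
                .collect(Collectors.toList());
        }
    }

Note that the write path only invalidates the cache rather than refreshing it, so readers repopulate the cache lazily on the next miss.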

Using this pattern you can achieve better response times for all your read queries at the cost of some write overhead, and you won't be surprised into redesigning after performance testing.