Saturday, May 26, 2012

Setup Data replication from Oracle to MongoDB


Credits: http://tebros.com/2011/09/keep-mongodb-and-oracle-in-sync-using-streams-advanced-queuing/

Components used:
MongoDB: MongoDB shell version: 2.0.5
Oracle: 11.2.0.2
Oracle Advanced Queuing
Perl scripting
DBD::Oracle: Perl DBI driver for Oracle
JSON Perl module
MongoDB Perl driver (MongoDB::Connection)

The model for this setup is:
Replicate the SCOTT.EMP table from Oracle to MongoDB.
Use Oracle AQ to queue each insert, update and delete, and then use a Perl script to dequeue the changes and apply them to MongoDB.

Step 1:
Use the EMP table from Oracle's SCOTT sample schema.
The structure is:

SQL> desc emp


 Name Data Type     
 -------------------------
 EMPNO NUMBER(4)    
 ENAME VARCHAR2(10) 
 JOB VARCHAR2(9)  
 MGR NUMBER(4)    
 HIREDATE DATE         
 SAL NUMBER(7,2)  
 COMM NUMBER(7,2)  
 DEPTNO NUMBER(2)    


Step 2: As SYS (or another user allowed to grant them), grant EXECUTE on DBMS_AQ and DBMS_AQADM to SCOTT


SQL> grant execute on DBMS_AQ to SCOTT;


Grant succeeded.


SQL> grant execute on DBMS_AQADM to SCOTT;


Grant succeeded.




Step 3: Set up the queue table in Oracle (connected as SCOTT)


-- Payload type for the queue: every message carries one MSG string, which
-- holds the JSON text built by the EMP trigger (also a VARCHAR2(4000)).
-- SQL*Plus treats CREATE TYPE like a PL/SQL unit, so the trailing "/" is what
-- actually executes the statement.
CREATE TYPE EMP_T AS OBJECT (MSG VARCHAR2(4000));
/


-- One-time AQ setup, run as SCOTT once the EMP_T type exists.
BEGIN
  -- 1) Queue table EMPQUEUE_QT; every message stored in it is an EMP_T object.
  DBMS_AQADM.CREATE_QUEUE_TABLE(QUEUE_TABLE        => 'EMPQUEUE_QT',
                                QUEUE_PAYLOAD_TYPE => 'EMP_T');

  -- 2) Queue EMPQUEUE_Q, stored in EMPQUEUE_QT. The same queue table also
  --    holds the exception queue AQ$_EMPQUEUE_QT_E (see USER_QUEUES below).
  DBMS_AQADM.CREATE_QUEUE(QUEUE_NAME  => 'EMPQUEUE_Q',
                          QUEUE_TABLE => 'EMPQUEUE_QT');

  -- 3) Open EMPQUEUE_Q so messages can be enqueued and dequeued.
  DBMS_AQADM.START_QUEUE(QUEUE_NAME => 'EMPQUEUE_Q');
END;
/


Structure of the AQ$EMPQUEUE_QT view that Oracle creates for the queue table:
SQL> desc AQ$EMPQUEUE_QT
 Name   Null?    Type
 ----------------------------------------- -------- ----------------------------
 QUEUE    VARCHAR2(30)
 MSG_ID   NOT NULL RAW(16)
 CORR_ID    VARCHAR2(128)
 MSG_PRIORITY    NUMBER
 MSG_STATE    VARCHAR2(16)
 DELAY    DATE
 DELAY_TIMESTAMP    TIMESTAMP(6)
 EXPIRATION    NUMBER
 ENQ_TIME    DATE
 ENQ_TIMESTAMP    TIMESTAMP(6)
 ENQ_USER_ID    VARCHAR2(30)
 ENQ_TXN_ID    VARCHAR2(30)
 DEQ_TIME    DATE
 DEQ_TIMESTAMP    TIMESTAMP(6)
 DEQ_USER_ID    VARCHAR2(30)
 DEQ_TXN_ID    VARCHAR2(30)
 RETRY_COUNT    NUMBER
 EXCEPTION_QUEUE_OWNER    VARCHAR2(30)
 EXCEPTION_QUEUE    VARCHAR2(30)
 USER_DATA    EMP_T
 ORIGINAL_QUEUE_NAME    VARCHAR2(30)
 ORIGINAL_QUEUE_OWNER    VARCHAR2(30)
 EXPIRATION_REASON    VARCHAR2(18)
 SENDER_NAME    VARCHAR2(30)
 SENDER_ADDRESS    VARCHAR2(1024)
 SENDER_PROTOCOL    NUMBER
 ORIGINAL_MSGID    RAW(16)


SQL> SELECT name, queue_table,enqueue_enabled,dequeue_enabled,user_comment FROM user_queues; 


NAME       QUEUE_TABLE ENQUEUE DEQUEUE USER_COMMENT   
------------------------------ ------------------------------ -----
EMPQUEUE_Q       EMPQUEUE_QT YES YES                    
AQ$_EMPQUEUE_QT_E   EMPQUEUE_QT NO NO     exception queue




Step 4: Create a procedure to enqueue the DML change messages


-- Enqueue one change message on EMPQUEUE_Q.
--   payload - the message text (the JSON built by EMP_Q_TRIG); it is stored
--             in the MSG attribute of an EMP_T object.
-- Every message gets priority 1. Default enqueue options are used; per the
-- DBMS_AQ documentation their visibility is ON_COMMIT, so the message becomes
-- visible only when the caller's transaction (the DML on EMP) commits.
CREATE OR REPLACE PROCEDURE enqueue_emp(payload VARCHAR2) AS
  msg1               emp_t := emp_t(payload);
  msg_id             RAW(16);  -- id assigned by AQ; not used further
  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
  message_properties.priority := 1;
  DBMS_AQ.ENQUEUE(
    queue_name         => 'EMPQUEUE_Q',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => msg1,
    msgid              => msg_id);
END;
/




Step 5: Create a procedure to dequeue the DML change messages; the Perl script uses it to load the data into MongoDB.




-- Dequeue the next message from EMPQUEUE_Q and return its text.
--   payload      - OUT: the MSG attribute of the dequeued EMP_T message, or
--                  NULL when wait_seconds elapses with no message available.
--   wait_seconds - how long to wait for a message. The default,
--                  DBMS_AQ.FOREVER, blocks until a message arrives (the
--                  original behaviour); pass DBMS_AQ.NO_WAIT (0) to poll.
-- With the default dequeue options the removal is part of the caller's
-- transaction (visibility ON_COMMIT per the DBMS_AQ documentation), so the
-- message leaves the queue only when the caller commits.
-- CREATE OR REPLACE (like the other units here) lets the script be re-run.
CREATE OR REPLACE PROCEDURE dequeue_emp(
  payload      OUT VARCHAR2,
  wait_seconds IN  BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER
) AS
  msg1               emp_t := emp_t(NULL);
  msg_id             RAW(16);
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  -- ORA-25228: raised by DBMS_AQ.DEQUEUE when the wait time expires.
  no_message         EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_message, -25228);
BEGIN
  dequeue_options.wait := wait_seconds;
  DBMS_AQ.DEQUEUE(
    queue_name         => 'EMPQUEUE_Q',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => msg1,
    msgid              => msg_id
  );
  payload := msg1.msg;
EXCEPTION
  WHEN no_message THEN
    payload := NULL;
END;
/




Step 6: Create a trigger that enqueues every DML on SCOTT.EMP


-- Row-level trigger: for every INSERT, UPDATE or DELETE on EMP it builds a
-- one-line JSON document describing the change and enqueues it with
-- ENQUEUE_EMP.
--
-- Message shapes:
--   insert/update: {"empno":N,"ename":"..","job":"..","mgr":"..",
--                   "hiredate":"DD-MON-RR","sal":"..","comm":"..",
--                   "deptno":"..","dml_type":"I"|"U"}
--   delete:        {"empno":N,"dml_type":"D"}
-- NULL columns are sent as "" (empty string).
-- NOTE: an UPDATE that changes EMPNO is sent with the new EMPNO only, so a
-- consumer that looks documents up by empno will not find the old one.
CREATE OR REPLACE TRIGGER emp_q_trig
  AFTER INSERT OR UPDATE OR DELETE ON emp
  FOR EACH ROW
DECLARE
  msg VARCHAR2(4000);

  -- Render a value as a JSON string literal. Backslash, double quote, LF, CR
  -- and TAB are escaped so that data such as O"NEIL cannot break the
  -- message. NULL becomes "".
  FUNCTION js(val IN VARCHAR2) RETURN VARCHAR2 IS
  BEGIN
    RETURN '"'
        || REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(val,
             '\', '\\'),
             '"', '\"'),
             CHR(10), '\n'),
             CHR(13), '\r'),
             CHR(9), '\t')
        || '"';
  END js;

  -- Number as text with '.' as the decimal separator, independent of the
  -- NLS_NUMERIC_CHARACTERS setting of whichever session fired the trigger.
  FUNCTION num(val IN NUMBER) RETURN VARCHAR2 IS
  BEGIN
    RETURN TO_CHAR(val, 'TM9', 'NLS_NUMERIC_CHARACTERS=''.,''');
  END num;
BEGIN
  IF INSERTING OR UPDATING THEN
    msg := '{"empno":'    || num(:new.empno)
        || ',"ename":'    || js(:new.ename)
        || ',"job":'      || js(:new.job)
        || ',"mgr":'      || js(num(:new.mgr))
        -- explicit format so the text does not depend on the session's
        -- NLS_DATE_FORMAT / NLS_DATE_LANGUAGE
        || ',"hiredate":' || js(TO_CHAR(:new.hiredate, 'DD-MON-RR',
                                        'NLS_DATE_LANGUAGE=AMERICAN'))
        || ',"sal":'      || js(num(:new.sal))
        || ',"comm":'     || js(num(:new.comm))
        || ',"deptno":'   || js(num(:new.deptno))
        || ',"dml_type":' || CASE WHEN INSERTING THEN '"I"' ELSE '"U"' END
        || '}';
  ELSE
    msg := '{"empno":' || num(:old.empno) || ',"dml_type":"D"}';
  END IF;
  enqueue_emp(msg);
END;
/


Step 7: Let's test the AQ setup

SQL> select * from emp;


     EMPNO ENAME      JOB       MGR HIREDATE    SAL       COMM     DEPTNO
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
      7369 SMITH      CLERK      7902 17-DEC-80    800   20
      7499 ALLEN      SALESMAN      7698 20-FEB-81   1600        300   30
      7521 WARD       SALESMAN      7698 22-FEB-81   1250        500   30
      7566 JONES      MANAGER      7839 02-APR-81   2975   20
      7654 MARTIN     SALESMAN      7698 28-SEP-81   1250       1400   30
      7698 BLAKE      MANAGER      7839 01-MAY-81   2850   30
      7782 CLARK      MANAGER      7839 09-JUN-81   2450   10
      7788 SCOTT      ANALYST      7566 19-APR-87   3000   20
      7839 KING       PRESIDENT   17-NOV-81   5000   10
      7844 TURNER     SALESMAN      7698 08-SEP-81   1500 0   30
      7876 ADAMS      CLERK      7788 23-MAY-87   1100   20
      7900 JAMES      CLERK      7698 03-DEC-81    950   30
      7902 FORD       ANALYST      7566 03-DEC-81   3000   20
      7934 MILLER     CLERK      7782 23-JAN-82   1300   10


14 rows selected.


SQL> UPDATE EMP SET COMM=200 WHERE EMPNO=7369;


1 ROW UPDATED.


SQL> COMMIT;


COMMIT COMPLETE.


SQL> SELECT COUNT(*) FROM AQ$EMPQUEUE_QT;


  COUNT(*)
----------
1


SQL> DECLARE
  2  VAR1 VARCHAR2(4000);
  3  BEGIN
  4  DEQUEUE_EMP(VAR1);
  5  DBMS_OUTPUT.PUT_LINE('QUEUE OUTPUT:'|| VAR1);
  6  END;
  7  /
QUEUE OUTPUT:{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20","dml_type":"U"}


PL/SQL PROCEDURE SUCCESSFULLY COMPLETED.


SQL> SELECT COUNT(*) FROM AQ$EMPQUEUE_QT;


  COUNT(*)
----------
0


The dequeued message is printed above, and the queue is empty again.

Step 8: Create a Perl script (o2m_emp.pl) that dequeues the messages and applies them to a MongoDB collection called emp.


--------------Script Start------------------
#!/usr/bin/perl
#
# o2m_emp.pl - apply the SCOTT.EMP changes queued on Oracle AQ (EMPQUEUE_Q)
# to the MongoDB collection "emp" in database "test".
#
# Each message is the JSON text built by the EMP_Q_TRIG trigger, e.g.
#   {"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902",
#    "hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20",
#    "dml_type":"U"}
#   dml_type "I"/"U": the document with that empno is replaced (upsert), so a
#                     message applied twice does not create a duplicate.
#   dml_type "D":     the document with that empno is removed.
#
# Oracle runs with AutoCommit off. Each dequeue is committed only after its
# MongoDB write has been acknowledged (safe => 1); on any error the dequeue
# is rolled back, so the message stays on the queue and the script stops. A
# message that cannot be decoded therefore has to be fixed or removed by hand.


use strict;
use warnings;
use DBI;
use DBD::Oracle;
use MongoDB;
use MongoDB::OID;
use Time::Local;
use JSON;


# Fields copied from a message into the MongoDB document.
my @emp_fields = qw(empno ename job mgr hiredate sal comm deptno);


#Connect to the Oracle DB (AutoCommit off, see the header comment)
my $dbh = DBI->connect("dbi:Oracle:host=hostname1;port=1521;sid=test","scott","tiger",
                       { AutoCommit => 0, RaiseError => 1, PrintError => 0 })
        or die "Error: " . $DBI::errstr;


#Connect to Mongo DB
my $conn=MongoDB::Connection->new(host => 'hostname1:27020');
print "conn $conn\n";


#Connect to required Mongo DB
my $db=$conn->test;
print "db $db\n";


#Connect to the required Mongo Collection
my $coll=$db->emp;
print "Collection: $coll\n";


# Count only the messages that are ready to be dequeued from EMPQUEUE_Q. The
# AQ$EMPQUEUE_QT view also lists rows of the exception queue
# AQ$_EMPQUEUE_QT_E; counting those would make the loop call dequeue_emp once
# too often, and with its default options that call waits forever.
my $sql1 = "select count(*) from AQ\$EMPQUEUE_QT where queue = 'EMPQUEUE_Q' and msg_state = 'READY'";
my $stm1 = $dbh->prepare($sql1);
$stm1->execute();
my ($ccc) = $stm1->fetchrow_array();
$stm1->finish();
print "Message Count: $ccc\n";


#Fetch the Oracle AQ Queue Data
my $aqdata;
my $sql2 = "BEGIN dequeue_emp(:aqdata); END;";
my $stm2 = $dbh->prepare($sql2);
$stm2->bind_param_inout(":aqdata", \$aqdata, 4000);


for my $counter (1 .. $ccc) {
    print "Counter: $counter\n";
    my $ok = eval {
        $stm2->execute();
        ##JSON output {"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20","dml_type":"U"}
        my $decoded_json = decode_json($aqdata);
        print defined $decoded_json->{"dml_type"} ? $decoded_json->{"dml_type"} : '', "\n";
        print defined $decoded_json->{"ename"} ? $decoded_json->{"ename"} : '', "\n";
        apply_message($coll, $decoded_json);
        1;
    };
    if (!$ok) {
        my $err = $@ || 'unknown error';
        $dbh->rollback;
        die "Message $counter was not applied; its dequeue was rolled back: $err";
    }
    # The change is in MongoDB: now remove the message from the queue for good.
    $dbh->commit;
}

$dbh->disconnect;


# Apply one decoded change message to the MongoDB collection $coll.
# Dies if MongoDB reports a write error; warns about and skips a message
# whose dml_type is not I, U or D.
sub apply_message {
    my ($coll, $msg) = @_;
    my $dml_type = defined $msg->{"dml_type"} ? $msg->{"dml_type"} : '';

    if ($dml_type eq "I" or $dml_type eq "U") {
        my %doc = map { $_ => $msg->{$_} } @emp_fields;
        # Replace the whole document (or create it) in one atomic operation
        # instead of remove + insert.
        $coll->update({"empno" => $msg->{"empno"}}, \%doc, {upsert => 1, safe => 1});
        print $dml_type eq "I" ? "Insert if \n" : "Update if \n";
    }
    elsif ($dml_type eq "D") {
        $coll->remove({"empno" => $msg->{"empno"}}, {safe => 1});
        print "Delete if \n";
    }
    else {
        warn "Skipping message with unknown dml_type '$dml_type'\n";
    }
    return;
}


--------------Script END------------------


Step 9: Executing and testing the data replication


SQL> update emp set comm=200 where empno=7369;


1 row updated.


SQL> commit;


Commit complete.


SQL> select count(*) from aq$EMPQUEUE_QT;


  COUNT(*)
----------
1


###Executing the perl script


[oracle@hostname1 perlstuff]$ perl o2m_emp.pl 
conn MongoDB::Connection=HASH(0x1b45c630)
db MongoDB::Database=HASH(0x1b930580)
Collection: MongoDB::Collection=HASH(0x1c1c5210)
Message Count: 1
Counter: 1
U
SMITH
Update if


####Checking the data in mongoDB
[oracle@hostname1 ~]$ mongo --port 27020
MongoDB shell version: 2.0.5
connecting to: 127.0.0.1:27020/test
> use test
switched to db test
> show collections
emp
system.indexes
> db.emp.find()
{ "_id" : ObjectId("4fc016298b93a68269000000"), "comm" : "200", "sal" : "800", "job" : "CLERK", "mgr" : "7902", "hiredate" : "17-DEC-80", "deptno" : "20", "ename" : "SMITH", "empno" : NumberLong(7369) }
>




Step 10: Seed existing data to MongoDB from Oracle

SCOTT.EMP already contained 14 rows. We want to load those rows into MongoDB and then start the replication.
Spool the output of the SQL below:

sqlplus scott/tiger


-- Generate one Perl insert statement per EMP row for the seed script below.
-- Keys are lower case because the replication script looks documents up by
-- "empno" (MongoDB keys are case-sensitive). empno stays numeric and the other
-- columns become strings, with NULL rendered as "", as in the trigger's JSON.
-- HIREDATE uses an explicit DD-MON-RR format so the output does not depend on
-- the session's NLS settings. The SET options keep headings, feedback and
-- page breaks out of the spool file.
SET HEADING OFF FEEDBACK OFF PAGESIZE 0 LINESIZE 1000 TRIMSPOOL ON
SELECT '$collection->insert({"empno"=>' || empno
    || ',"ename"=>"'     || ename
    || '","job"=>"'      || job
    || '","mgr"=>"'      || mgr
    || '","hiredate"=>"' || TO_CHAR(hiredate, 'DD-MON-RR', 'NLS_DATE_LANGUAGE=AMERICAN')
    || '","sal"=>"'      || sal
    || '","comm"=>"'     || comm
    || '","deptno"=>"'   || deptno
    || '"});'
FROM emp
ORDER BY empno;


### Create a Perl script that loads these rows into MongoDB (paste the spooled insert lines into it):
--------------Script Start------------------
#!/usr/bin/perl
#
# Initial load of the existing SCOTT.EMP rows into the MongoDB collection
# "emp" in database "test", before change replication takes over.
#
# Keys are lower case because the replication script looks documents up by
# "empno" (MongoDB keys are case-sensitive); with upper-case keys, replicated
# updates and deletes would never match these documents. empno is numeric and
# the other fields are strings, with NULL as "", as in the trigger's JSON.
# The insert lines can be regenerated with the spool query above.
# NOTE: plain inserts - run this only against a collection that does not
# already hold these employees, otherwise duplicates are created.


use strict;
use warnings;
use MongoDB;
use MongoDB::OID;
use Time::Local;


my $conn=MongoDB::Connection->new(host => 'hostname1:27020');
print "conn $conn\n";


my $db=$conn->test;
print "db $db\n";


my $collection=$db->emp;
print "collection $collection\n\n";


$collection->insert({"empno"=>7369,"ename"=>"SMITH","job"=>"CLERK","mgr"=>"7902","hiredate"=>"17-DEC-80","sal"=>"800","comm"=>"200","deptno"=>"20"});
$collection->insert({"empno"=>7499,"ename"=>"ALLEN","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"20-FEB-81","sal"=>"1600","comm"=>"300","deptno"=>"30"});
$collection->insert({"empno"=>7521,"ename"=>"WARD","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"22-FEB-81","sal"=>"1250","comm"=>"500","deptno"=>"30"});
$collection->insert({"empno"=>7566,"ename"=>"JONES","job"=>"MANAGER","mgr"=>"7839","hiredate"=>"02-APR-81","sal"=>"2975","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7654,"ename"=>"MARTIN","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"28-SEP-81","sal"=>"1250","comm"=>"1400","deptno"=>"30"});
$collection->insert({"empno"=>7698,"ename"=>"BLAKE","job"=>"MANAGER","mgr"=>"7839","hiredate"=>"01-MAY-81","sal"=>"2850","comm"=>"","deptno"=>"30"});
$collection->insert({"empno"=>7782,"ename"=>"CLARK","job"=>"MANAGER","mgr"=>"7839","hiredate"=>"09-JUN-81","sal"=>"2450","comm"=>"","deptno"=>"10"});
$collection->insert({"empno"=>7788,"ename"=>"SCOTT","job"=>"ANALYST","mgr"=>"7566","hiredate"=>"19-APR-87","sal"=>"3000","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7839,"ename"=>"KING","job"=>"PRESIDENT","mgr"=>"","hiredate"=>"17-NOV-81","sal"=>"5000","comm"=>"","deptno"=>"10"});
$collection->insert({"empno"=>7844,"ename"=>"TURNER","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"08-SEP-81","sal"=>"1500","comm"=>"0","deptno"=>"30"});
$collection->insert({"empno"=>7876,"ename"=>"ADAMS","job"=>"CLERK","mgr"=>"7788","hiredate"=>"23-MAY-87","sal"=>"1100","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7900,"ename"=>"JAMES","job"=>"CLERK","mgr"=>"7698","hiredate"=>"03-DEC-81","sal"=>"950","comm"=>"","deptno"=>"30"});
$collection->insert({"empno"=>7902,"ename"=>"FORD","job"=>"ANALYST","mgr"=>"7566","hiredate"=>"03-DEC-81","sal"=>"3000","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7934,"ename"=>"MILLER","job"=>"CLERK","mgr"=>"7782","hiredate"=>"23-JAN-82","sal"=>"1300","comm"=>"","deptno"=>"10"});
--------------Script END------------------


DEMO for AQ: http://psoug.org/reference/aq_demo1.html

3 Comments:

Garrett D. said...

Thanks for this post Apun!

I've been curious on how to use mongodb as a logging tool for an Oracle database, and this gave me a good start.

Anonymous said...

Thanks for the post Apun.

I have few questions:

1. We have a Greenplum data warehouse and are planning to load data into MongoDB after the nightly process is done. Right now it takes 3 hrs to load all our reporting data into MongoDB, so we're looking for different options. One option was to look at GoldenGate replication during the ETL process. Does GoldenGate support the Greenplum database?

2. What options do we have to improve the performance of loading data into MongoDB? We are planning to load 3 TB of data from the Greenplum data warehouse into MongoDB.

Dattatray said...

Very useful post Arun.
I just want to know whether it is possible to replicate a MongoDB collection to Oracle.
It is helpful if you share the details.
Thanks