Credits: http://tebros.com/2011/09/keep-mongodb-and-oracle-in-sync-using-streams-advanced-queuing/
Components used:
MongoDB: 2.0.5 (shell version)
Oracle: 11.2.0.2
Oracle Advanced Queuing
Perl scripting
DBD::Oracle: Perl DBI driver for Oracle
JSON Perl module
MongoDB Perl driver
The model for this setup is:
Replicate the SCOTT.EMP table from Oracle to MongoDB.
Use Oracle AQ to queue inserts, updates and deletes, then use a Perl script to dequeue them and apply them to MongoDB.
Step 1:
Use the EMP table in the SCOTT schema of the Oracle database.
The structure is:
SQL> desc emp
 Name                      Data Type
 ------------------------- ------------
 EMPNO                     NUMBER(4)
 ENAME                     VARCHAR2(10)
 JOB                       VARCHAR2(9)
 MGR                       NUMBER(4)
 HIREDATE                  DATE
 SAL                       NUMBER(7,2)
 COMM                      NUMBER(7,2)
 DEPTNO                    NUMBER(2)
Step 2: Grant execute on DBMS_AQ and DBMS_AQADM to SCOTT
SQL> grant execute on DBMS_AQ to SCOTT;
Grant succeeded.
SQL> grant execute on DBMS_AQADM to SCOTT;
Grant succeeded.
Step 3: Set up the queue table in Oracle
CREATE TYPE EMP_T AS OBJECT (MSG VARCHAR2(4000));
/
BEGIN
  -- This procedure creates a queue table for messages of a predefined type.
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    QUEUE_TABLE        => 'EMPQUEUE_QT',
    QUEUE_PAYLOAD_TYPE => 'EMP_T');

  -- This procedure creates a queue in the specified queue table.
  DBMS_AQADM.CREATE_QUEUE(
    QUEUE_NAME  => 'EMPQUEUE_Q',
    QUEUE_TABLE => 'EMPQUEUE_QT');

  -- This procedure enables the specified queue for enqueuing and dequeuing.
  DBMS_AQADM.START_QUEUE(
    QUEUE_NAME => 'EMPQUEUE_Q');
END;
/
Queue table structure (AQ$EMPQUEUE_QT is the view Oracle creates over the queue table):
SQL> desc AQ$EMPQUEUE_QT
 Name                      Null?    Type
 ------------------------- -------- ----------------
 QUEUE                              VARCHAR2(30)
 MSG_ID                    NOT NULL RAW(16)
 CORR_ID                            VARCHAR2(128)
 MSG_PRIORITY                       NUMBER
 MSG_STATE                          VARCHAR2(16)
 DELAY                              DATE
 DELAY_TIMESTAMP                    TIMESTAMP(6)
 EXPIRATION                         NUMBER
 ENQ_TIME                           DATE
 ENQ_TIMESTAMP                      TIMESTAMP(6)
 ENQ_USER_ID                        VARCHAR2(30)
 ENQ_TXN_ID                         VARCHAR2(30)
 DEQ_TIME                           DATE
 DEQ_TIMESTAMP                      TIMESTAMP(6)
 DEQ_USER_ID                        VARCHAR2(30)
 DEQ_TXN_ID                         VARCHAR2(30)
 RETRY_COUNT                        NUMBER
 EXCEPTION_QUEUE_OWNER              VARCHAR2(30)
 EXCEPTION_QUEUE                    VARCHAR2(30)
 USER_DATA                          EMP_T
 ORIGINAL_QUEUE_NAME                VARCHAR2(30)
 ORIGINAL_QUEUE_OWNER               VARCHAR2(30)
 EXPIRATION_REASON                  VARCHAR2(18)
 SENDER_NAME                        VARCHAR2(30)
 SENDER_ADDRESS                     VARCHAR2(1024)
 SENDER_PROTOCOL                    NUMBER
 ORIGINAL_MSGID                     RAW(16)
SQL> SELECT name, queue_table,enqueue_enabled,dequeue_enabled,user_comment FROM user_queues;
NAME                           QUEUE_TABLE                    ENQUEUE DEQUEUE USER_COMMENT
------------------------------ ------------------------------ ------- ------- ---------------
EMPQUEUE_Q                     EMPQUEUE_QT                      YES     YES
AQ$_EMPQUEUE_QT_E              EMPQUEUE_QT                      NO      NO    exception queue
Step 4: Create a procedure to enqueue messages (DMLs)
CREATE OR REPLACE PROCEDURE enqueue_emp(payload VARCHAR2) AS
  msg1               emp_t := emp_t(NULL);
  msg_id             RAW(16);
  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
  msg1.msg := payload;
  message_properties.priority := 1;
  -- Default enqueue visibility is ON_COMMIT: the message is part of the
  -- caller's transaction and is discarded if that transaction rolls back.
  DBMS_AQ.ENQUEUE(
    queue_name         => 'EMPQUEUE_Q',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => msg1,
    msgid              => msg_id);
END;
/
Step 5: Create a procedure to dequeue messages (DMLs), to be used to load the data into MongoDB.
CREATE OR REPLACE PROCEDURE dequeue_emp(payload OUT VARCHAR2) AS
  msg1               emp_t := emp_t(NULL);
  msg_id             RAW(16);
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
  -- With the default dequeue options (wait => DBMS_AQ.FOREVER) this call
  -- blocks until a message arrives if the queue is empty.
  DBMS_AQ.DEQUEUE(
    queue_name         => 'EMPQUEUE_Q',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => msg1,
    msgid              => msg_id);
  payload := msg1.msg;
END;
/
Step 6: Create a trigger to queue DMLs on table SCOTT.EMP
CREATE OR REPLACE TRIGGER emp_q_trig
AFTER INSERT OR UPDATE OR DELETE ON emp
FOR EACH ROW
DECLARE
  msg VARCHAR2(4000);
BEGIN
  IF INSERTING OR UPDATING THEN
    -- HIREDATE is converted with the session's NLS_DATE_FORMAT (e.g. 17-DEC-80).
    msg := '{"empno":' || :new.empno || ',"ename":"' || :new.ename || '"';
    msg := msg || ',"job":"' || :new.job || '","mgr":"' || :new.mgr || '"';
    msg := msg || ',"hiredate":"' || :new.hiredate || '","sal":"' || :new.sal || '"';
    msg := msg || ',"comm":"' || :new.comm || '","deptno":"' || :new.deptno || '"';
    msg := msg || ',"dml_type":"';
    msg := msg || CASE WHEN INSERTING THEN 'I' ELSE 'U' END || '"}';
  ELSE
    msg := '{"empno":' || :old.empno || ',"dml_type":"D"}';
  END IF;
  enqueue_emp(msg);
END;
/
Step 7: Let's test the AQ setup
SQL> select * from emp;
     EMPNO ENAME      JOB              MGR HIREDATE         SAL       COMM     DEPTNO
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
      7369 SMITH      CLERK           7902 17-DEC-80        800                    20
      7499 ALLEN      SALESMAN        7698 20-FEB-81       1600        300         30
      7521 WARD       SALESMAN        7698 22-FEB-81       1250        500         30
      7566 JONES      MANAGER         7839 02-APR-81       2975                    20
      7654 MARTIN     SALESMAN        7698 28-SEP-81       1250       1400         30
      7698 BLAKE      MANAGER         7839 01-MAY-81       2850                    30
      7782 CLARK      MANAGER         7839 09-JUN-81       2450                    10
      7788 SCOTT      ANALYST         7566 19-APR-87       3000                    20
      7839 KING       PRESIDENT            17-NOV-81       5000                    10
      7844 TURNER     SALESMAN        7698 08-SEP-81       1500          0         30
      7876 ADAMS      CLERK           7788 23-MAY-87       1100                    20
      7900 JAMES      CLERK           7698 03-DEC-81        950                    30
      7902 FORD       ANALYST         7566 03-DEC-81       3000                    20
      7934 MILLER     CLERK           7782 23-JAN-82       1300                    10
14 rows selected.
SQL> UPDATE EMP SET COMM=200 WHERE EMPNO=7369;
1 row updated.
SQL> COMMIT;
Commit complete.
SQL> SELECT COUNT(*) FROM AQ$EMPQUEUE_QT;
  COUNT(*)
----------
         1
SQL> SET SERVEROUTPUT ON
SQL> DECLARE
  VAR1 VARCHAR2(4000);
BEGIN
  DEQUEUE_EMP(VAR1);
  DBMS_OUTPUT.PUT_LINE('QUEUE OUTPUT:' || VAR1);
END;
/
QUEUE OUTPUT:{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20","dml_type":"U"}

PL/SQL procedure successfully completed.

SQL> SELECT COUNT(*) FROM AQ$EMPQUEUE_QT;
  COUNT(*)
----------
         0
The dequeued message is the JSON document built by the trigger, and the queue is empty again after the dequeue.
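The payload above is JSON text that the trigger assembles by plain string concatenation. The minimal Perl sketch below assumes the core JSON::PP module (the Step 8 script uses the JSON module, which provides the same decode_json/encode_json functions). It decodes the sample message from the queue output, then shows that concatenation produces invalid JSON as soon as a value contains a double quote, while an encoder escapes it. The ENAME value O"BRIEN and empno 9999 are hypothetical test data; in the trigger itself, `"` and `\` in character columns would have to be escaped before concatenating.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use JSON::PP qw(decode_json encode_json);

# Sample payload copied from the Step 7 queue output.
my $sample = '{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902",'
           . '"hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20",'
           . '"dml_type":"U"}';

my $msg = decode_json($sample);
print "dml_type=$msg->{dml_type} empno=$msg->{empno} ename=$msg->{ename}\n";

# Mimic the trigger's string concatenation for a value containing a quote
# (hypothetical ENAME, for illustration only).
my $ename        = 'O"BRIEN';
my $concatenated = '{"empno":9999,"ename":"' . $ename . '","dml_type":"I"}';

# decode_json dies on malformed input, so trap the error with eval.
my $ok = eval { decode_json($concatenated); 1 };
print 'concatenated message is ', ($ok ? 'valid' : 'INVALID'), " JSON\n";

# Building the same message with an encoder escapes the quote correctly.
my $encoded   = encode_json({ empno => 9999, ename => $ename, dml_type => 'I' });
my $roundtrip = decode_json($encoded);
print "encoded ename round-trips as: $roundtrip->{ename}\n";
```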
Step 8: Create a Perl script to dequeue the messages into a MongoDB collection called emp.
--------------Script Start------------------
#!/usr/bin/perl
use strict;
use warnings;
use DBI;
use DBD::Oracle;
use MongoDB;
use JSON;

# Connect to the Oracle DB
# (AutoCommit is on, so each dequeue is committed as soon as it executes)
my $dbh = DBI->connect("dbi:Oracle:host=hostname1;port=1521;sid=test", "scott", "tiger",
                       { RaiseError => 1, AutoCommit => 1 })
    or die "Error: " . $DBI::errstr;

# Connect to MongoDB
my $conn = MongoDB::Connection->new(host => 'hostname1:27020');
print "conn $conn\n";

# Connect to the required MongoDB database
my $db = $conn->test;
print "db $db\n";

# Connect to the required MongoDB collection
my $coll = $db->emp;
print "Collection: $coll\n";

# Count the messages in the Oracle queue
my $sql1 = "select count(*) from AQ\$EMPQUEUE_QT";
my $stm1 = $dbh->prepare($sql1);
$stm1->execute();
my ($ccc) = $stm1->fetchrow_array();
print "Message Count: $ccc\n";

# Prepare the call that fetches one message from the Oracle AQ queue
my $aqdata;
my $sql2 = "BEGIN dequeue_emp(:aqdata); END;";
my $stm2 = $dbh->prepare($sql2);
$stm2->bind_param_inout(":aqdata", \$aqdata, 4000);

# Dequeue exactly $ccc messages (dequeue_emp would block on an empty queue)
for my $counter (1 .. $ccc) {
    $stm2->execute();
    print "Counter: $counter\n";

    # JSON output, e.g.
    # {"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20","dml_type":"U"}
    my $decoded_json = decode_json($aqdata);
    my $dml_type     = $decoded_json->{"dml_type"};
    print "$dml_type\n";
    print(($decoded_json->{"ename"} // ''), "\n");    # delete messages carry no ename

    # Document stored in MongoDB: the message fields without dml_type
    my %doc = map { $_ => $decoded_json->{$_} }
              qw(empno ename job mgr hiredate sal comm deptno);

    if ($dml_type eq "I") {
        $coll->insert(\%doc);
        print "Insert if \n";
    }
    elsif ($dml_type eq "U") {
        # Replace the existing document: remove it, then insert the new version
        $coll->remove({ empno => $decoded_json->{"empno"} });
        $coll->insert(\%doc);
        print "Update if \n";
    }
    elsif ($dml_type eq "D") {
        $coll->remove({ empno => $decoded_json->{"empno"} });
        print "Delete if \n";
    }
}

$dbh->disconnect;
--------------Script END------------------
Step 9: Executing and testing the data replication
SQL> update emp set comm=200 where empno=7369;
1 row updated.
SQL> commit;
Commit complete.
SQL> select count(*) from aq$EMPQUEUE_QT;
  COUNT(*)
----------
         1
###Executing the perl script
[oracle@hostname1 perlstuff]$ perl o2m_emp.pl
conn MongoDB::Connection=HASH(0x1b45c630)
db MongoDB::Database=HASH(0x1b930580)
Collection: MongoDB::Collection=HASH(0x1c1c5210)
Message Count: 1
Counter: 1
U
SMITH
Update if
####Checking the data in MongoDB
[oracle@hostname1 ~]$ mongo --port 27020
MongoDB shell version: 2.0.5
connecting to: 127.0.0.1:27020/test
> use test
switched to db test
> show collections
emp
system.indexes
> db.emp.find()
{ "_id" : ObjectId("4fc016298b93a68269000000"), "comm" : "200", "sal" : "800", "job" : "CLERK", "mgr" : "7902", "hiredate" : "17-DEC-80", "deptno" : "20", "ename" : "SMITH", "empno" : NumberLong(7369) }
>
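The I/U/D branching of the Step 8 script can be exercised without Oracle or MongoDB. In this sketch a plain Perl hash keyed by empno stands in for the emp collection, and apply_change() is a hypothetical helper mirroring the script's dispatch. Employee 8000 (NEWHIRE) is made-up test data. One difference to keep in mind: a hash cannot hold duplicates, so an 'I' for an existing empno overwrites here, whereas the Step 8 insert would add a second document. Draining the messages in enqueue order shows why the consumer must apply them in that order: the insert, update and delete of employee 8000 cancel out, and only SMITH remains.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# In-memory stand-in for the MongoDB "emp" collection, keyed by empno.
my %emp_store;

# Fields copied from each message into the document (same list as Step 8).
my @fields = qw(empno ename job mgr hiredate sal comm deptno);

# Hypothetical helper mirroring the Step 8 dispatch on dml_type.
sub apply_change {
    my ($store, $json) = @_;
    my $msg  = decode_json($json);
    my $type = $msg->{dml_type};
    my $key  = $msg->{empno};

    if ($type eq 'I' || $type eq 'U') {
        # Step 8 does remove + insert for 'U'; for one key that amounts
        # to replacing the whole document.
        $store->{$key} = { map { $_ => $msg->{$_} } @fields };
    }
    elsif ($type eq 'D') {
        delete $store->{$key};
    }
    else {
        die "unknown dml_type '$type'\n";
    }
    return $type;
}

# Messages in the order the trigger would have enqueued them.
my @queue = (
    '{"empno":8000,"ename":"NEWHIRE","job":"CLERK","mgr":"7902","hiredate":"01-JAN-12","sal":"900","comm":"","deptno":"20","dml_type":"I"}',
    '{"empno":8000,"ename":"NEWHIRE","job":"CLERK","mgr":"7902","hiredate":"01-JAN-12","sal":"950","comm":"","deptno":"20","dml_type":"U"}',
    '{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20","dml_type":"U"}',
    '{"empno":8000,"dml_type":"D"}',
);

# Drain the queue in FIFO order, as the Step 8 loop does.
apply_change(\%emp_store, $_) for @queue;

for my $empno (sort { $a <=> $b } keys %emp_store) {
    my $doc = $emp_store{$empno};
    print "$empno $doc->{ename} sal=$doc->{sal} comm=$doc->{comm}\n";
}
```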
Step 10: Seed existing data to MongoDB from Oracle
The SCOTT.EMP table already holds 14 rows. Load those rows into MongoDB first and then start the replication. If the Step 9 test has already been run, empty the collection first (db.emp.remove({}) in the mongo shell) so that SMITH is not loaded twice. The seed documents use the same lowercase field names as the replication script, so that later updates and deletes, which match on empno, find them.
Spool the output of the SQL below (headings and feedback are switched off so that only the generated statements are spooled):
sqlplus scott/tiger
set heading off
set feedback off
set pagesize 0
set linesize 1000
select '$collection->insert({"empno"=>'||empno||',"ename"=>"'||ename||'","job"=>"'||job||'","mgr"=>"'||mgr||'","hiredate"=>"'||hiredate||'","sal"=>"'||sal||'","comm"=>"'||comm||'","deptno"=>"'||deptno||'"});' from emp;
### Create a Perl script to load these into MongoDB:
--------------Script Start------------------
#!/usr/bin/perl
use strict;
use warnings;
use MongoDB;

my $conn = MongoDB::Connection->new(host => 'hostname1:27020');
print "conn $conn\n";
my $db = $conn->test;
print "db $db\n";
my $collection = $db->emp;
print "collection $collection\n";

# Spooled insert statements, one per SCOTT.EMP row
$collection->insert({"empno"=>7369,"ename"=>"SMITH","job"=>"CLERK","mgr"=>"7902","hiredate"=>"17-DEC-80","sal"=>"800","comm"=>"200","deptno"=>"20"});
$collection->insert({"empno"=>7499,"ename"=>"ALLEN","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"20-FEB-81","sal"=>"1600","comm"=>"300","deptno"=>"30"});
$collection->insert({"empno"=>7521,"ename"=>"WARD","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"22-FEB-81","sal"=>"1250","comm"=>"500","deptno"=>"30"});
$collection->insert({"empno"=>7566,"ename"=>"JONES","job"=>"MANAGER","mgr"=>"7839","hiredate"=>"02-APR-81","sal"=>"2975","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7654,"ename"=>"MARTIN","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"28-SEP-81","sal"=>"1250","comm"=>"1400","deptno"=>"30"});
$collection->insert({"empno"=>7698,"ename"=>"BLAKE","job"=>"MANAGER","mgr"=>"7839","hiredate"=>"01-MAY-81","sal"=>"2850","comm"=>"","deptno"=>"30"});
$collection->insert({"empno"=>7782,"ename"=>"CLARK","job"=>"MANAGER","mgr"=>"7839","hiredate"=>"09-JUN-81","sal"=>"2450","comm"=>"","deptno"=>"10"});
$collection->insert({"empno"=>7788,"ename"=>"SCOTT","job"=>"ANALYST","mgr"=>"7566","hiredate"=>"19-APR-87","sal"=>"3000","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7839,"ename"=>"KING","job"=>"PRESIDENT","mgr"=>"","hiredate"=>"17-NOV-81","sal"=>"5000","comm"=>"","deptno"=>"10"});
$collection->insert({"empno"=>7844,"ename"=>"TURNER","job"=>"SALESMAN","mgr"=>"7698","hiredate"=>"08-SEP-81","sal"=>"1500","comm"=>"0","deptno"=>"30"});
$collection->insert({"empno"=>7876,"ename"=>"ADAMS","job"=>"CLERK","mgr"=>"7788","hiredate"=>"23-MAY-87","sal"=>"1100","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7900,"ename"=>"JAMES","job"=>"CLERK","mgr"=>"7698","hiredate"=>"03-DEC-81","sal"=>"950","comm"=>"","deptno"=>"30"});
$collection->insert({"empno"=>7902,"ename"=>"FORD","job"=>"ANALYST","mgr"=>"7566","hiredate"=>"03-DEC-81","sal"=>"3000","comm"=>"","deptno"=>"20"});
$collection->insert({"empno"=>7934,"ename"=>"MILLER","job"=>"CLERK","mgr"=>"7782","hiredate"=>"23-JAN-82","sal"=>"1300","comm"=>"","deptno"=>"10"});
--------------Script END------------------
DEMO for AQ: http://psoug.org/reference/aq_demo1.html
3 Comments:
Thanks for this post Apun!
I've been curious on how to use mongodb as a logging tool for an Oracle database, and this gave me a good start.
Thanks for the post, Apun.
I have a few questions:
1. We have a Greenplum data warehouse and plan to load data into MongoDB after the nightly process is done. Right now it takes 3 hours to load all our reporting data into MongoDB, so we're looking at different options. One option was to use GoldenGate replication during the ETL process. Does GoldenGate support the Greenplum database?
2. What options do we have to improve the performance of loading data into MongoDB? We are planning to load 3 TB of data from the Greenplum data warehouse into MongoDB.
Very useful post, Arun.
I just want to know whether it is possible to replicate a MongoDB table to Oracle.
It would be helpful if you could share the details.
Thanks