Credits: http://tebros.com/2011/09/keep-mongodb-and-oracle-in-sync-using-streams-advanced-queuing/
Components used:
MongoDB: MongoDB shell version: 2.0.5
Oracle: 11.2.0.2
Oracle Advanced Queuing
Perl scripting
DBD::Oracle: the Perl DBI driver for Oracle
JSON Perl module
MongoDB Perl driver (MongoDB::Connection API)
The model for this setup is:
Replicate the SCOTT.EMP table from Oracle to MongoDB.
Use Oracle AQ to queue each INSERT, UPDATE and DELETE on the table, then use a Perl script to dequeue those changes and apply them to MongoDB.
Step 1:
Use the EMP table from Oracle's SCOTT sample schema.
The structure is:
SQL> desc emp
Name Data Type
-------------------------
EMPNO NUMBER(4)
ENAME VARCHAR2(10)
JOB VARCHAR2(9)
MGR NUMBER(4)
HIREDATE DATE
SAL NUMBER(7,2)
COMM NUMBER(7,2)
DEPTNO NUMBER(2)
Step 2: Grant execute on DBMS_AQ, DBMS_AQADM to SCOTT
SQL> grant execute on DBMS_AQ to SCOTT;
Grant succeeded.
SQL> grant execute on DBMS_AQADM to SCOTT;
Grant succeeded.
Step 3: Setup Queue Table in Oracle
-- AQ message payload type: each message carries one JSON change description.
-- The "/" is required: SQL*Plus does not execute CREATE TYPE on the semicolon,
-- so without it the following PL/SQL block would be appended to this statement.
CREATE TYPE EMP_T AS OBJECT (MSG VARCHAR2(4000));
/
DECLARE
    -- Queue table and queue names shared by the DBMS_AQADM calls below.
    c_queue_table CONSTANT VARCHAR2(30) := 'EMPQUEUE_QT';
    c_queue_name  CONSTANT VARCHAR2(30) := 'EMPQUEUE_Q';
BEGIN
    -- Queue table whose messages carry an EMP_T object payload.
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => c_queue_table,
        queue_payload_type => 'EMP_T'
    );

    -- The queue itself, stored in that queue table.
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => c_queue_name,
        queue_table => c_queue_table
    );

    -- Enable the queue for enqueuing and dequeuing.
    DBMS_AQADM.START_QUEUE(
        queue_name => c_queue_name
    );
END;
/
QUEUE TABLE Structure:
SQL> desc AQ$EMPQUEUE_QT
Name Null? Type
----------------------------------------- -------- ----------------------------
QUEUE VARCHAR2(30)
MSG_ID NOT NULL RAW(16)
CORR_ID VARCHAR2(128)
MSG_PRIORITY NUMBER
MSG_STATE VARCHAR2(16)
DELAY DATE
DELAY_TIMESTAMP TIMESTAMP(6)
EXPIRATION NUMBER
ENQ_TIME DATE
ENQ_TIMESTAMP TIMESTAMP(6)
ENQ_USER_ID VARCHAR2(30)
ENQ_TXN_ID VARCHAR2(30)
DEQ_TIME DATE
DEQ_TIMESTAMP TIMESTAMP(6)
DEQ_USER_ID VARCHAR2(30)
DEQ_TXN_ID VARCHAR2(30)
RETRY_COUNT NUMBER
EXCEPTION_QUEUE_OWNER VARCHAR2(30)
EXCEPTION_QUEUE VARCHAR2(30)
USER_DATA EMP_T
ORIGINAL_QUEUE_NAME VARCHAR2(30)
ORIGINAL_QUEUE_OWNER VARCHAR2(30)
EXPIRATION_REASON VARCHAR2(18)
SENDER_NAME VARCHAR2(30)
SENDER_ADDRESS VARCHAR2(1024)
SENDER_PROTOCOL NUMBER
ORIGINAL_MSGID RAW(16)
SQL> SELECT name, queue_table,enqueue_enabled,dequeue_enabled,user_comment FROM user_queues;
NAME QUEUE_TABLE ENQUEUE DEQUEUE USER_COMMENT
------------------------------ ------------------------------ -----
EMPQUEUE_Q EMPQUEUE_QT YES YES
AQ$_EMPQUEUE_QT_E EMPQUEUE_QT NO NO exception queue
Step 4: Create procedure to enqueue messages/DMLs
CREATE OR REPLACE PROCEDURE enqueue_emp(payload VARCHAR2) AS
    -- Enqueue one JSON change message (at most 4000 characters, the size of
    -- EMP_T.MSG) onto EMPQUEUE_Q with message priority 1.
    msg1               emp_t := emp_t(payload);
    msg_id             RAW(16);   -- id assigned by AQ; not used further
    enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
    message_properties.priority := 1;
    DBMS_AQ.ENQUEUE(
        queue_name         => 'EMPQUEUE_Q',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => msg1,
        msgid              => msg_id);
END;
/
Step 5: Create a procedure to dequeue messages/DMLs; it is used when loading the data into MongoDB.
CREATE OR REPLACE PROCEDURE dequeue_emp(
    payload      OUT VARCHAR2,
    wait_seconds IN  BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER
) AS
    -- Remove the next message from EMPQUEUE_Q and return its JSON text.
    --   payload      : the message's MSG attribute, or NULL if no message
    --                  arrived within wait_seconds.
    --   wait_seconds : how long to wait for a message. The default,
    --                  DBMS_AQ.FOREVER, blocks until one arrives (the
    --                  original behaviour). Pass DBMS_AQ.NO_WAIT or a number
    --                  of seconds to poll instead.
    msg1               emp_t := emp_t(NULL);
    msg_id             RAW(16);
    dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    -- ORA-25228: timeout or end-of-fetch during message dequeue.
    no_message         EXCEPTION;
    PRAGMA EXCEPTION_INIT(no_message, -25228);
BEGIN
    dequeue_options.wait := wait_seconds;
    DBMS_AQ.DEQUEUE(
        queue_name         => 'EMPQUEUE_Q',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => msg1,
        msgid              => msg_id
    );
    payload := msg1.msg;
EXCEPTION
    WHEN no_message THEN
        -- Queue stayed empty for wait_seconds: report "nothing" rather than fail.
        payload := NULL;
END;
/
Step 6: Create Trigger to queue DMLs on table SCOTT.EMP
CREATE OR REPLACE TRIGGER emp_q_trig
AFTER INSERT OR UPDATE OR DELETE ON emp
FOR EACH ROW
DECLARE
    -- Builds a JSON description of each row change on EMP and enqueues it
    -- with enqueue_emp:
    --   insert/update: {"empno":N,"ename":"..",...,"deptno":"..","dml_type":"I"|"U"}
    --   delete:        {"empno":N,"dml_type":"D"}
    -- Non-key columns are emitted as JSON strings; NULLs become "".
    msg VARCHAR2(4000);

    -- Return p_val as a quoted JSON string literal. Backslashes, double
    -- quotes and common control characters are escaped; without this, a
    -- value such as O"BRIEN would produce invalid JSON that the consumer's
    -- decode_json() rejects.
    FUNCTION json_str(p_val IN VARCHAR2) RETURN VARCHAR2 IS
        v VARCHAR2(32767) := p_val;
    BEGIN
        v := REPLACE(v, '\', '\\');     -- must run first
        v := REPLACE(v, '"', '\"');
        v := REPLACE(v, CHR(10), '\n');
        v := REPLACE(v, CHR(13), '\r');
        v := REPLACE(v, CHR(9), '\t');
        RETURN '"' || v || '"';
    END json_str;
BEGIN
    IF INSERTING OR UPDATING THEN
        -- empno is emitted as a JSON number; 'null' keeps the JSON valid if it is NULL.
        msg := '{"empno":'       || NVL(TO_CHAR(:new.empno), 'null')
            || ',"ename":'       || json_str(:new.ename)
            || ',"job":'         || json_str(:new.job)
            || ',"mgr":'         || json_str(:new.mgr)
            -- Explicit format so the text does not depend on the session's
            -- NLS_DATE_FORMAT / NLS_DATE_LANGUAGE (e.g. 17-DEC-80).
            || ',"hiredate":'    || json_str(TO_CHAR(:new.hiredate, 'DD-MON-RR',
                                                     'NLS_DATE_LANGUAGE=AMERICAN'))
            || ',"sal":'         || json_str(:new.sal)
            || ',"comm":'        || json_str(:new.comm)
            || ',"deptno":'      || json_str(:new.deptno)
            || ',"dml_type":"'   || CASE WHEN INSERTING THEN 'I' ELSE 'U' END
            || '"}';
    ELSE
        msg := '{"empno":' || NVL(TO_CHAR(:old.empno), 'null') || ',"dml_type":"D"}';
    END IF;
    enqueue_emp(msg);
END;
/
Step 7: Let's test the AQ setup
SQL> select * from emp;
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
7369 SMITH CLERK 7902 17-DEC-80 800 20
7499 ALLEN SALESMAN 7698 20-FEB-81 1600 300 30
7521 WARD SALESMAN 7698 22-FEB-81 1250 500 30
7566 JONES MANAGER 7839 02-APR-81 2975 20
7654 MARTIN SALESMAN 7698 28-SEP-81 1250 1400 30
7698 BLAKE MANAGER 7839 01-MAY-81 2850 30
7782 CLARK MANAGER 7839 09-JUN-81 2450 10
7788 SCOTT ANALYST 7566 19-APR-87 3000 20
7839 KING PRESIDENT 17-NOV-81 5000 10
7844 TURNER SALESMAN 7698 08-SEP-81 1500 0 30
7876 ADAMS CLERK 7788 23-MAY-87 1100 20
7900 JAMES CLERK 7698 03-DEC-81 950 30
7902 FORD ANALYST 7566 03-DEC-81 3000 20
7934 MILLER CLERK 7782 23-JAN-82 1300 10
14 rows selected.
SQL> UPDATE EMP SET COMM=200 WHERE EMPNO=7369;
1 ROW UPDATED.
SQL> COMMIT;
COMMIT COMPLETE.
SQL> SELECT COUNT(*) FROM AQ$EMPQUEUE_QT;
COUNT(*)
----------
1
SQL> DECLARE
2 VAR1 VARCHAR2(4000);
3 BEGIN
4 DEQUEUE_EMP(VAR1);
5 DBMS_OUTPUT.PUT_LINE('QUEUE OUTPUT:'|| VAR1);
6 END;
7 /
QUEUE OUTPUT:{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80","sal":"800","comm":"200","deptno":"20","dml_type":"U"}
PL/SQL PROCEDURE SUCCESSFULLY COMPLETED.
SQL> SELECT COUNT(*) FROM AQ$EMPQUEUE_QT;
COUNT(*)
----------
0
The dequeued JSON message is printed above, and the queue is now empty.
Step 8: Create a Perl script that dequeues the messages and applies them to a MongoDB collection called emp.
--------------Script Start------------------
#!/usr/bin/perl
# o2m_emp.pl - apply queued SCOTT.EMP changes from Oracle AQ (EMPQUEUE_Q)
# to the MongoDB collection test.emp.
#
# Each message is the JSON text built by the emp_q_trig trigger, e.g.
#   {"empno":7369,"ename":"SMITH","job":"CLERK","mgr":"7902","hiredate":"17-DEC-80",
#    "sal":"800","comm":"200","deptno":"20","dml_type":"U"}
# or, for a delete, {"empno":7369,"dml_type":"D"}.
#
# AutoCommit is off. Each dequeue is committed in Oracle only after the
# MongoDB write for that message succeeds, and rolled back otherwise. A
# failure therefore leaves the message queued for the next run instead of
# losing it. This relies on dequeue_emp using DBMS_AQ's default ON_COMMIT
# dequeue visibility.
use strict;
use warnings;
use DBI;
use DBD::Oracle;
use MongoDB;
use MongoDB::OID;
use Time::Local;
use JSON;

# Fields copied from an insert/update message into the MongoDB document.
my @EMP_FIELDS = qw(empno ename job mgr hiredate sal comm deptno);

#Connect to the Oracle DB
my $dbh = DBI->connect("dbi:Oracle:host=hostname1;port=1521;sid=test", "scott", "tiger",
                       { AutoCommit => 0, RaiseError => 1, PrintError => 0 })
    or die "Error: " . $DBI::errstr;

#Connect to Mongo DB, then to the "test" database and its "emp" collection
my $conn = MongoDB::Connection->new(host => 'hostname1:27020');
print "conn $conn\n";
my $db = $conn->test;
print "db $db\n";
my $coll = $db->emp;
print "Collection: $coll\n";

# Count only the messages dequeue_emp can return right now. The queue table
# also holds the exception queue (AQ$_EMPQUEUE_QT_E) and messages that are not
# READY. Counting those makes the loop call dequeue_emp once too often, and
# with DBMS_AQ's default wait (FOREVER) that extra call never returns.
my ($ccc) = $dbh->selectrow_array(
    q{SELECT COUNT(*) FROM aq$empqueue_qt WHERE queue = 'EMPQUEUE_Q' AND msg_state = 'READY'});
print "Message Count: $ccc\n";

#Fetch the Oracle AQ Queue Data
my $aqdata;
my $stm2 = $dbh->prepare("BEGIN dequeue_emp(:aqdata); END;");
$stm2->bind_param_inout(":aqdata", \$aqdata, 4000);

for my $counter (1 .. $ccc) {
    print "Counter: $counter\n";
    my $ok = eval {
        $stm2->execute();
        my $msg = decode_json($aqdata);
        print((defined $msg->{dml_type} ? $msg->{dml_type} : ''), "\n");
        print((defined $msg->{ename}    ? $msg->{ename}    : ''), "\n");
        apply_message($coll, $msg);
        $dbh->commit();    # make the dequeue permanent only after MongoDB succeeded
        1;
    };
    unless ($ok) {
        my $err = $@ || 'unknown error';
        $dbh->rollback();  # return the message to the queue
        die "Failed to apply queue message $counter: $err";
    }
}
$dbh->disconnect();

# Apply one decoded queue message to the MongoDB collection.
#   I / U : upsert the whole document, keyed on empno. For an update this
#           replaces the existing document in one operation. (A remove followed
#           by an insert loses the row if the insert fails.) Re-applying an
#           insert for an already-seeded empno does not create a duplicate.
#   D     : remove the document(s) with that empno.
# Messages with an unknown dml_type are reported and skipped.
# Writes use safe => 1, so a failed write raises an error (rolling back the
# dequeue) instead of going unnoticed.
sub apply_message {
    my ($collection, $msg) = @_;
    my $dml_type = defined $msg->{dml_type} ? $msg->{dml_type} : '';

    if ($dml_type eq 'I' || $dml_type eq 'U') {
        my %doc = map { ($_ => $msg->{$_}) } @EMP_FIELDS;
        $collection->update({ empno => $msg->{empno} }, \%doc, { upsert => 1, safe => 1 });
        print $dml_type eq 'I' ? "Insert if \n" : "Update if \n";
    }
    elsif ($dml_type eq 'D') {
        $collection->remove({ empno => $msg->{empno} }, { safe => 1 });
        print "Delete if \n";
    }
    else {
        warn "Skipping message with unknown dml_type '$dml_type'\n";
    }
    return;
}
--------------Script END------------------
Step 9: Executing and testing the data replication
SQL> update emp set comm=200 where empno=7369;
1 row updated.
SQL> commit;
Commit complete.
SQL> select count(*) from aq$EMPQUEUE_QT;
COUNT(*)
----------
1
###Executing the perl script
[oracle@hostname1 perlstuff]$ perl o2m_emp.pl
conn MongoDB::Connection=HASH(0x1b45c630)
db MongoDB::Database=HASH(0x1b930580)
Collection: MongoDB::Collection=HASH(0x1c1c5210)
Message Count: 1
Counter: 1
U
SMITH
Update if
####Checking the data in mongoDB
[oracle@hostname1 ~]$ mongo --port 27020
MongoDB shell version: 2.0.5
connecting to: 127.0.0.1:27020/test
> use test
switched to db test
> show collections
emp
system.indexes
> db.emp.find()
{ "_id" : ObjectId("4fc016298b93a68269000000"), "comm" : "200", "sal" : "800", "job" : "CLERK", "mgr" : "7902", "hiredate" : "17-DEC-80", "deptno" : "20", "ename" : "SMITH", "empno" : NumberLong(7369) }
>
Step 10: Seed existing data to MongoDB from Oracle
The SCOTT.EMP table already had 14 rows before the trigger was created. Those rows were never queued, so load them into MongoDB before starting the replication.
Spool the output of the SQL below; it generates one Perl statement per row.
sqlplus scott/tiger
-- Emit one Perl statement per SCOTT.EMP row for seeding MongoDB.
--  * Keys are lowercase (empno, ename, ...) to match the documents written by
--    the replication script, which finds documents by {"empno" => ...};
--    uppercase keys would never match.
--  * Each row is upserted on empno, so rows that replication already wrote
--    (or a re-run of the seed) do not create duplicates.
--  * Text is emitted as Perl single-quoted strings, with backslashes and single
--    quotes escaped so any ENAME/JOB value yields valid Perl.
--  * HIREDATE is formatted explicitly (e.g. 17-DEC-80) so the output does not
--    depend on the session's NLS_DATE_FORMAT.
--  * NULL MGR/COMM become '' (empty string).
WITH escaped AS (
    SELECT
        empno,
        REPLACE(REPLACE(ename, '\', '\\'), '''', '\''') AS ename,
        REPLACE(REPLACE(job,   '\', '\\'), '''', '\''') AS job,
        mgr,
        TO_CHAR(hiredate, 'DD-MON-RR', 'NLS_DATE_LANGUAGE=AMERICAN') AS hiredate,
        sal,
        comm,
        deptno
    FROM emp
)
SELECT
    '$collection->update({empno=>' || empno || '},{empno=>' || empno
    || ',ename=>'''    || ename    || ''''
    || ',job=>'''      || job      || ''''
    || ',mgr=>'''      || mgr      || ''''
    || ',hiredate=>''' || hiredate || ''''
    || ',sal=>'''      || sal      || ''''
    || ',comm=>'''     || comm     || ''''
    || ',deptno=>'''   || deptno   || ''''
    || '},{upsert=>1,safe=>1});' AS perl_stmt
FROM escaped
ORDER BY empno;
### Create a perl script to load these into MongoDB:
--------------Script Start------------------
#!/usr/bin/perl
# Seed script: one-off load of the rows that already existed in SCOTT.EMP
# into MongoDB (test.emp) before replication starts.
#
# Documents use the same lowercase field names (empno, ename, ...) as the
# replication script, which finds documents by {"empno" => ...}. With
# uppercase keys, later updates and deletes would never match the seeded
# documents. Each row is upserted on empno, so re-running this script, or
# seeding a row that replication already wrote (e.g. 7369), does not create
# a duplicate. safe => 1 makes a failed write raise an error.
use strict;
use warnings;
use MongoDB;
use MongoDB::OID;
use Time::Local;

my $conn = MongoDB::Connection->new(host => 'hostname1:27020');
print "conn $conn\n";
my $db = $conn->test;
print "db $db\n";
my $collection = $db->emp;
print "collection $collection\n\n";

$collection->update({empno=>7369},{empno=>7369,ename=>'SMITH',job=>'CLERK',mgr=>'7902',hiredate=>'17-DEC-80',sal=>'800',comm=>'200',deptno=>'20'},{upsert=>1,safe=>1});
$collection->update({empno=>7499},{empno=>7499,ename=>'ALLEN',job=>'SALESMAN',mgr=>'7698',hiredate=>'20-FEB-81',sal=>'1600',comm=>'300',deptno=>'30'},{upsert=>1,safe=>1});
$collection->update({empno=>7521},{empno=>7521,ename=>'WARD',job=>'SALESMAN',mgr=>'7698',hiredate=>'22-FEB-81',sal=>'1250',comm=>'500',deptno=>'30'},{upsert=>1,safe=>1});
$collection->update({empno=>7566},{empno=>7566,ename=>'JONES',job=>'MANAGER',mgr=>'7839',hiredate=>'02-APR-81',sal=>'2975',comm=>'',deptno=>'20'},{upsert=>1,safe=>1});
$collection->update({empno=>7654},{empno=>7654,ename=>'MARTIN',job=>'SALESMAN',mgr=>'7698',hiredate=>'28-SEP-81',sal=>'1250',comm=>'1400',deptno=>'30'},{upsert=>1,safe=>1});
$collection->update({empno=>7698},{empno=>7698,ename=>'BLAKE',job=>'MANAGER',mgr=>'7839',hiredate=>'01-MAY-81',sal=>'2850',comm=>'',deptno=>'30'},{upsert=>1,safe=>1});
$collection->update({empno=>7782},{empno=>7782,ename=>'CLARK',job=>'MANAGER',mgr=>'7839',hiredate=>'09-JUN-81',sal=>'2450',comm=>'',deptno=>'10'},{upsert=>1,safe=>1});
$collection->update({empno=>7788},{empno=>7788,ename=>'SCOTT',job=>'ANALYST',mgr=>'7566',hiredate=>'19-APR-87',sal=>'3000',comm=>'',deptno=>'20'},{upsert=>1,safe=>1});
$collection->update({empno=>7839},{empno=>7839,ename=>'KING',job=>'PRESIDENT',mgr=>'',hiredate=>'17-NOV-81',sal=>'5000',comm=>'',deptno=>'10'},{upsert=>1,safe=>1});
$collection->update({empno=>7844},{empno=>7844,ename=>'TURNER',job=>'SALESMAN',mgr=>'7698',hiredate=>'08-SEP-81',sal=>'1500',comm=>'0',deptno=>'30'},{upsert=>1,safe=>1});
$collection->update({empno=>7876},{empno=>7876,ename=>'ADAMS',job=>'CLERK',mgr=>'7788',hiredate=>'23-MAY-87',sal=>'1100',comm=>'',deptno=>'20'},{upsert=>1,safe=>1});
$collection->update({empno=>7900},{empno=>7900,ename=>'JAMES',job=>'CLERK',mgr=>'7698',hiredate=>'03-DEC-81',sal=>'950',comm=>'',deptno=>'30'},{upsert=>1,safe=>1});
$collection->update({empno=>7902},{empno=>7902,ename=>'FORD',job=>'ANALYST',mgr=>'7566',hiredate=>'03-DEC-81',sal=>'3000',comm=>'',deptno=>'20'},{upsert=>1,safe=>1});
$collection->update({empno=>7934},{empno=>7934,ename=>'MILLER',job=>'CLERK',mgr=>'7782',hiredate=>'23-JAN-82',sal=>'1300',comm=>'',deptno=>'10'},{upsert=>1,safe=>1});
--------------Script END------------------
DEMO for AQ: http://psoug.org/reference/aq_demo1.html
3 Comments:
Thanks for this post Apun!
I've been curious on how to use mongodb as a logging tool for an Oracle database, and this gave me a good start.
Thanks for the post Apun.
I have few questions:
1. We have a Greenplum data warehouse and are planning to load data into MongoDB after the nightly process is done. Right now it takes 3 hrs to load all our reporting data into MongoDB, so we're looking at different options. One option was to look at GoldenGate replication during the ETL process. Does GoldenGate support the Greenplum database?
2. What options do we have to improve the performance of loading data into MongoDB? We are planning to load 3 TB of data from the Greenplum data warehouse into MongoDB.
Very useful post Arun.
I just want to know that is it possible to replicate mongodb table to oracle?
It is helpful if you share the details.
Thanks
Post a Comment