oracle队列(AQ)---实现orale到mysql的数据同步
                                            	高级队列(Advanced Queue,简称AQ): 
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
		我们利用触发器+高级序列  然后加ruby读取队列的主键,然后再在对应表中查出数据,insert 进MySQL,这是idc机房oracle到idc的mysql的过程;
高级队列是oracle的一种高级应用,它主要是表和触发器之间的组合而成的一种应用。其主要作用是在各应用系统中进行消息传递。
利用高级队列来实现消息在两个不同数据库之间的异步传输,满足业务系统的改造需求。
	至于idc机房到阿里云的mysql,处于安全考虑,ruby不能直接连接rds,借助了mq, 先放放到mq,然后从mq读取放进rds. 实现oracle到mysql的同步。需要注意oracle高级序列是可以让多个
	Oracle 高级队列具体开发步骤如下:
	
		(1)首先确定应用的需求,是否适合使用高级队列?使用高级队列预计提高性能的预期值
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete
(2)赋予数据库账户相应aq权限。
(3)确定队列包体结构,即创建type。
(4)创建队列表及队列。
(5)队列管理
一:我们的队列结构(type):
因为我们oracle中信息表有两个:t_publish_info 和t_publish_zbxx,所以会比会员表t_member_info 的type多一个字段:table_name 用来区分是从那个表读取数据,其中opr也是是标识位,1表示insert, 2表示update ,3表示delete
	CREATE OR REPLACE TYPE INFOSERVICE."INFO_SYNC_TYPE2"                                          as object
	(
	table_name number(3),
	opr number(2) ,
	record_id number(20)
	);
	二:队列表:INFOSERVICE.T_INFO_SYNC_MESSAGE
	begin
	sys.dbms_aqadm.create_queue_table(
	queue_table => 'INFOSERVICE.T_INFO_SYNC_MESSAGE',
	queue_payload_type => 'INFOSERVICE.INFO_SYNC_TYPE2',
	sort_list => 'ENQ_TIME',
	compatible => '10.0.0',
	primary_instance => 0,
	secondary_instance => 0,
	storage_clause => 'tablespace INFOSERVICE pctfree 10 initrans 1 maxtrans 255 storage ( initial 16M next 16M minextents 1 maxextents unlimited )');
	end;
	/
	三:创建队列:
	begin
	  sys.dbms_aqadm.create_queue
	 (
	  queue_name     => 'infoservice.q_info_sync_message',
	  queue_table    => 'INFOSERVICE.T_INFO_SYNC_MESSAGE',                 
	  queue_type     => sys.dbms_aqadm.normal_queue,
	  max_retries    => 3,                              
	  retry_delay    => 1,                              
	  retention_time => 0                                
	 );
	end;
	启动队列:
	begin
	  sys.dbms_aqadm.start_queue
	  (queue_name => 'infoservice.q_info_sync_message',enqueue => true ,dequeue => true );
	end;
暂停队列:
暂停队列:
		begin
	
			  sys.dbms_aqadm.stop_queue ( queue_name => '队列名');
	
			end;
	
			删除队列:
	
			begin
	
			  sys.dbms_aqadm.drop_queue ( queue_name => '队列名');
	
			end;
	
			删除队列表:
	
			begin
	
			  sys.dbms_aqadm.drop_queue_table (queue_table   =>  '队列表名');
	
			end;
	
四:入队存储过程DBMS_AQ.enqueue:	create or replace procedure infoservice.info_sync_enqueue(  table_name in number, opr in number,record_id in number)
	as
	begin
	DECLARE
	queue_options DBMS_AQ.enqueue_options_t;
	message_properties DBMS_AQ.message_properties_t;
	message_id RAW(16);
	my_message info_sync_type2;
	BEGIN
	my_message := info_sync_type2( table_name, opr,record_id );
	DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message',
	enqueue_options => queue_options,
	message_properties => message_properties,
	payload => my_message,
	msgid => message_id);
	COMMIT;
	END;
	end info_sync_enqueue;
	出对存储过程DBMS_AQ.DEQUEUE:
	create or replace procedure infoservice.info_sync_dequeue(table_name out number,opr out number ,record_id out number)
	as
	begin
	DECLARE
	queue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
	message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
	message_id RAW(200);
	my_message info_sync_type2;
	BEGIN
	DBMS_AQ.DEQUEUE(
	queue_name => 'infoservice.q_info_sync_message',
	dequeue_options => queue_options,
	message_properties => message_properties,
	payload => my_message,
	msgid => message_id );
	COMMIT;
	table_name := my_message.table_name;
	opr :=my_message. opr;
	record_id :=  my_message.record_id ;
	END;
	end info_sync_dequeue;
	五:我们这里用的触发器实现自动入队,当在T_PUBLISH_INFO表上做增删改的时候,触发入队,通过标识字段 opr的值 来区分: 1表示insert, 2表示update ,3表示delete:
	create or replace trigger INFOSERVICE.TRG_PUBLISH_2016A_Q
	before insert or delete or update of table_name,table_name2,cust_id, title,publish_date,OK_STATUS,OK_DATE,UP_DATE,IN_DATE,FILE_NAME on T_PUBLISH_INFO
	for each row
	declare
	queue_options DBMS_AQ.enqueue_options_t;
	message_properties DBMS_AQ.message_properties_t;
	message_id RAW(16);
	my_message info_sync_type2;
	opr number(2);
	table_name number(3);
	begin
	opr := 0;
	table_name := 22;###就把标识位置为22,
	CASE
	WHEN inserting THEN
	if (:NEW.OK_STATUS = 'Y') then
	opr := 1;
	my_message := info_sync_type2( table_name, opr,:new.record_id );
	end if;
	WHEN updating THEN
	if ((:NEW.OK_STATUS != :OLD.OK_STATUS)or (:NEW.OK_STATUS = 'Y' )) then
	opr := 2;
	my_message := info_sync_type2( table_name, opr,:new.record_id );
	end   if;
	WHEN deleting THEN
	opr := 3;
	my_message := info_sync_type2( table_name, opr,:old.record_id );
	END CASE;
	if ( opr != 0) then
	DBMS_AQ.enqueue(queue_name => 'infoservice.q_info_sync_message',
	enqueue_options => queue_options,
	message_properties =>message_properties,
	payload => my_message,
	msgid => message_id);
	end if;
	end;
	这里的table_name:=22,就是说这个触发器入队的时候,就把标识位table_name设置成了22,方便接下来ruby在读取的时候,判断从那个表读取数据,
	call infoservice.info_sync_enqueue(21,1,39097403);
	call infoservice.info_sync_enqueue(155,1,39097403); ---这样就读t_publish_info_20142015
	执行入队的存储过程,就会把type(INFO_SYNC_TYPE2)对应的三个字段的值存进队列表中,如下:需要注意的是队列表的字段是固定的。
	SQL> desc INFOSERVICE.T_INFO_SYNC_MESSAGE;
	Name                                      Null?    Type
	----------------------------------------- -------- ----------------------------
	Q_NAME                                             VARCHAR2(30)
	MSGID                                     NOT NULL RAW(16)
	CORRID                                             VARCHAR2(128)
	PRIORITY                                           NUMBER
	STATE                                              NUMBER
	DELAY                                              TIMESTAMP(6)
	EXPIRATION                                         NUMBER
	TIME_MANAGER_INFO                                  TIMESTAMP(6)
	LOCAL_ORDER_NO                                     NUMBER
	CHAIN_NO                                           NUMBER
	CSCN                                               NUMBER
	DSCN                                               NUMBER
	ENQ_TIME                                           TIMESTAMP(6)
	ENQ_UID                                            VARCHAR2(30)
	ENQ_TID                                            VARCHAR2(30)
	DEQ_TIME                                           TIMESTAMP(6)
	DEQ_UID                                            VARCHAR2(30)
	DEQ_TID                                            VARCHAR2(30)
	RETRY_COUNT                                        NUMBER
	EXCEPTION_QSCHEMA                                  VARCHAR2(30)
	EXCEPTION_QUEUE                                    VARCHAR2(30)
	STEP_NO                                            NUMBER
	RECIPIENT_KEY                                      NUMBER
	DEQUEUE_MSGID                                      RAW(16)
	SENDER_NAME                                        VARCHAR2(30)
	SENDER_ADDRESS                                     VARCHAR2(1024)
	SENDER_PROTOCOL                                    NUMBER
	USER_DATA                                          INFOSERVICE.INFO_SYNC_TYPE2
	USER_PROP                                          ANYDATA
	查看现在队列表中的数据,USER_DATA.table_name ,USER_DATA.opr ,USER_DATA.record_id  这三个是之前type中定义的字段。
	select * from  INFOSERVICE.T_INFO_SYNC_MESSAGE;

总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
            
            
                                                            
总结:我们这个案例中高级队列中实际上保存的最重要的信息是那个主键id,也就是record_id,然后ruby连接上oracle和mysql。通过主键id在oracle对应的表中查出数据(通过table_name 的值来判断是从哪个表找数据),通过标识符opr的值来判断是增删改操作。ruby读取oracle数据到内存中,然后insert 到mysql。实现了可靠的异步同步。
分享文章:oracle队列(AQ)---实现orale到mysql的数据同步
当前路径:http://www.jxjierui.cn/article/igdghd.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 