Oracle 10g CDC
从Oracle9i开始,Oracle引入了CDC技术来实现对变化数据的捕获。在Oracle9i中CDC只支持同步的数据捕获(synchronous change capture),源数据的变化被实时的捕获,捕获的过程和源数据是同一个事务。它的实现需要源数据支持trigger,所以这种同步的技术会给数据源带来性能的问题。这是CDC在Oracle9i的一个缺陷(在Oracle10g中已经改进)。
在CDC中变化的数据被保存在一个变化表(change table)中,使用者通过订阅的方式(subscribe)生成一个视图(subscribe view)并且通过这个视图来得到这些变化的数据,一个change table可以有多个订阅者,也就可以有多个subscribe view。每个subscribe view可以从change table提取自己所关心的不同的columns 和rows。
在Oracle10g中,CDC开始支持异步方式来捕获变化的数据。在Oracle10g中利用redo log来实现CDC。Redo log是一个记录所有数据库变化的独立的文件。CDC的异步方式是非入侵方式(noninvasive)的实现,对于变化数据的捕获就不需要数据库加入trigger了。
Oracle的redo log有两种类型:
● On-line redo log
● Archive redo log
这种从redo log中读取数据变化的方式叫做“日志挖掘”(mining the logs),这种方式使捕获数据的操作从源数据的事务中分离出来,减轻了trigger技术给数据源带来的性能问题(是减轻不是消除!)
在Oracle10g中,异步CDC有两种实现方式:
● HOTLOG
这种方式利用On-line redo log。变化的数据从on-line redo log中获取,然后通过(Oracle Streams技术)把获取的变化保存在本地的变化表中(变化表还是在数据源上)。然后还需要其他方式把这些数据移到数据仓库中。
● AUTOLOG
这种方式与HOTLOG不同的是数据获取的位置不同。AUTOLOG方式利用Log Transport Services技术在不同数据库之间转换数据时进行变化数据获取操作的。如果目标数据库正好是数据仓库的话,这种方式就比HOTLOG方式减少了一次数据迁移的环节。
Log Transport Services技术是标准的日志迁移技术,比如Oracle的Oracle Data Guard。
Latency of change:
从发生变化的数据源事务提交到利用CDC发现变化的数据,并且把变化的数据移植到变化表之间的时间间隔。HOTLOG方式的时间滞后要比AUTOLOG方式小。
Oracle Streams
Oracle Streams技术可以在多个数据库之间进行数据备份,它通过在数据队列里面记录变化的数据。Oracle Streams技术也是利用数据库的redo log来解析变化的数据的。Oracle Streams除了进行数据备份外,还可以用来作为数据仓库的增量抽取。
Oracle Streams与CDC技术比较,可以用一个形象的比喻来形容:
把Oracle Streams比作砖、瓦;
把CDC看作是用砖、瓦盖成的大厦;
Oracle Streams可以用来实现CDC,它们是辅助的关系,不是真正竞争的关系。
上面是CDC的简单介绍,下面转一篇应用(但是怎么看都不太防舒服)
*****************************************************************************************
Oracle 10g CDC Test
怎样使用oracle 10G CDC(Capturing Changed Data)
1)create the user on the source DB and target DB.
sqlplus "/ as sysdba"
connect system/manager
set serveroutput on size 100000
set linesize 2000
create or replace package etl_util AUTHID CURRENT_USER
IS
FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2;
END etl_util;
/
show errors
create or replace package body etl_util
IS
-- ******************************************************************************
-- PUBLIC FUNCTION get_col_names
-- ******************************************************************************
FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2
IS
BEGIN
DECLARE
v_col_nm VARCHAR2(100);
v_ret VARCHAR2(32767);
v_work_str VARCHAR2(32767);
CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS
SELECT column_name FROM all_tab_columns
WHERE owner = UPPER(TRIM(p_schm))
AND table_name = UPPER(TRIM(p_tbl_nm))
ORDER BY column_name;
BEGIN
OPEN c_tab_col(p_table_name, p_schema);
v_ret := '';
v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
LOOP
FETCH c_tab_col INTO v_col_nm;
EXIT WHEN c_tab_col%NOTFOUND;
IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
v_ret := v_ret || ',' || p_pref ||LOWER(v_col_nm);
END IF;
END LOOP;
CLOSE c_tab_col;
IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2);
END IF;
IF (v_ret IS NULL) THEN
raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
|| ' Does not exist or has no columns');
END IF;
RETURN v_ret;
END;
END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_col_names
-- ******************************************************************************
-- ******************************************************************************
-- PUBLIC FUNCTION get_cols_definition
-- ******************************************************************************
FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2
IS
BEGIN
DECLARE
v_col_nm VARCHAR2(100);
v_col_def VARCHAR2(300);
v_ret VARCHAR2(32767);
v_work_str VARCHAR2(32767);
CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS
SELECT column_name,
RPAD(column_name,53) ||
decode(data_type
,'NUMBER','NUMBER('||TO_CHAR(data_precision)||
decode(data_scale,0,'',','||data_scale)||')'
,'CHAR','CHAR('||TO_CHAR(DATA_LENGTH)||')'
,'VARCHAR2','VARCHAR2('||TO_CHAR(DATA_LENGTH)||')'
,'DATE','DATE'
,data_type) def
from all_tab_columns
where table_name = upper(trim(p_tbl_nm))
AND owner = upper(trim(p_schm))
order by owner, table_name, column_name
;
BEGIN
OPEN c_tab_col(p_table_name, p_schema);
v_ret := '';
v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
LOOP
FETCH c_tab_col INTO v_col_nm, v_col_def;
EXIT WHEN c_tab_col%NOTFOUND;
IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
v_ret := v_ret || ', ' || p_pref ||LOWER(v_col_def);
END IF;
END LOOP;
CLOSE c_tab_col;
IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); END IF;
IF (v_ret IS NULL) THEN
raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
|| ' Does not exist or has no columns');
END IF;
RETURN v_ret;
exception
when others
then dbms_output.put_line(SUBSTR('Error in get_cols_definition ' || sqlerrm,1,220));
return 'error ' || sqlerrm ;
END;
END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_cols_definition
-- ******************************************************************************
-- ******************************************************************************
-- PUBLIC FUNCTION get_modified_col_names
-- ******************************************************************************
FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2
IS
BEGIN
DECLARE
v_col_nm VARCHAR2(100);
v_col_pos number;
v_ret VARCHAR2(32767);
v_work_str VARCHAR2(32767);
v_byte_nr INTEGER;
v_col_map VARCHAR2(32767);
i INTEGER; j INTEGER;k INTEGER;
v_val VARCHAR2(5);
CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS
SELECT column_name, POWER(2,MOD(column_id,8)) col_pos, 1 + floor((column_id)/8) byte_nr
FROM all_tab_cols
WHERE table_name=UPPER(p_tbl_nm)
AND OWNER=UPPER(p_schm)
ORDER BY column_id;
BEGIN
SELECT dump(source_colmap$,10) INTO v_col_map FROM DUAL;
i:=INSTR(v_col_map,':');
v_col_map:=','||SUBSTR(v_col_map,i + 2)||',';
-- dbms_output.put_line('v_col_map='||v_col_map);
OPEN c_tab_col(table_name, schema);
v_ret := '';
LOOP
-- dbms_output.put_line('--1');
FETCH c_tab_col INTO v_col_nm, v_col_pos, v_byte_nr;
EXIT WHEN c_tab_col%NOTFOUND;
-- dbms_output.put_line('byte_nr='||to_char(v_byte_nr));
i:=INSTR(v_col_map,',',1,v_byte_nr);
j:=INSTR(v_col_map,',',1,v_byte_nr + 1);
IF ((i=0) OR (j = 0)) THEN
EXIT;
END IF;
v_val := SUBSTR(v_col_map, i + 1, j - i - 1 );
k := BITAND(TO_NUMBER(v_val),v_col_pos);
IF (k>0) THEN
v_ret := v_ret || ',' || v_col_nm;
END IF;
-- DBMS_OUTPUT.put_line('col='||v_col_nm ||' byte='||TO_CHAR(v_byte_nr)
-- || ' i=' || TO_CHAR(i) || ' j='||TO_CHAR(j)
-- ||' val='||v_val||' col_pos='||TO_CHAR(v_col_pos)||'res='||TO_CHAR(k));
END LOOP;
CLOSE c_tab_col;
IF (v_ret IS NOT NULL) THEN
v_ret := SUBSTR(v_ret,2);
END IF;
RETURN v_ret;
EXCEPTION
WHEN OTHERS
THEN dbms_output.put_line(SUBSTR('Error in get_modifed_col_names: ',1,220));
dbms_output.put_line(SUBSTR(sqlerrm,1,240));
return '';
END;
END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_modified_col_names
-- ******************************************************************************
END etl_util;
/
show errors
grant execute on etl_util to public;
DROP PUBLIC SYNONYM etl_util;
CREATE PUBLIC SYNONYM etl_util FOR etl_util;
--select etl_util.get_modified_col_names(schema1 => 'SCOTT'
--,table_name => 'EMP'
--,source_colmap$ => HEXTORAW('C000')) FROM dual;
--
BEGIN
declare v_res VARCHAR2(1000);
BEGIN
v_res := etl_util.get_modified_col_names('test', 'philip_test', HEXTORAW('0601'));
dbms_output.put_line('res='||v_res);
END;
END;
/
2)create the user
connect system/manager
DROP USER cdc_publisher CASCADE;
CREATE USER cdc_publisher IDENTIFIED BY pass;
GRANT EXECUTE_CATALOG_ROLE to cdc_publisher;
GRANT SELECT_CATALOG_ROLE to cdc_publisher;
3)grant the priviledge
connect test/pass
GRANT SELECT on PHILIP_TEST to cdc_publisher;
GRANT SELECT on PHILIP_TEST2 to cdc_publisher;
--the PHILIP_TEST2 is under user "test"
4)Create Change Tables
/* Login as a publisher to run this */
BEGIN
DECLARE work_sql VARCHAR2(32767);
v_publisher_id VARCHAR2(20) := 'cdc_publisher';
v_source_schema VARCHAR2(20) := 'test';
v_source_table VARCHAR2(100);
v_cdc_table VARCHAR2(100);
CURSOR c_tables IS
SELECT 'PHILIP_TEST' table_name FROM dual
UNION
SELECT 'PHILIP_TEST2' table_name FROM dual
;
BEGIN
FOR tables IN c_tables LOOP
v_source_table := tables.table_name;
v_cdc_table := 'rep_'||v_source_table;
work_sql := etl_util.get_cols_definition(
p_schema=>v_source_schema,
p_table_name =>v_source_table,
p_skip_columns => NULL,
p_pref => NULL);
DBMS_output.put_line('work_sql'||work_sql);
DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE (
OWNER => v_publisher_id, SOURCE_SCHEMA => v_source_schema
,CHANGE_SET_NAME => 'SYNC_SET', CAPTURE_VALUES => 'new'--only Captures the changed values from the source table
,RS_ID => 'n', ROW_ID => 'n', USER_ID => 'y', TIMESTAMP => 'y'
,OBJECT_ID => 'n' -- leave it as 'N' or you will have "table has no columns" error
,SOURCE_COLMAP => 'n', TARGET_COLMAP => 'n', OPTIONS_STRING => null
,SOURCE_TABLE => v_source_table, CHANGE_TABLE_NAME => v_cdc_table
,COLUMN_TYPE_LIST => work_sql);
DBMS_output.put_line('Change table '||v_cdc_table ||' was created successfully');
END LOOP;
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line('Error start ********************************************');
DBMS_output.put_line('Error during change table ' || v_cdc_table || ' creation:');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
DBMS_output.put_line('Error end ********************************************');
END;
END;
/
5)when we change the "test.PHILIP_TEST" and "test.PHILIP_TEST2" ,under cdc_publisher , the table "cdc_philip_test will recorde this operation and the value at once.
*****************************************************************************************
另附一篇比较详细的CDC原理研究:
*****************************************************************************************
oracle学习--CDC 研究(1)
oracle学习--CDC 研究(2)