Decode360's Blog

业精于勤而荒于嬉 QQ:150355677 MSN:decode360@hotmail.com

  BlogJava :: 首页 :: 新随笔 :: 联系 ::  :: 管理 ::
  397 随笔 :: 33 文章 :: 29 评论 :: 0 Trackbacks
Oracle 10g CDC
 

    从Oracle9i开始,Oracle引入了CDC技术来实现对变化数据的捕获。在Oracle9i中CDC只支持同步的数据捕获(synchronous change capture),源数据的变化被实时的捕获,捕获的过程和源数据是同一个事务。它的实现需要源数据支持trigger,所以这种同步的技术会给数据源带来性能的问题。这是CDC在Oracle9i的一个缺陷(在Oracle10g中已经改进)

    在CDC中变化的数据被保存在一个变化表(change table)中,使用者通过订阅的方式(subscribe)生成一个视图(subscribe view)并且通过这个视图来得到这些变化的数据,一个change table可以有多个订阅者,也就可以有多个subscribe view。每个subscribe view可以从change table提取自己所关心的不同的columns 和rows。

    在Oracle10g中,CDC开始支持异步方式来捕获变化的数据。在Oracle10g中利用redo log来实现CDC。Redo log是一个记录所有数据库变化的独立的文件。CDC的异步方式是非入侵方式(noninvasive)的实现,对于变化数据的捕获就不需要数据库加入trigger了。
 
 
Oracle的redo log有两种类型:
 
● On-line redo log
● Archive redo log

这种从redo log中读取数据变化的方式叫做“日志挖掘”(mining the logs),这种方式使捕获数据的操作从源数据的事务中分离出来,减轻了trigger技术给数据源带来的性能问题(是减轻不是消除!)
 

在Oracle10g中,异步CDC有两种实现方式:
 
● HOTLOG
 
这种方式利用On-line redo log。变化的数据从on-line redo log中获取,然后通过(Oracle Streams技术)把获取的变化保存在本地的变化表中(变化表还是在数据源上)。然后还需要其他方式把这些数据移到数据仓库中。

AUTOLOG
 
这种方式与HOTLOG不同的是数据获取的位置不同。AUTOLOG方式利用Log Transport Services技术在不同数据库之间转换数据时进行变化数据获取操作的。如果目标数据库正好是数据仓库的话,这种方式就比HOTLOG方式减少了一次数据迁移的环节。
 
Log Transport Services技术是标准的日志迁移技术,比如Oracle的Oracle Data Guard。
 

Latency of change:
 
从发生变化的数据源事务提交到利用CDC发现变化的数据,并且把变化的数据移植到变化表之间的时间间隔。HOTLOG方式的时间滞后要比AUTOLOG方式小。
 

Oracle Streams

Oracle Streams技术可以在多个数据库之间进行数据备份,它通过在数据队列里面记录变化的数据。Oracle Streams技术也是利用数据库的redo log来解析变化的数据的。Oracle Streams除了进行数据备份外,还可以用来作为数据仓库的增量抽取。
 
Oracle Streams与CDC技术比较,可以用一个形象的比喻来形容:
 
把Oracle Streams比作砖、瓦;
把CDC看作是用砖、瓦盖成的大厦;
 
Oracle Streams可以用来实现CDC,它们是辅助的关系,不是真正竞争的关系。
 
 
 
 
 
 
 
上面是CDC的简单介绍,下面转一篇应用(但是怎么看都不太防舒服)
*****************************************************************************************
Oracle 10g CDC Test
 
 
怎样使用oracle 10G CDC(Capturing Changed Data)

1)create the user on the source DB and target DB.

sqlplus "/  as sysdba"
connect system/manager

set serveroutput on size 100000
set linesize 2000
create or replace package etl_util AUTHID CURRENT_USER
IS
  FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
  FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
  FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW)  RETURN VARCHAR2;
END etl_util;
/
show errors
 
create or replace package body etl_util
IS
-- ******************************************************************************
-- PUBLIC FUNCTION get_col_names
-- ******************************************************************************
  FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2   
    IS
  BEGIN
    DECLARE
      v_col_nm VARCHAR2(100);
      v_ret VARCHAR2(32767);
      v_work_str VARCHAR2(32767);
 
      CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS       
            SELECT column_name FROM all_tab_columns
            WHERE owner = UPPER(TRIM(p_schm))
            AND table_name = UPPER(TRIM(p_tbl_nm))
            ORDER BY column_name;
 
  BEGIN
  OPEN c_tab_col(p_table_name, p_schema);
  v_ret := '';
  v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
 
  LOOP
      FETCH c_tab_col INTO v_col_nm;
      EXIT WHEN c_tab_col%NOTFOUND;
    IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
        v_ret := v_ret || ',' || p_pref ||LOWER(v_col_nm);
    END IF;
  END LOOP;
  CLOSE c_tab_col;
  IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2);
  END IF;
  IF (v_ret IS NULL) THEN
      raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
        || ' Does not exist or has no columns');
  END IF;
 
  RETURN v_ret;
  END;
  END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_col_names
-- ******************************************************************************
 
-- ******************************************************************************
-- PUBLIC FUNCTION get_cols_definition
-- ******************************************************************************
  FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2   
    IS
  BEGIN
    DECLARE
      v_col_nm VARCHAR2(100);
      v_col_def VARCHAR2(300);
      v_ret VARCHAR2(32767);
      v_work_str VARCHAR2(32767);
      CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS       
        SELECT column_name,
            RPAD(column_name,53) ||
        decode(data_type
            ,'NUMBER','NUMBER('||TO_CHAR(data_precision)||
                decode(data_scale,0,'',','||data_scale)||')'
            ,'CHAR','CHAR('||TO_CHAR(DATA_LENGTH)||')'
            ,'VARCHAR2','VARCHAR2('||TO_CHAR(DATA_LENGTH)||')'
            ,'DATE','DATE'
            ,data_type) def
        from all_tab_columns
        where table_name = upper(trim(p_tbl_nm))
            AND owner = upper(trim(p_schm))
        order by owner, table_name, column_name
        ;
  BEGIN
  OPEN c_tab_col(p_table_name, p_schema);
  v_ret := '';
  v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
 
  LOOP
      FETCH c_tab_col INTO v_col_nm, v_col_def;
      EXIT WHEN c_tab_col%NOTFOUND;
    IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
        v_ret := v_ret || ', ' || p_pref ||LOWER(v_col_def);
    END IF;
  END LOOP;
  CLOSE c_tab_col;
  IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); END IF;
  IF (v_ret IS NULL) THEN
      raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
        || ' Does not exist or has no columns');
  END IF;
 
  RETURN v_ret;
  exception
          when others
            then dbms_output.put_line(SUBSTR('Error in get_cols_definition ' || sqlerrm,1,220));
  return 'error ' || sqlerrm ;
  END;
  END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_cols_definition
-- ******************************************************************************
 
-- ******************************************************************************
-- PUBLIC FUNCTION get_modified_col_names
-- ******************************************************************************
  FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2
    IS
  BEGIN
    DECLARE
      v_col_nm VARCHAR2(100);
      v_col_pos number;
      v_ret VARCHAR2(32767);
      v_work_str VARCHAR2(32767);
    v_byte_nr INTEGER;
    v_col_map VARCHAR2(32767);
    i INTEGER; j INTEGER;k INTEGER;
    v_val VARCHAR2(5);
      CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS       
        SELECT column_name, POWER(2,MOD(column_id,8)) col_pos, 1 + floor((column_id)/8) byte_nr
            FROM all_tab_cols
            WHERE table_name=UPPER(p_tbl_nm)
                AND OWNER=UPPER(p_schm)
            ORDER BY column_id;       
  BEGIN
      SELECT dump(source_colmap$,10) INTO v_col_map FROM DUAL;
      i:=INSTR(v_col_map,':');
      v_col_map:=','||SUBSTR(v_col_map,i + 2)||',';
--    dbms_output.put_line('v_col_map='||v_col_map);
      OPEN c_tab_col(table_name, schema);
      v_ret := '';
      LOOP
--    dbms_output.put_line('--1');
          FETCH c_tab_col INTO v_col_nm, v_col_pos, v_byte_nr;
          EXIT WHEN c_tab_col%NOTFOUND;
--    dbms_output.put_line('byte_nr='||to_char(v_byte_nr));
          i:=INSTR(v_col_map,',',1,v_byte_nr);
          j:=INSTR(v_col_map,',',1,v_byte_nr + 1);
          IF ((i=0) OR (j = 0)) THEN
              EXIT;
          END IF;
          v_val := SUBSTR(v_col_map, i + 1, j - i - 1 );
          k := BITAND(TO_NUMBER(v_val),v_col_pos);
          IF (k>0) THEN
              v_ret := v_ret || ',' || v_col_nm;
          END IF;
--    DBMS_OUTPUT.put_line('col='||v_col_nm ||' byte='||TO_CHAR(v_byte_nr)
--          || ' i=' || TO_CHAR(i) || ' j='||TO_CHAR(j)
--          ||' val='||v_val||' col_pos='||TO_CHAR(v_col_pos)||'res='||TO_CHAR(k));
      END LOOP;
      CLOSE c_tab_col;
      IF (v_ret IS NOT NULL) THEN
          v_ret := SUBSTR(v_ret,2);
      END IF;
      RETURN v_ret;
  EXCEPTION
        WHEN OTHERS
            THEN dbms_output.put_line(SUBSTR('Error in get_modifed_col_names: ',1,220));
                dbms_output.put_line(SUBSTR(sqlerrm,1,240));
      return '';
  END;
  END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_modified_col_names
-- ******************************************************************************
 
END etl_util;
/
show errors

grant execute on etl_util to public;
DROP PUBLIC SYNONYM etl_util;
CREATE PUBLIC SYNONYM etl_util FOR etl_util;
--select etl_util.get_modified_col_names(schema1 => 'SCOTT'
--,table_name => 'EMP'
--,source_colmap$ => HEXTORAW('C000')) FROM dual;
--
BEGIN
declare v_res VARCHAR2(1000);
BEGIN
    v_res := etl_util.get_modified_col_names('test', 'philip_test', HEXTORAW('0601'));
    dbms_output.put_line('res='||v_res);
END;
END;
/

2)create the user

connect system/manager
DROP USER cdc_publisher CASCADE;
CREATE USER cdc_publisher IDENTIFIED BY pass;
GRANT EXECUTE_CATALOG_ROLE to cdc_publisher;
GRANT SELECT_CATALOG_ROLE to cdc_publisher;
 
3)grant the priviledge

connect test/pass
GRANT SELECT on PHILIP_TEST to cdc_publisher;
GRANT SELECT on PHILIP_TEST2 to cdc_publisher;
--the PHILIP_TEST2  is under  user   "test"

4)Create Change Tables
--connect cdc_publisher/pass@sde223_dvweb
set serveroutput on size 1000000
set linesize 120
 
/* Login as a publisher to run this */
BEGIN
    DECLARE work_sql VARCHAR2(32767);
        v_publisher_id VARCHAR2(20)  := 'cdc_publisher';
        v_source_schema VARCHAR2(20) := 'test';
        v_source_table VARCHAR2(100);
        v_cdc_table VARCHAR2(100);
        CURSOR c_tables IS
            SELECT 'PHILIP_TEST' table_name FROM dual
                UNION
            SELECT 'PHILIP_TEST2' table_name FROM dual
        ;
BEGIN
FOR tables IN c_tables LOOP
  v_source_table := tables.table_name;
  v_cdc_table := 'rep_'||v_source_table;
  work_sql := etl_util.get_cols_definition(
        p_schema=>v_source_schema,
        p_table_name =>v_source_table,
        p_skip_columns => NULL,
        p_pref => NULL);
    DBMS_output.put_line('work_sql'||work_sql);
  DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE (
    OWNER => v_publisher_id, SOURCE_SCHEMA => v_source_schema
    ,CHANGE_SET_NAME => 'SYNC_SET', CAPTURE_VALUES => 'new'--only Captures the changed values from the source table
    ,RS_ID => 'n', ROW_ID => 'n', USER_ID => 'y', TIMESTAMP => 'y'
    ,OBJECT_ID => 'n' -- leave it as 'N' or you will have "table has no columns" error
    ,SOURCE_COLMAP => 'n', TARGET_COLMAP => 'n', OPTIONS_STRING => null
    ,SOURCE_TABLE => v_source_table, CHANGE_TABLE_NAME => v_cdc_table
    ,COLUMN_TYPE_LIST => work_sql);
  DBMS_output.put_line('Change table '||v_cdc_table ||' was created successfully');
END LOOP;
 
EXCEPTION
    WHEN OTHERS THEN
    DBMS_output.put_line('Error start ********************************************');
    DBMS_output.put_line('Error during change table ' || v_cdc_table || ' creation:');
    DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
    DBMS_output.put_line('Error end  ********************************************');
END;
END;
/

5)when we change the "test.PHILIP_TEST" and "test.PHILIP_TEST2" ,under cdc_publisher , the table "cdc_philip_test will recorde this operation and the value at once.
 
 
*****************************************************************************************
 
 
 
 
 
另附一篇比较详细的CDC原理研究:
*****************************************************************************************
oracle学习--CDC 研究(1)
http://hi.baidu.com/ybgba/blog/item/898d93366f79c8d5a2cc2b01.html
 
oracle学习--CDC 研究(2)
http://hi.baidu.com/ybgba/blog/item/988d113d7c18ddcf9e3d6201.html
 
 
 
posted on 2009-06-15 19:49 decode360 阅读(1531) 评论(0)  编辑  收藏 所属分类: 10.DB_Tools

只有注册用户登录后才能发表评论。


网站导航: