这篇文章给大家介绍抽取oracle数据到mysql数据库的实现,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql

2、建立一个目录ETL_DIR

3、运行oracle数据库程序P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql

4、导入mysql数据,文件内容如下

loaddatainfile"alarm_hist_inc.csv"intotablealarm_hist_incfieldsterminatedby","enclosedby"^"linesterminatedby"\r\n";loaddatainfile"button_authority.csv"intotablebutton_authorityfieldsterminatedby","enclosedby"^"linesterminatedby"\r\n";loaddatainfile"c3_sms_hist_inc.csv"intotablec3_sms_hist_incfieldsterminatedby","enclosedby"^"linesterminatedby"\r\n";loaddatainfile"datapermisson.csv"intotabledatapermissonfieldsterminatedby","enclosedby"^"linesterminatedby"\r\n";

附:数据库脚本P_ETL_ORA_DATA

CREATEORREPLACEPROCEDUREP_ETL_ORA_DATA(P_ORA_DIRVARCHAR2,P_DATA_PATHVARCHAR2)ISTYPET_RECISRECORD(TBNVARCHAR2(40),WHRVARCHAR2(4000));TYPET_TABSISTABLEOFT_REC;V_TABST_TABS:=T_TABS();V_ETL_DIRVARCHAR2(40):=P_ORA_DIR;V_LOAD_FILEUTL_FILE.FILE_TYPE;PROCEDUREETL_DATA(P_SQL_STMTVARCHAR2,P_DATA_PATHVARCHAR2,P_TB_NAMEVARCHAR2)ISBEGINDECLAREV_VAR_COLVARCHAR2(32767);V_NUM_COLNUMBER;V_DATE_COLDATE;V_TMZTIMESTAMP;V_COLSNUMBER;V_COLS_DESCDBMS_SQL.DESC_TAB;V_ROW_STRVARCHAR2(32767);V_COL_STRVARCHAR2(32767);V_SQL_IDNUMBER;V_SQL_REFSYS_REFCURSOR;V_EXP_FILEUTL_FILE.FILE_TYPE;V_DATA_PATHVARCHAR2(200);BEGINV_DATA_PATH:=P_DATA_PATH;IFREGEXP_SUBSTR(V_DATA_PATH,'\\$')ISNULLTHENV_DATA_PATH:=V_DATA_PATH||'\';ENDIF;V_DATA_PATH:=REPLACE(V_DATA_PATH,'\','\\');OPENV_SQL_REFFORP_SQL_STMT;V_SQL_ID:=DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID,V_COLS,V_COLS_DESC);FORIINV_COLS_DESC.FIRST..V_COLS_DESC.LASTLOOPCASEWHENV_COLS_DESC(I).COL_TYPEIN(1,9,96)THENDBMS_SQL.DEFINE_COLUMN(V_SQL_ID,I,V_VAR_COL,32767);WHENV_COLS_DESC(I).COL_TYPE=2THENDBMS_SQL.DEFINE_COLUMN(V_SQL_ID,I,V_NUM_COL);WHENV_COLS_DESC(I).COL_TYPE=12THENDBMS_SQL.DEFINE_COLUMN(V_SQL_ID,I,V_DATE_COL);WHENV_COLS_DESC(I).COL_TYPE=180THENDBMS_SQL.DEFINE_COLUMN(V_SQL_ID,I,V_TMZ);ENDCASE;ENDLOOP;DECLAREV_FLUSH_OVERPLS_INTEGER:=1;V_FILE_OVERPLS_INTEGER:=1;V_FILE_NOPLS_INTEGER:=1;V_FILE_NAMEVARCHAR2(200);V_LINEVARCHAR2(400);BEGINWHILEDBMS_SQL.FETCH_ROWS(V_SQL_ID)>0LOOPIFV_FILE_OVER=1THENV_FILE_NAME:=P_TB_NAME||'_'||V_FILE_NO||'.csv';V_EXP_FILE:=UTL_FILE.FOPEN(V_ETL_DIR,V_FILE_NAME,OPEN_MODE=>'w',MAX_LINESIZE=>32767);ENDIF;V_ROW_STR:='';FORIIN1..V_COLSLOOPV_COL_STR:='\N';BEGINCASEWHENV_COLS_DESC(I).COL_TYPEIN(1,9,96)THENDBMS_SQL.COLUMN_VALUE(V_SQL_ID,I,V_VAR_COL);IFV_VAR_COLISNOTNULLTHENV_COL_STR:='^'||V_VAR_COL||'^';ENDIF;WHENV_COLS_DESC(I).COL_TYPE=2THENDBMS_SQL.COLUMN_VALUE(V_SQL_ID,I,V_NUM_COL);IFV_NUM_COLISNOTNULLTHENV_COL_STR:=V_NUM_COL;ENDIF;WHENV_COLS_DESC(I).COL_TYPE=12THENDBMS_SQL.COLUMN_VALUE(V_SQL_ID,I,V_DATE_COL);IFV_DATE_COLISNOTNULLTHENV_COL_STR:='^'||TO_CHAR(V_DATE_COL,'yyyy-mm-ddhh34:mi:ss')||'^';ENDIF;WHENV_COLS_DESC(I).COL_TYPEIN(180,181,231)THENDBMS_SQL.COLUMN_VALUE(V_SQL_ID,I,V_TMZ);IFV_TMZISNOTNULLTHENV_COL_STR:='^'||TO_CHAR(V_TMZ,'yyyy-mm-ddhh34:mi:ss.ff6')||'^';ENDIF;ENDCASE;IFI=1THENV_ROW_STR:=V_COL_STR;ELSEV_ROW_STR:=V_ROW_STR||','||V_COL_STR;ENDIF;END;ENDLOOP;UTL_FILE.PUT_LINE(V_EXP_FILE,CONVERT(V_ROW_STR,'UTF8'));IFV_FILE_OVER>200000/*每200000条记录就产生一个新的文件*/THENV_FILE_OVER:=1;V_FLUSH_OVER:=1;V_FILE_NO:=V_FILE_NO+1;UTL_FILE.FCLOSE(V_EXP_FILE);V_LINE:='loaddatainfile"'||V_DATA_PATH||V_FILE_NAME||'"intotable'||P_TB_NAME;V_LINE:=V_LINE||'fieldsterminatedby","enclosedby"^"linesterminatedby"\r\n";';UTL_FILE.PUT_LINE(V_LOAD_FILE,V_LINE);UTL_FILE.FFLUSH(V_LOAD_FILE);CONTINUE;ENDIF;V_FILE_OVER:=V_FILE_OVER+1;IFV_FLUSH_OVER>2000/*每2000条记录就刷新缓存,写到文件中*/THENUTL_FILE.FFLUSH(V_EXP_FILE);V_FLUSH_OVER:=1;ELSEV_FLUSH_OVER:=V_FLUSH_OVER+1;ENDIF;ENDLOOP;DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);IFUTL_FILE.IS_OPEN(V_EXP_FILE)THENUTL_FILE.FCLOSE(V_EXP_FILE);V_LINE:='loaddatainfile"'||V_DATA_PATH||V_FILE_NAME||'"intotable'||P_TB_NAME;V_LINE:=V_LINE||'fieldsterminatedby","enclosedby"^"linesterminatedby"\r\n";';UTL_FILE.PUT_LINE(V_LOAD_FILE,V_LINE);UTL_FILE.FFLUSH(V_LOAD_FILE);ENDIF;END;EXCEPTIONWHENOTHERSTHENIFDBMS_SQL.IS_OPEN(V_SQL_ID)THENDBMS_SQL.CLOSE_CURSOR(V_SQL_ID);ENDIF;IFUTL_FILE.IS_OPEN(V_EXP_FILE)THENUTL_FILE.FCLOSE(V_EXP_FILE);ENDIF;DBMS_OUTPUT.PUT_LINE(SQLERRM);DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);END;END;BEGINBEGINEXECUTEIMMEDIATE'createtablemysql_etl_tbs(tnvarchar2(40),cnvarchar2(40),cinumber)';EXCEPTIONWHENOTHERSTHENNULL;END;EXECUTEIMMEDIATE'truncatetablemysql_etl_tbs';DECLAREV_CIPLS_INTEGER;V_CNVARCHAR2(40);V_ETL_COLSVARCHAR2(32767);V_TBNVARCHAR2(30);V_ETL_CFGVARCHAR2(32767);V_CNF_FILEUTL_FILE.FILE_TYPE;V_FROM_POSPLS_INTEGER;BEGINV_CNF_FILE:=UTL_FILE.FOPEN(V_ETL_DIR,'ETL_TABS.CNF','r',32767);LOOPUTL_FILE.GET_LINE(V_CNF_FILE,V_ETL_CFG,32767);V_FROM_POS:=REGEXP_INSTR(V_ETL_CFG,'from',1,1,0,'i');V_ETL_COLS:=SUBSTR(V_ETL_CFG,1,V_FROM_POS-1);V_ETL_COLS:=REGEXP_SUBSTR(V_ETL_COLS,'(select)(.+)',1,1,'i',2);V_TBN:=REGEXP_SUBSTR(V_ETL_CFG,'(\s+from\s+)(\w+)(\s*)',1,1,'i',2);V_TBN:=UPPER(V_TBN);V_TABS.EXTEND();V_TABS(V_TABS.LAST).TBN:=V_TBN;V_TABS(V_TABS.LAST).WHR:=REGEXP_SUBSTR(V_ETL_CFG,'\s+where.+',1,1,'i');V_CI:=1;LOOPV_CN:=REGEXP_SUBSTR(V_ETL_COLS,'\S+',1,V_CI);EXITWHENV_CNISNULL;V_CN:=UPPER(V_CN);EXECUTEIMMEDIATE'insertintomysql_etl_tbs(tn,cn,ci)values(:1,:2,:3)'USINGV_TBN,V_CN,V_CI;COMMIT;V_CI:=V_CI+1;ENDLOOP;ENDLOOP;EXCEPTIONWHENUTL_FILE.INVALID_PATHTHENDBMS_OUTPUT.PUT_LINE('指定的目录:ETL_DIR"'||'"无效!');RETURN;WHENUTL_FILE.INVALID_FILENAMETHENDBMS_OUTPUT.PUT_LINE('指定的文件:"ETL_TABS.CNF'||'"无效!');RETURN;WHENNO_DATA_FOUNDTHENUTL_FILE.FCLOSE(V_CNF_FILE);WHENOTHERSTHENDBMS_OUTPUT.PUT_LINE(SQLERRM);RETURN;END;DECLAREV_CUR_MATCHSYS_REFCURSOR;V_SQL_SMTVARCHAR2(32767);V_TNVARCHAR2(40);V_CNVARCHAR2(40);V_CIPLS_INTEGER;V_COLUMN_NAMEVARCHAR2(40);V_ETL_COLSVARCHAR2(32767);V_LINEVARCHAR2(4000);V_TBNVARCHAR2(40);BEGINV_LOAD_FILE:=UTL_FILE.FOPEN(V_ETL_DIR,'load_data.sql',OPEN_MODE=>'w',MAX_LINESIZE=>32767);FORT_IXINV_TABS.FIRST..V_TABS.LASTLOOPV_SQL_SMT:='selecttn,cn,column_name,cifrom(select*frommysql_etl_tbswheretn='':tbn:'')lleftjoinuser_tab_columnsronl.tn=r.table_nameandl.cn=r.column_nameorderbyci';V_TBN:=V_TABS(T_IX).TBN;V_SQL_SMT:=REPLACE(V_SQL_SMT,':tbn:',V_TBN);V_ETL_COLS:=NULL;OPENV_CUR_MATCHFORV_SQL_SMT;LOOPFETCHV_CUR_MATCHINTOV_TN,V_CN,V_COLUMN_NAME,V_CI;EXITWHENV_CUR_MATCH%NOTFOUND;IFV_CI>1THENV_ETL_COLS:=V_ETL_COLS||',';ENDIF;IFV_COLUMN_NAMEISNULLTHENV_ETL_COLS:=V_ETL_COLS||'cast(nullasnumber)'||V_CN;ELSEV_ETL_COLS:=V_ETL_COLS||V_CN;ENDIF;ENDLOOP;CLOSEV_CUR_MATCH;V_TBN:=LOWER(V_TBN);V_SQL_SMT:='select'||V_ETL_COLS||'from'||V_TBN||V_TABS(T_IX).WHR;ETL_DATA(V_SQL_SMT,P_DATA_PATH,V_TBN);ENDLOOP;IFUTL_FILE.IS_OPEN(V_LOAD_FILE)THENUTL_FILE.FCLOSE(V_LOAD_FILE);ENDIF;END;ENDP_ETL_ORA_DATA;

关于抽取oracle数据到mysql数据库的实现就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。