> 数据库 > Oracle >

oracle数据库同步,100万数据秒级插入

近期为了满足客户的(××电网公司)需求,先说下他们的需求,需求如下:

1.实现Ⅱ区、Ⅲ区数据库的同步,其中Ⅱ区是主数据库,Ⅲ区是需要同步的数据库。

2.两台数据库服务器之间是不能直接通讯的,因为Ⅱ、Ⅲ区之间安装了隔离装置,只能通过端口访问。

3.同步需要保证实时性,数据都是秒级的,一分钟下来至少是2万条数据。

 

看到这个需求我的第一反应估计跟大家是一样的,就是建立外连接,然后使用merger函数去定时同步,效果是最好的,但最终只能想想了,因为两台服务器根本不能通讯的。

上面的方法行不通了就得换一种方式了,后面经过和同事们讨论定下了一个方案,思路如下:

1.使用存储过程生成更新的或者插入的SQL语句,保存到表中;

2.使用后台服务定期读取表中已经生成好的SQL语句做成bat文件并压缩打包;

3.使用传输服务将Ⅱ区的文件传输到Ⅲ区指定服务器并解压到指定的目录;

4.使用Ⅲ区的同步服务调用bat文件进行数据插入

思路是没问题的,基本功能都实现了,但是问题就处在效率提不上去,同步2万数据需要一分钟以上,这显然是不符合需求的。

 

行不通就只能换一个方案了,不使用批处理来同步了,先代码来实现同步;

详细思路如下:

1.同样是使用存储过程生成指定格式的字符串(XML格式)保存到数据库中,代码如下:

1)既然是指定格式的话,肯定要模版才行,下面的代码是根据配置生成指定格式的模版

create or replace procedure P_pub_SyncData_InitTemplate is
  /*
   过程名称:P_pub_SyncData_InitTemplate
   用途:数据库同步,生成模版
   创建日期:2013-07-03 16:41:00
  */
  V_INSERT_TEMPLATE     VARCHAR2(2000);
  V_INSERT_TEMPLATE_COL VARCHAR2(1000);
  V_INSERT_TEMPLATE_VAL VARCHAR2(1000);
  V_DELETE_TEMPLATE     VARCHAR2(1000);
  V_DATA_XMLTEMPLATE    VARCHAR2(2000);
  V_DATA_TYPETEMPLATE   VARCHAR2(2000);
  V_DELETE_COND_TEMPLATE VARCHAR2(500);
  V_COL_LIST                                                    dbms_sql.Varchar2_Table;                     --字段集合
  V_COL_TYPE_LIST                                               dbms_sql.Varchar2_Table;                     --字段类型集合
  CURSOR C_SYNC_CFG IS
    SELECT upper(TABLE_NAME) TABLE_NAME,
           upper(DELETE_CONDITION) DELETE_CONDITION,
           upper(COMPARE_DATE_COL) COMPARE_DATE_COL ,
           EXEC_TYPE,
           SYNCTIME_TYPE,
           TIMETAG_TYPE
      FROM TB_PUB_SYNC_DATA_CFG_NEW;
begin
  FOR V_SYNC IN C_SYNC_CFG LOOP
    SELECT COLUMN_NAME,DATA_TYPE bulk collect into V_COL_LIST,V_COL_TYPE_LIST from USER_TAB_COLUMNS where table_name=V_SYNC.TABLE_NAME;
     --数据插入 字段
     V_INSERT_TEMPLATE_COL := 'INSERT INTO '|| V_SYNC.TABLE_NAME||'(';
     --数据插入  值
     V_INSERT_TEMPLATE_VAL := ' VALUES(';
     --如果是 是先清空表中的数据  使用truncate  提高效率
     IF V_SYNC.EXEC_TYPE=1 THEN
       V_DELETE_TEMPLATE :='TRUNCATE TABLE '||V_SYNC.TABLE_NAME;
     ELSE
       --删除模版
       V_DELETE_TEMPLATE     :='DELETE '||V_SYNC.TABLE_NAME||' WHERE 1=1 ';
     END IF;
     --XML中的数据行模版 
     V_DATA_XMLTEMPLATE    :='SELECT ''<'||V_SYNC.TABLE_NAME||'DataRow ';
     --表格 列的数据类型模版
     V_DATA_TYPETEMPLATE   :='<'||V_SYNC.TABLE_NAME||'Coltype';
     --数据删除时  条件列模版
     V_DELETE_COND_TEMPLATE:='<'||V_SYNC.TABLE_NAME||'DelCol DeleteCol="'||V_SYNC.DELETE_CONDITION||'"></'||V_SYNC.TABLE_NAME||'DelCol>';
     FOR  I IN 1..V_COL_LIST.COUNT LOOP
       V_INSERT_TEMPLATE_COL :=V_INSERT_TEMPLATE_COL||V_COL_LIST(I)||',';
       V_INSERT_TEMPLATE_VAL :=V_INSERT_TEMPLATE_VAL||':'||V_COL_LIST(I)||',';
       V_DATA_TYPETEMPLATE   :=V_DATA_TYPETEMPLATE||' '||V_COL_LIST(I)||'="'||initcap(V_COL_TYPE_LIST(I))||'"';
       V_DATA_XMLTEMPLATE    :=V_DATA_XMLTEMPLATE ||' '||V_COL_LIST(I)||'="'||FUN_PUB_SYNC_TEMPLATE_FORMAT(V_COL_LIST(I),V_COL_TYPE_LIST(I))||'"';
      IF V_SYNC.EXEC_TYPE=0 AND INSTR(V_SYNC.DELETE_CONDITION,V_COL_LIST(I))>0 THEN
       V_DELETE_TEMPLATE     :=V_DELETE_TEMPLATE||' AND '||V_COL_LIST(I)||'=:'||V_COL_LIST(I);
      END IF;
     END LOOP;
     V_INSERT_TEMPLATE :=substr(V_INSERT_TEMPLATE_COL,0,length(V_INSERT_TEMPLATE_COL)-1)||') '||substr(V_INSERT_TEMPLATE_VAL,0,length(V_INSERT_TEMPLATE_VAL)-1)||')';
     V_DATA_TYPETEMPLATE :=V_DATA_TYPETEMPLATE||'></'||V_SYNC.TABLE_NAME||'Coltype>';
     V_DATA_XMLTEMPLATE  :=V_DATA_XMLTEMPLATE||'></'||V_SYNC.TABLE_NAME||'DataRow>'' FROM '||V_SYNC.TABLE_NAME||' WHERE 1=1';

  --生成查询条件模版
  --不区分机组
  IF V_SYNC.TIMETAG_TYPE=1 AND V_SYNC.COMPARE_DATE_COL IS NOT NULL then
    V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND '||V_SYNC.COMPARE_DATE_COL|| '>{BEGIN_DATE} AND '||V_SYNC.COMPARE_DATE_COL||'<={END_DATE}';
  --区分机组
  ELSIF V_SYNC.TIMETAG_TYPE=0 AND V_SYNC.COMPARE_DATE_COL IS NOT NULL THEN
   V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND '||V_SYNC.COMPARE_DATE_COL|| '>{BEGIN_DATE} AND '||V_SYNC.COMPARE_DATE_COL||'<={END_DATE} AND SET_CODE={SET_CODE} AND FACTORY_CODE={FACTORY_CODE}';
  ELSIF V_SYNC.TIMETAG_TYPE=0 THEN
   V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND SET_CODE={SET_CODE} AND FACTORY_CODE={FACTORY_CODE}';
  END IF;
  UPDATE TB_PUB_SYNC_DATA_CFG_NEW SET insert_template=V_INSERT_TEMPLATE,delete_template=V_DELETE_TEMPLATE,DATA_XMLTEMPLATE=V_DATA_XMLTEMPLATE,DELETE_COND_TEMPLATE=V_DELETE_TEMPLATE,DATA_TYPE_TEMPLATE=V_DATA_TYPETEMPLATE WHERE UPPER(TABLE_NAME)=V_SYNC.TABLE_NAME;
  END LOOP;
  
  --插入数据更新状态标签
   MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (select 'SYNCSTATUSTAG' TABLE_NAME FROM DUAL) T1 ON (T.TABLE_NAME=T1.TABLE_NAME)
   WHEN NOT MATCHED THEN
   INSERT VALUES('SYNCSTATUSTAG','0','0',SYSDATE,SYSDATE,SYSDATE);  
   
   COMMIT;
   EXCEPTION WHEN OTHERS THEN
   dbms_output.put_line(substr(sqlerrm,0,200));
   rollback;
end P_pub_SyncData_InitTemplate;

 

2)根据生成的模版去生成数据,代码如下:

create or replace procedure P_PUB_JOB_DATA_SYNC_New
/*
 过程名称:P_JOB_PUB_DATA_SYNC
 用途:数据库同步,生成数据脚本
 创建人:颜显斌
 创建日期:2013-07-03 16:41:00
*/
 is                                                        
  TYPE                       Mycursor_Type is ref cursor;                                     --游标类型变量
  CUR_SQL                    Mycursor_Type;                                                   --游标变量
  V_SQL                      VARCHAR2(2000);
  V_SYNC_TIME                date;
  V_FACTORY_CODE_LIST        dbms_sql.Varchar2_Table;
  V_SET_CODE_LIST            dbms_sql.Varchar2_Table;
  V_BEGIN_DATE               DATE;
  V_END_DATE                 DATE;
  V_TEMP_SQL                 VARCHAR2(2000);
  v_orderno                  integer:=0;                                        --记录行数
  v_count                    integer;
  v_xmltemp                  varchar2(2000);
  v_exectype                 varchar2(30);
  v_commsql                  varchar2(200);
  CURSOR C_DataSyncCfg IS
  SELECT upper(TABLE_NAME) TABLE_NAME,
           upper(DELETE_CONDITION) DELETE_CONDITION,
           upper(COMPARE_DATE_COL) COMPARE_DATE_COL ,
           insert_template,
           delete_template,
           data_xmltemplate,
           delete_cond_template,
           data_type_template,
           sync_type,
           IS_SYNC,
           EXEC_TYPE,
           SYNCTIME_TYPE,
           TIMETAG_TYPE,
           order_no
      FROM TB_PUB_SYNC_DATA_CFG_NEW where is_sync=1;

 begin
   --为了保证Xml的完整性,当表中还有数据的时候不再插入,等待后台服务读取完成再进行写入
   select count(0) into v_count from tb_pub_sync_data_sql;
   if v_count>0 then
     return;
   end if;
   --更新同步数据的状态标签,0:正在生成数据;1:数据生成完成后台可以调用(目的:保证XML文件的完整性)
   execute immediate 'update TB_PUB_SYNC_TIMESEG_TAG set factory_code=0,set_code=0,begin_time=sysdate,end_time=sysdate,update_time=sysdate where table_name=''SYNCSTATUSTAG''';
   --使用绑定变量  公用的插入数据SQL
   v_commsql :='INSERT INTO TB_PUB_SYNC_DATA_SQL (table_name, SQL_CONTENT, UDPATE_TIME,ORDER_NO)  VALUES (:x,:x,:x,:x)';
   --插入Xml头和根节点
   execute immediate v_commsql using '','<?xml version="1.0" encoding="utf-8" ?><XMLROOT>',sysdate,v_orderno;
   v_orderno :=v_orderno+1;
   --得到配置表中的信息
   for V_DataSyncCfg in C_DataSyncCfg 
   LOOP
     --拼凑程序处理中需要的  执行类型
     if V_DataSyncCfg.EXEC_TYPE=1 then
       v_exectype := 'TruncateAndInsert';
     elsif V_DataSyncCfg.EXEC_TYPE=0 then
       v_exectype := 'DeleteAndInsert';
     elsif V_DataSyncCfg.EXEC_TYPE=2 then
       v_exectype := 'Insert';
     else
       v_exectype := 'DeleteAndInsert';
     end if;
     --插入表的模版信息
     v_xmltemp := '<'||V_DataSyncCfg.TABLE_NAME||'  InsertTemplate="'||V_DataSyncCfg.insert_template||'"  DeleteTemplate="'||V_DataSyncCfg.delete_template||'" OrdernNo="'||V_DataSyncCfg.order_no||'" ExecType="'||v_exectype||'">';
     execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
     v_orderno :=v_orderno+1;
     --插入删除条件模版
     v_xmltemp := '<'||V_DataSyncCfg.TABLE_NAME||'DelCol DeleteCol="'||V_DataSyncCfg.DELETE_CONDITION||'"></'||V_DataSyncCfg.TABLE_NAME||'DelCol>';
     execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
     v_orderno :=v_orderno+1;
    --插入数据类型模版
     v_xmltemp := V_DataSyncCfg.data_type_template;
     execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
     v_orderno :=v_orderno+1;
   --时间标签不区分机组
   -------------------------------BEGIN---------------------------------------------------------------------------------
   ----------------------不区分机组的时间标签开始----------------------------------------------------------------------
   --------------------------------------------------------------------------------------------------------------------
   IF V_DataSyncCfg.TIMETAG_TYPE=1 THEN
   BEGIN
   --获取上次更新结束时间
   BEGIN
    SELECT END_TIME INTO V_BEGIN_DATE FROM TB_PUB_SYNC_TIMESEG_TAG WHERE TABLE_NAME=V_DataSyncCfg.TABLE_NAME;
    EXCEPTION WHEN NO_DATA_FOUND THEN
    V_BEGIN_DATE := SYSDATE-365;
   END;
   --如果时间类型为系统时间
   IF V_DataSyncCfg.SYNCTIME_TYPE=0 THEN
    V_END_DATE := SYSDATE;
   --如果时间类型为数据时间
   ELSE
   EXECUTE IMMEDIATE 'SELECT MAX('||V_DataSyncCfg.COMPARE_DATE_COL||') FROM '||V_DataSyncCfg.TABLE_NAME  INTO V_END_DATE;
   END IF;

   IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE THEN
     GOTO LABEL_NEXT_TABLE_SYNC;
   END IF;

   V_TEMP_SQL  :=V_DataSyncCfg.data_xmltemplate;
   V_TEMP_SQL  :=REPLACE(V_TEMP_SQL,'{BEGIN_DATE}','to_date('''||to_char(V_BEGIN_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
   V_TEMP_SQL  :=REPLACE(V_TEMP_SQL,'{END_DATE}','to_date('''||to_char(V_END_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
   --dbms_output.put_line(V_TEMP_SQL);
   ------------------------------BEGIN----------------------------------------------------------------------------------
   ----------------------开始执行动态游标----------------------------------------------------------------------
   --------------------------------------------------------------------------------------------------------------------
   --动态执行模版SQL
   OPEN CUR_SQL FOR V_TEMP_SQL;
   LOOP
   EXIT WHEN CUR_SQL%NOTFOUND;
   FETCH CUR_SQL  INTO V_SQL;
   IF V_SQL IS NOT NULL THEN
   --将结果写入表中
   V_SYNC_TIME := SYSDATE;
   execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,V_SQL,V_SYNC_TIME,v_orderno;
   v_orderno :=v_orderno+1;
   V_SQL := NULL;
   END IF;
   END LOOP;
   --------------------------------------------------------------------------------------------------------------------
   ----------------------结束执行动态游标----------------------------------------------------------------------
   ---------------------------END--------------------------------------------------------------------------------------
   MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (SELECT V_DataSyncCfg.TABLE_NAME TABLE_NAME FROM DUAL) T1
   ON (T.TABLE_NAME=T1.TABLE_NAME)
   WHEN MATCHED THEN
   UPDATE SET END_TIME=V_END_DATE,BEGIN_TIME=V_BEGIN_DATE,UPDATE_TIME=SYSDATE
   WHEN NOT MATCHED THEN
   INSERT VALUES(V_DataSyncCfg.TABLE_NAME,'','',V_BEGIN_DATE,V_END_DATE,SYSDATE);
   END;
   --------------------------------------------------------------------------------------------------------------------
   ----------------------不区分机组的时间标签结束----------------------------------------------------------
   ----------------------------END-------------------------------------------------------------------------------

 

   -------------------------------BEGIN-------------------------------------------------------------------------------
   ----------------------时间标签区分机组开始----------------------------------------------------------------------
   --------------------------------------------------------------------------------------------------------------------
   --时间标签区分机组,这种是出现表中没有UPDATE_TIME的情况下
    ELSE
    BEGIN
    SELECT FACTORY_CODE, SET_CODE BULK COLLECT INTO V_FACTORY_CODE_LIST, V_SET_CODE_LIST FROM V_PUB_SET WHERE IS_VIRTUAL<>1 ORDER BY FACTORY_CODE, SET_CODE desc;

   ----------------------------BEGIN-----------------------------------------------------------------------------------
   ----------------------循环机组列表开始----------------------------------------------------------------------
   --------------------------------------------------------------------------------------------------------------------
    --循环机组列表
    FOR I IN 1..V_SET_CODE_LIST.COUNT LOOP
    --获取开始时间
    BEGIN
    SELECT END_TIME INTO V_BEGIN_DATE FROM TB_PUB_SYNC_TIMESEG_TAG WHERE TABLE_NAME=V_DataSyncCfg.TABLE_NAME AND SET_CODE=V_SET_CODE_LIST(I) AND FACTORY_CODE=V_FACTORY_CODE_LIST(I);
    EXCEPTION WHEN NO_DATA_FOUND THEN
    V_BEGIN_DATE := SYSDATE-365;
    WHEN OTHERS THEN
    GOTO LABEL_NEXT_SET;
    END;
    --如果时间类型为系统时间
    IF V_DataSyncCfg.SYNCTIME_TYPE=0 THEN
    V_END_DATE := SYSDATE;
    --如果时间类型为数据时间
    ELSE
    BEGIN
    EXECUTE IMMEDIATE 'SELECT MAX('||V_DataSyncCfg.COMPARE_DATE_COL||')  FROM '||V_DataSyncCfg.TABLE_NAME||' WHERE SET_CODE='||V_SET_CODE_LIST(I)||'
    AND FACTORY_CODE='''||V_FACTORY_CODE_LIST(I)||''''  INTO V_END_DATE;
    EXCEPTION WHEN OTHERS THEN
    GOTO LABEL_NEXT_SET;
    END;
    END IF;

    IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE  THEN
     GOTO LABEL_NEXT_SET;
    END IF;

    V_TEMP_SQL :=V_DataSyncCfg.data_xmltemplate;
    V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{BEGIN_DATE}','to_date('''||to_char(V_BEGIN_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
    V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{END_DATE}','to_date('''||to_char(V_END_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
    V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{FACTORY_CODE}',''''||V_FACTORY_CODE_LIST(I)||'''');
    V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{SET_CODE}',''''||V_SET_CODE_LIST(I)||'''');

    --dbms_output.put_line(V_TEMP_SQL);
   -------------------------------BEGIN--------------------------------------------------------------------------------
   ----------------------开始执行动态游标----------------------------------------------------------------------
   --------------------------------------------------------------------------------------------------------------------
    --开始执行动态游标
    OPEN CUR_SQL FOR V_TEMP_SQL;
    LOOP
    EXIT WHEN CUR_SQL%NOTFOUND;
    FETCH CUR_SQL  INTO V_SQL;
    IF V_SQL IS NOT NULL THEN
    --将结果写入表中
    V_SYNC_TIME := SYSDATE;
    execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,V_SQL,V_SYNC_TIME,v_orderno;
    v_orderno :=v_orderno+1;
    V_SQL := NULL;
    END IF;
    END LOOP;
   --------------------------------------------------------------------------------------------------------------------
   ----------------------结束执行动态游标----------------------------------------------------------------------
   -----------------------------END------------------------------------------------------------------------------------
    MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (SELECT V_DataSyncCfg.TABLE_NAME TABLE_NAME,V_FACTORY_CODE_LIST(I) FACTORY_CODE,V_SET_CODE_LIST(I) SET_CODE FROM DUAL) T1
    ON (T.TABLE_NAME=T1.TABLE_NAME AND T.FACTORY_CODE=T1.FACTORY_CODE AND T.SET_CODE=T1.SET_CODE)
    WHEN MATCHED THEN
    UPDATE SET END_TIME=V_END_DATE,BEGIN_TIME=V_BEGIN_DATE,UPDATE_TIME=SYSDATE
    WHEN NOT MATCHED THEN
    INSERT VALUES(V_DataSyncCfg.TABLE_NAME,V_FACTORY_CODE_LIST(I),V_SET_CODE_LIST(I),V_BEGIN_DATE,V_END_DATE,SYSDATE);
     --每个机组提交一次
    <<LABEL_NEXT_SET>>
    NULL;
    END LOOP;
   --------------------------------------------------------------------------------------------------------------------
   ----------------------循环机组列表结束----------------------------------------------------------------------
   -----------------------------END-----------------------------------------------------------------------------------

    END;
    END IF;
    --------------------------------------------------------------------------------------------------------------------
    ----------------------时间标签区分机组结束----------------------------------------------------------
    ------------------------------END-----------------------------------------------------------------------------------

    --插入表接点结束符
    execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,'</'||V_DataSyncCfg.TABLE_NAME||'>',sysdate,v_orderno;
    v_orderno:=v_orderno+1;
    --如果是手动同步,则同步一次即修改同步标识,标识为不同步
    IF V_DataSyncCfg.SYNC_TYPE=0 AND V_DataSyncCfg.IS_SYNC=1 THEN
    UPDATE TB_PUB_SYNC_DATA_CFG_NEW SET IS_SYNC=0 WHERE upper(TABLE_NAME)=upper(V_DataSyncCfg.TABLE_NAME);
    END IF;
    <<LABEL_NEXT_TABLE_SYNC>>
     null;
    END LOOP;
    --插入根节点结束符
    execute immediate v_commsql using '','</XMLROOT>',sysdate,v_orderno;
    --更新数据同步状态为‘1’已完成,这时后台服务可以调用
    execute immediate 'update TB_PUB_SYNC_TIMESEG_TAG set factory_code=1,set_code=1,begin_time=sysdate,end_time=sysdate,update_time=sysdate where table_name=''SYNCSTATUSTAG''';    
    COMMIT;
   EXCEPTION WHEN OTHERS THEN
   ROLLBACK;
 END P_PUB_JOB_DATA_SYNC_New;

 

上面列出的就是数据库的两个核心存储过程,使用job定时去执行P_PUB_JOB_DATA_SYNC_New

然后使用查询是,用tb_pub_sync_data_sql表中的order_no字段 排序,这样能保证XML的完整性

3)使用后台服务定时读取tb_pub_sync_data_sql中的数据去生成文件,生成的文件格式如下:

<?xml version="1.0" encoding="utf-8" ?>

<XMLROOT>

--该节点保存表的插入和删除模版以及更新方式

<表名  InsertTemplate=""  DelteTemplate=""  ExecType=""  Order_No="">

--该节点保存,删除时需要用那些字段来做条件,多个字段用逗号隔开,该节点每张表只会写一次

<表名DelCol  DeleteCol="col1,col2,col3"></表名DelCol DelCol>

--该节点保存表中各字段的数据类型,用于程序进行数据转换,该节点每张表只会写一次

<表名ColType col1="Varchar2" col2="Number" col3="Varchar2"></表名ColType>

--该节点用于保存数据,一条数据对应一个节点

<表名DataRow col1="12" col2="23" col3="45"></表名DataRow>

</表名>

</XMLROOT>

3)将文件压缩打包,然后通过传输服务将文件传送到Ⅲ区服务器指定的目录

4)Ⅲ区服务器上的数据同步服务,读取指定目录下的XML文件进行解析,转换成目标格式,下面贴出主要的代码

代码如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using DataSyncSvr.DataEntity;
using Oracle.DataAccess.Client;
using DataSyncSvr.DataParse;
using Publish.Log;
using System.IO;
using System.Data;

namespace DataSyncSvr
{
    public class DataSyncMain
    {
        public ILogger mlog;
        private OracleConnection conn;
        public void DataSyncStart()
        {
            ParseXmlToDataEntity ParseEntity = new ParseXmlToDataEntity();
            Dictionary<string, Dictionary<string, DataSyncDataEntity>> FileList = ParseEntity.GetDataEnetityList();
            Dictionary<string, DataSyncDataEntity> EntityDataList;
            conn = new OracleConnection(SyncConst.Connstr);
            if (conn.State == ConnectionState.Closed)
            {
                conn.Open();
            }
            DataSyncDataEntity EntityData;
            if (FileList == null || FileList.Keys.Count < 1) return;
            foreach(string filekey in FileList.Keys)
            {
                EntityDataList=FileList[filekey];
                foreach (string key in EntityDataList.Keys)
                {
                    try
                    {
                        EntityData = EntityDataList[key];
                        mlog.LogInfo("开始同步表:\"" + key + "\"");
                        if (EntityData.ExecuteType == ExecType.DeleteAndInsert || EntityData.ExecuteType == ExecType.TruncateAndInsert)
                        {
                            //先执行删除操作
                            AddExecParamAndRun(EntityData, ExecType.Delete);
                            //再执行插入操作
                            AddExecParamAndRun(EntityData, ExecType.Insert);
                        }
                        else
                        {
                            AddExecParamAndRun(EntityData, ExecType.Insert);
                        }
                        mlog.LogInfo("\"" + key + "\"表数据同步完成,受影响行数:(" + EntityData.RecordCount + ")行");
                       
                    }
                    catch (Exception ex)
                    {
                        mlog.LogInfo(ex.ToString());
                        break;
                    }
                }
                //删除文件
                if (File.Exists(filekey))
                {
                    File.Delete(filekey);
                    mlog.LogInfo("文件:\"" + filekey + "\"已删除");
                }
            }
            conn.Close();
        }
        private void AddExecParamAndRun(DataSyncDataEntity EntityData, ExecType excutetype)
        {
            OracleParameter Param;
            string[] columns = EntityData.InsertCol;
            string ExecTemplate = EntityData.InsertTemplate;
            if (excutetype.Equals(ExecType.Delete))
            {
                columns = EntityData.DeleteCol;
                ExecTemplate = EntityData.DeleteTemplate;
            }
            OracleCommand command = new OracleCommand();
            command.Connection = conn;
            command.CommandText = ExecTemplate;
            command.ArrayBindCount = EntityData.TableData[EntityData.InsertCol[0]].Length;
            command.CommandTimeout = 600;
            if (columns != null && columns.Length > 0)
            {
                foreach (string colname in columns)
                {
                    if (!string.IsNullOrEmpty(colname))
                    {
                        Param = new OracleParameter(colname, EntityData.DataType[colname]);
                        Param.Direction = ParameterDirection.Input;
                        Param.Value = EntityData.TableData[colname];
                        command.Parameters.Add(Param);
                    }
                }
            }
            command.ExecuteNonQuery();
        }
    }
}

注意:使用了Oracle.DataAccess.dll动态库,这个可以去官网下载。

写了这么久也累了,下面就贴上我上传的源码地址吧,http://download.csdn.net/detail/yanxianbin1989/5970217  欢迎各位高手前来交流,相互学习,共同进步。

(责任编辑:IT)