oracle数据库同步,100万数据秒级插入
时间:2015-01-06 00:30 来源:csdn.net 作者:yanxianbin1989
近期为了满足客户的(××电网公司)需求,先说下他们的需求,需求如下:
1.实现Ⅱ区、Ⅲ区数据库的同步,其中Ⅱ区是主数据库,Ⅲ区是需要同步的数据库。
2.两台数据库服务器之间是不能直接通讯的,因为Ⅱ、Ⅲ区之间安装了隔离装置,只能通过端口访问。
3.同步需要保证实时性,数据都是秒级的,一分钟下来至少是2万条数据。
看到这个需求我的第一反应估计跟大家是一样的,就是建立外连接,然后使用merger函数去定时同步,效果是最好的,但最终只能想想了,因为两台服务器根本不能通讯的。
上面的方法行不通了就得换一种方式了,后面经过和同事们讨论定下了一个方案,思路如下:
1.使用存储过程生成更新的或者插入的SQL语句,保存到表中;
2.使用后台服务定期读取表中已经生成好的SQL语句做成bat文件并压缩打包;
3.使用传输服务将Ⅱ区的文件传输到Ⅲ区指定服务器并解压到指定的目录;
4.使用Ⅲ区的同步服务调用bat文件进行数据插入
思路是没问题的,基本功能都实现了,但是问题就处在效率提不上去,同步2万数据需要一分钟以上,这显然是不符合需求的。
行不通就只能换一个方案了,不使用批处理来同步了,先代码来实现同步;
详细思路如下:
1.同样是使用存储过程生成指定格式的字符串(XML格式)保存到数据库中,代码如下:
1)既然是指定格式的话,肯定要模版才行,下面的代码是根据配置生成指定格式的模版
create or replace procedure P_pub_SyncData_InitTemplate is
/*
过程名称:P_pub_SyncData_InitTemplate
用途:数据库同步,生成模版
创建日期:2013-07-03 16:41:00
*/
V_INSERT_TEMPLATE VARCHAR2(2000);
V_INSERT_TEMPLATE_COL VARCHAR2(1000);
V_INSERT_TEMPLATE_VAL VARCHAR2(1000);
V_DELETE_TEMPLATE VARCHAR2(1000);
V_DATA_XMLTEMPLATE VARCHAR2(2000);
V_DATA_TYPETEMPLATE VARCHAR2(2000);
V_DELETE_COND_TEMPLATE VARCHAR2(500);
V_COL_LIST dbms_sql.Varchar2_Table; --字段集合
V_COL_TYPE_LIST dbms_sql.Varchar2_Table; --字段类型集合
CURSOR C_SYNC_CFG IS
SELECT upper(TABLE_NAME) TABLE_NAME,
upper(DELETE_CONDITION) DELETE_CONDITION,
upper(COMPARE_DATE_COL) COMPARE_DATE_COL ,
EXEC_TYPE,
SYNCTIME_TYPE,
TIMETAG_TYPE
FROM TB_PUB_SYNC_DATA_CFG_NEW;
begin
FOR V_SYNC IN C_SYNC_CFG LOOP
SELECT COLUMN_NAME,DATA_TYPE bulk collect into V_COL_LIST,V_COL_TYPE_LIST from USER_TAB_COLUMNS where table_name=V_SYNC.TABLE_NAME;
--数据插入 字段
V_INSERT_TEMPLATE_COL := 'INSERT INTO '|| V_SYNC.TABLE_NAME||'(';
--数据插入 值
V_INSERT_TEMPLATE_VAL := ' VALUES(';
--如果是 是先清空表中的数据 使用truncate 提高效率
IF V_SYNC.EXEC_TYPE=1 THEN
V_DELETE_TEMPLATE :='TRUNCATE TABLE '||V_SYNC.TABLE_NAME;
ELSE
--删除模版
V_DELETE_TEMPLATE :='DELETE '||V_SYNC.TABLE_NAME||' WHERE 1=1 ';
END IF;
--XML中的数据行模版
V_DATA_XMLTEMPLATE :='SELECT ''<'||V_SYNC.TABLE_NAME||'DataRow ';
--表格 列的数据类型模版
V_DATA_TYPETEMPLATE :='<'||V_SYNC.TABLE_NAME||'Coltype';
--数据删除时 条件列模版
V_DELETE_COND_TEMPLATE:='<'||V_SYNC.TABLE_NAME||'DelCol DeleteCol="'||V_SYNC.DELETE_CONDITION||'"></'||V_SYNC.TABLE_NAME||'DelCol>';
FOR I IN 1..V_COL_LIST.COUNT LOOP
V_INSERT_TEMPLATE_COL :=V_INSERT_TEMPLATE_COL||V_COL_LIST(I)||',';
V_INSERT_TEMPLATE_VAL :=V_INSERT_TEMPLATE_VAL||':'||V_COL_LIST(I)||',';
V_DATA_TYPETEMPLATE :=V_DATA_TYPETEMPLATE||' '||V_COL_LIST(I)||'="'||initcap(V_COL_TYPE_LIST(I))||'"';
V_DATA_XMLTEMPLATE :=V_DATA_XMLTEMPLATE ||' '||V_COL_LIST(I)||'="'||FUN_PUB_SYNC_TEMPLATE_FORMAT(V_COL_LIST(I),V_COL_TYPE_LIST(I))||'"';
IF V_SYNC.EXEC_TYPE=0 AND INSTR(V_SYNC.DELETE_CONDITION,V_COL_LIST(I))>0 THEN
V_DELETE_TEMPLATE :=V_DELETE_TEMPLATE||' AND '||V_COL_LIST(I)||'=:'||V_COL_LIST(I);
END IF;
END LOOP;
V_INSERT_TEMPLATE :=substr(V_INSERT_TEMPLATE_COL,0,length(V_INSERT_TEMPLATE_COL)-1)||') '||substr(V_INSERT_TEMPLATE_VAL,0,length(V_INSERT_TEMPLATE_VAL)-1)||')';
V_DATA_TYPETEMPLATE :=V_DATA_TYPETEMPLATE||'></'||V_SYNC.TABLE_NAME||'Coltype>';
V_DATA_XMLTEMPLATE :=V_DATA_XMLTEMPLATE||'></'||V_SYNC.TABLE_NAME||'DataRow>'' FROM '||V_SYNC.TABLE_NAME||' WHERE 1=1';
--生成查询条件模版
--不区分机组
IF V_SYNC.TIMETAG_TYPE=1 AND V_SYNC.COMPARE_DATE_COL IS NOT NULL then
V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND '||V_SYNC.COMPARE_DATE_COL|| '>{BEGIN_DATE} AND '||V_SYNC.COMPARE_DATE_COL||'<={END_DATE}';
--区分机组
ELSIF V_SYNC.TIMETAG_TYPE=0 AND V_SYNC.COMPARE_DATE_COL IS NOT NULL THEN
V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND '||V_SYNC.COMPARE_DATE_COL|| '>{BEGIN_DATE} AND '||V_SYNC.COMPARE_DATE_COL||'<={END_DATE} AND SET_CODE={SET_CODE} AND FACTORY_CODE={FACTORY_CODE}';
ELSIF V_SYNC.TIMETAG_TYPE=0 THEN
V_DATA_XMLTEMPLATE := V_DATA_XMLTEMPLATE||' AND SET_CODE={SET_CODE} AND FACTORY_CODE={FACTORY_CODE}';
END IF;
UPDATE TB_PUB_SYNC_DATA_CFG_NEW SET insert_template=V_INSERT_TEMPLATE,delete_template=V_DELETE_TEMPLATE,DATA_XMLTEMPLATE=V_DATA_XMLTEMPLATE,DELETE_COND_TEMPLATE=V_DELETE_TEMPLATE,DATA_TYPE_TEMPLATE=V_DATA_TYPETEMPLATE WHERE UPPER(TABLE_NAME)=V_SYNC.TABLE_NAME;
END LOOP;
--插入数据更新状态标签
MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (select 'SYNCSTATUSTAG' TABLE_NAME FROM DUAL) T1 ON (T.TABLE_NAME=T1.TABLE_NAME)
WHEN NOT MATCHED THEN
INSERT VALUES('SYNCSTATUSTAG','0','0',SYSDATE,SYSDATE,SYSDATE);
COMMIT;
EXCEPTION WHEN OTHERS THEN
dbms_output.put_line(substr(sqlerrm,0,200));
rollback;
end P_pub_SyncData_InitTemplate;
2)根据生成的模版去生成数据,代码如下:
create or replace procedure P_PUB_JOB_DATA_SYNC_New
/*
过程名称:P_JOB_PUB_DATA_SYNC
用途:数据库同步,生成数据脚本
创建人:颜显斌
创建日期:2013-07-03 16:41:00
*/
is
TYPE Mycursor_Type is ref cursor; --游标类型变量
CUR_SQL Mycursor_Type; --游标变量
V_SQL VARCHAR2(2000);
V_SYNC_TIME date;
V_FACTORY_CODE_LIST dbms_sql.Varchar2_Table;
V_SET_CODE_LIST dbms_sql.Varchar2_Table;
V_BEGIN_DATE DATE;
V_END_DATE DATE;
V_TEMP_SQL VARCHAR2(2000);
v_orderno integer:=0; --记录行数
v_count integer;
v_xmltemp varchar2(2000);
v_exectype varchar2(30);
v_commsql varchar2(200);
CURSOR C_DataSyncCfg IS
SELECT upper(TABLE_NAME) TABLE_NAME,
upper(DELETE_CONDITION) DELETE_CONDITION,
upper(COMPARE_DATE_COL) COMPARE_DATE_COL ,
insert_template,
delete_template,
data_xmltemplate,
delete_cond_template,
data_type_template,
sync_type,
IS_SYNC,
EXEC_TYPE,
SYNCTIME_TYPE,
TIMETAG_TYPE,
order_no
FROM TB_PUB_SYNC_DATA_CFG_NEW where is_sync=1;
begin
--为了保证Xml的完整性,当表中还有数据的时候不再插入,等待后台服务读取完成再进行写入
select count(0) into v_count from tb_pub_sync_data_sql;
if v_count>0 then
return;
end if;
--更新同步数据的状态标签,0:正在生成数据;1:数据生成完成后台可以调用(目的:保证XML文件的完整性)
execute immediate 'update TB_PUB_SYNC_TIMESEG_TAG set factory_code=0,set_code=0,begin_time=sysdate,end_time=sysdate,update_time=sysdate where table_name=''SYNCSTATUSTAG''';
--使用绑定变量 公用的插入数据SQL
v_commsql :='INSERT INTO TB_PUB_SYNC_DATA_SQL (table_name, SQL_CONTENT, UDPATE_TIME,ORDER_NO) VALUES (:x,:x,:x,:x)';
--插入Xml头和根节点
execute immediate v_commsql using '','<?xml version="1.0" encoding="utf-8" ?><XMLROOT>',sysdate,v_orderno;
v_orderno :=v_orderno+1;
--得到配置表中的信息
for V_DataSyncCfg in C_DataSyncCfg
LOOP
--拼凑程序处理中需要的 执行类型
if V_DataSyncCfg.EXEC_TYPE=1 then
v_exectype := 'TruncateAndInsert';
elsif V_DataSyncCfg.EXEC_TYPE=0 then
v_exectype := 'DeleteAndInsert';
elsif V_DataSyncCfg.EXEC_TYPE=2 then
v_exectype := 'Insert';
else
v_exectype := 'DeleteAndInsert';
end if;
--插入表的模版信息
v_xmltemp := '<'||V_DataSyncCfg.TABLE_NAME||' InsertTemplate="'||V_DataSyncCfg.insert_template||'" DeleteTemplate="'||V_DataSyncCfg.delete_template||'" OrdernNo="'||V_DataSyncCfg.order_no||'" ExecType="'||v_exectype||'">';
execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
v_orderno :=v_orderno+1;
--插入删除条件模版
v_xmltemp := '<'||V_DataSyncCfg.TABLE_NAME||'DelCol DeleteCol="'||V_DataSyncCfg.DELETE_CONDITION||'"></'||V_DataSyncCfg.TABLE_NAME||'DelCol>';
execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
v_orderno :=v_orderno+1;
--插入数据类型模版
v_xmltemp := V_DataSyncCfg.data_type_template;
execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,v_xmltemp,sysdate,v_orderno;
v_orderno :=v_orderno+1;
--时间标签不区分机组
-------------------------------BEGIN---------------------------------------------------------------------------------
----------------------不区分机组的时间标签开始----------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
IF V_DataSyncCfg.TIMETAG_TYPE=1 THEN
BEGIN
--获取上次更新结束时间
BEGIN
SELECT END_TIME INTO V_BEGIN_DATE FROM TB_PUB_SYNC_TIMESEG_TAG WHERE TABLE_NAME=V_DataSyncCfg.TABLE_NAME;
EXCEPTION WHEN NO_DATA_FOUND THEN
V_BEGIN_DATE := SYSDATE-365;
END;
--如果时间类型为系统时间
IF V_DataSyncCfg.SYNCTIME_TYPE=0 THEN
V_END_DATE := SYSDATE;
--如果时间类型为数据时间
ELSE
EXECUTE IMMEDIATE 'SELECT MAX('||V_DataSyncCfg.COMPARE_DATE_COL||') FROM '||V_DataSyncCfg.TABLE_NAME INTO V_END_DATE;
END IF;
IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE THEN
GOTO LABEL_NEXT_TABLE_SYNC;
END IF;
V_TEMP_SQL :=V_DataSyncCfg.data_xmltemplate;
V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{BEGIN_DATE}','to_date('''||to_char(V_BEGIN_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{END_DATE}','to_date('''||to_char(V_END_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
--dbms_output.put_line(V_TEMP_SQL);
------------------------------BEGIN----------------------------------------------------------------------------------
----------------------开始执行动态游标----------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
--动态执行模版SQL
OPEN CUR_SQL FOR V_TEMP_SQL;
LOOP
EXIT WHEN CUR_SQL%NOTFOUND;
FETCH CUR_SQL INTO V_SQL;
IF V_SQL IS NOT NULL THEN
--将结果写入表中
V_SYNC_TIME := SYSDATE;
execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,V_SQL,V_SYNC_TIME,v_orderno;
v_orderno :=v_orderno+1;
V_SQL := NULL;
END IF;
END LOOP;
--------------------------------------------------------------------------------------------------------------------
----------------------结束执行动态游标----------------------------------------------------------------------
---------------------------END--------------------------------------------------------------------------------------
MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (SELECT V_DataSyncCfg.TABLE_NAME TABLE_NAME FROM DUAL) T1
ON (T.TABLE_NAME=T1.TABLE_NAME)
WHEN MATCHED THEN
UPDATE SET END_TIME=V_END_DATE,BEGIN_TIME=V_BEGIN_DATE,UPDATE_TIME=SYSDATE
WHEN NOT MATCHED THEN
INSERT VALUES(V_DataSyncCfg.TABLE_NAME,'','',V_BEGIN_DATE,V_END_DATE,SYSDATE);
END;
--------------------------------------------------------------------------------------------------------------------
----------------------不区分机组的时间标签结束----------------------------------------------------------
----------------------------END-------------------------------------------------------------------------------
-------------------------------BEGIN-------------------------------------------------------------------------------
----------------------时间标签区分机组开始----------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
--时间标签区分机组,这种是出现表中没有UPDATE_TIME的情况下
ELSE
BEGIN
SELECT FACTORY_CODE, SET_CODE BULK COLLECT INTO V_FACTORY_CODE_LIST, V_SET_CODE_LIST FROM V_PUB_SET WHERE IS_VIRTUAL<>1 ORDER BY FACTORY_CODE, SET_CODE desc;
----------------------------BEGIN-----------------------------------------------------------------------------------
----------------------循环机组列表开始----------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
--循环机组列表
FOR I IN 1..V_SET_CODE_LIST.COUNT LOOP
--获取开始时间
BEGIN
SELECT END_TIME INTO V_BEGIN_DATE FROM TB_PUB_SYNC_TIMESEG_TAG WHERE TABLE_NAME=V_DataSyncCfg.TABLE_NAME AND SET_CODE=V_SET_CODE_LIST(I) AND FACTORY_CODE=V_FACTORY_CODE_LIST(I);
EXCEPTION WHEN NO_DATA_FOUND THEN
V_BEGIN_DATE := SYSDATE-365;
WHEN OTHERS THEN
GOTO LABEL_NEXT_SET;
END;
--如果时间类型为系统时间
IF V_DataSyncCfg.SYNCTIME_TYPE=0 THEN
V_END_DATE := SYSDATE;
--如果时间类型为数据时间
ELSE
BEGIN
EXECUTE IMMEDIATE 'SELECT MAX('||V_DataSyncCfg.COMPARE_DATE_COL||') FROM '||V_DataSyncCfg.TABLE_NAME||' WHERE SET_CODE='||V_SET_CODE_LIST(I)||'
AND FACTORY_CODE='''||V_FACTORY_CODE_LIST(I)||'''' INTO V_END_DATE;
EXCEPTION WHEN OTHERS THEN
GOTO LABEL_NEXT_SET;
END;
END IF;
IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE THEN
GOTO LABEL_NEXT_SET;
END IF;
V_TEMP_SQL :=V_DataSyncCfg.data_xmltemplate;
V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{BEGIN_DATE}','to_date('''||to_char(V_BEGIN_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{END_DATE}','to_date('''||to_char(V_END_DATE,'yyyy-mm-dd hh24:mi:ss')||''',''yyyy-mm-dd hh24:mi:ss'')');
V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{FACTORY_CODE}',''''||V_FACTORY_CODE_LIST(I)||'''');
V_TEMP_SQL :=REPLACE(V_TEMP_SQL,'{SET_CODE}',''''||V_SET_CODE_LIST(I)||'''');
--dbms_output.put_line(V_TEMP_SQL);
-------------------------------BEGIN--------------------------------------------------------------------------------
----------------------开始执行动态游标----------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
--开始执行动态游标
OPEN CUR_SQL FOR V_TEMP_SQL;
LOOP
EXIT WHEN CUR_SQL%NOTFOUND;
FETCH CUR_SQL INTO V_SQL;
IF V_SQL IS NOT NULL THEN
--将结果写入表中
V_SYNC_TIME := SYSDATE;
execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,V_SQL,V_SYNC_TIME,v_orderno;
v_orderno :=v_orderno+1;
V_SQL := NULL;
END IF;
END LOOP;
--------------------------------------------------------------------------------------------------------------------
----------------------结束执行动态游标----------------------------------------------------------------------
-----------------------------END------------------------------------------------------------------------------------
MERGE INTO TB_PUB_SYNC_TIMESEG_TAG T USING (SELECT V_DataSyncCfg.TABLE_NAME TABLE_NAME,V_FACTORY_CODE_LIST(I) FACTORY_CODE,V_SET_CODE_LIST(I) SET_CODE FROM DUAL) T1
ON (T.TABLE_NAME=T1.TABLE_NAME AND T.FACTORY_CODE=T1.FACTORY_CODE AND T.SET_CODE=T1.SET_CODE)
WHEN MATCHED THEN
UPDATE SET END_TIME=V_END_DATE,BEGIN_TIME=V_BEGIN_DATE,UPDATE_TIME=SYSDATE
WHEN NOT MATCHED THEN
INSERT VALUES(V_DataSyncCfg.TABLE_NAME,V_FACTORY_CODE_LIST(I),V_SET_CODE_LIST(I),V_BEGIN_DATE,V_END_DATE,SYSDATE);
--每个机组提交一次
<<LABEL_NEXT_SET>>
NULL;
END LOOP;
--------------------------------------------------------------------------------------------------------------------
----------------------循环机组列表结束----------------------------------------------------------------------
-----------------------------END-----------------------------------------------------------------------------------
END;
END IF;
--------------------------------------------------------------------------------------------------------------------
----------------------时间标签区分机组结束----------------------------------------------------------
------------------------------END-----------------------------------------------------------------------------------
--插入表接点结束符
execute immediate v_commsql using V_DataSyncCfg.TABLE_NAME,'</'||V_DataSyncCfg.TABLE_NAME||'>',sysdate,v_orderno;
v_orderno:=v_orderno+1;
--如果是手动同步,则同步一次即修改同步标识,标识为不同步
IF V_DataSyncCfg.SYNC_TYPE=0 AND V_DataSyncCfg.IS_SYNC=1 THEN
UPDATE TB_PUB_SYNC_DATA_CFG_NEW SET IS_SYNC=0 WHERE upper(TABLE_NAME)=upper(V_DataSyncCfg.TABLE_NAME);
END IF;
<<LABEL_NEXT_TABLE_SYNC>>
null;
END LOOP;
--插入根节点结束符
execute immediate v_commsql using '','</XMLROOT>',sysdate,v_orderno;
--更新数据同步状态为‘1’已完成,这时后台服务可以调用
execute immediate 'update TB_PUB_SYNC_TIMESEG_TAG set factory_code=1,set_code=1,begin_time=sysdate,end_time=sysdate,update_time=sysdate where table_name=''SYNCSTATUSTAG''';
COMMIT;
EXCEPTION WHEN OTHERS THEN
ROLLBACK;
END P_PUB_JOB_DATA_SYNC_New;
上面列出的就是数据库的两个核心存储过程,使用job定时去执行P_PUB_JOB_DATA_SYNC_New
然后使用查询是,用tb_pub_sync_data_sql表中的order_no字段 排序,这样能保证XML的完整性
3)使用后台服务定时读取tb_pub_sync_data_sql中的数据去生成文件,生成的文件格式如下:
<?xml version="1.0" encoding="utf-8" ?>
<XMLROOT>
--该节点保存表的插入和删除模版以及更新方式
<表名 InsertTemplate="" DelteTemplate="" ExecType="" Order_No="">
--该节点保存,删除时需要用那些字段来做条件,多个字段用逗号隔开,该节点每张表只会写一次
<表名DelCol DeleteCol="col1,col2,col3"></表名DelCol DelCol>
--该节点保存表中各字段的数据类型,用于程序进行数据转换,该节点每张表只会写一次
<表名ColType col1="Varchar2" col2="Number" col3="Varchar2"></表名ColType>
--该节点用于保存数据,一条数据对应一个节点
<表名DataRow col1="12" col2="23" col3="45"></表名DataRow>
</表名>
</XMLROOT>
3)将文件压缩打包,然后通过传输服务将文件传送到Ⅲ区服务器指定的目录
4)Ⅲ区服务器上的数据同步服务,读取指定目录下的XML文件进行解析,转换成目标格式,下面贴出主要的代码
代码如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using DataSyncSvr.DataEntity;
using Oracle.DataAccess.Client;
using DataSyncSvr.DataParse;
using Publish.Log;
using System.IO;
using System.Data;
namespace DataSyncSvr
{
public class DataSyncMain
{
public ILogger mlog;
private OracleConnection conn;
public void DataSyncStart()
{
ParseXmlToDataEntity ParseEntity = new ParseXmlToDataEntity();
Dictionary<string, Dictionary<string, DataSyncDataEntity>> FileList = ParseEntity.GetDataEnetityList();
Dictionary<string, DataSyncDataEntity> EntityDataList;
conn = new OracleConnection(SyncConst.Connstr);
if (conn.State == ConnectionState.Closed)
{
conn.Open();
}
DataSyncDataEntity EntityData;
if (FileList == null || FileList.Keys.Count < 1) return;
foreach(string filekey in FileList.Keys)
{
EntityDataList=FileList[filekey];
foreach (string key in EntityDataList.Keys)
{
try
{
EntityData = EntityDataList[key];
mlog.LogInfo("开始同步表:\"" + key + "\"");
if (EntityData.ExecuteType == ExecType.DeleteAndInsert || EntityData.ExecuteType == ExecType.TruncateAndInsert)
{
//先执行删除操作
AddExecParamAndRun(EntityData, ExecType.Delete);
//再执行插入操作
AddExecParamAndRun(EntityData, ExecType.Insert);
}
else
{
AddExecParamAndRun(EntityData, ExecType.Insert);
}
mlog.LogInfo("\"" + key + "\"表数据同步完成,受影响行数:(" + EntityData.RecordCount + ")行");
}
catch (Exception ex)
{
mlog.LogInfo(ex.ToString());
break;
}
}
//删除文件
if (File.Exists(filekey))
{
File.Delete(filekey);
mlog.LogInfo("文件:\"" + filekey + "\"已删除");
}
}
conn.Close();
}
private void AddExecParamAndRun(DataSyncDataEntity EntityData, ExecType excutetype)
{
OracleParameter Param;
string[] columns = EntityData.InsertCol;
string ExecTemplate = EntityData.InsertTemplate;
if (excutetype.Equals(ExecType.Delete))
{
columns = EntityData.DeleteCol;
ExecTemplate = EntityData.DeleteTemplate;
}
OracleCommand command = new OracleCommand();
command.Connection = conn;
command.CommandText = ExecTemplate;
command.ArrayBindCount = EntityData.TableData[EntityData.InsertCol[0]].Length;
command.CommandTimeout = 600;
if (columns != null && columns.Length > 0)
{
foreach (string colname in columns)
{
if (!string.IsNullOrEmpty(colname))
{
Param = new OracleParameter(colname, EntityData.DataType[colname]);
Param.Direction = ParameterDirection.Input;
Param.Value = EntityData.TableData[colname];
command.Parameters.Add(Param);
}
}
}
command.ExecuteNonQuery();
}
}
}
注意:使用了Oracle.DataAccess.dll动态库,这个可以去官网下载。
写了这么久也累了,下面就贴上我上传的源码地址吧,http://download.csdn.net/detail/yanxianbin1989/5970217 欢迎各位高手前来交流,相互学习,共同进步。
(责任编辑:IT)
近期为了满足客户的(××电网公司)需求,先说下他们的需求,需求如下: 1.实现Ⅱ区、Ⅲ区数据库的同步,其中Ⅱ区是主数据库,Ⅲ区是需要同步的数据库。 2.两台数据库服务器之间是不能直接通讯的,因为Ⅱ、Ⅲ区之间安装了隔离装置,只能通过端口访问。 3.同步需要保证实时性,数据都是秒级的,一分钟下来至少是2万条数据。
看到这个需求我的第一反应估计跟大家是一样的,就是建立外连接,然后使用merger函数去定时同步,效果是最好的,但最终只能想想了,因为两台服务器根本不能通讯的。 上面的方法行不通了就得换一种方式了,后面经过和同事们讨论定下了一个方案,思路如下: 1.使用存储过程生成更新的或者插入的SQL语句,保存到表中; 2.使用后台服务定期读取表中已经生成好的SQL语句做成bat文件并压缩打包; 3.使用传输服务将Ⅱ区的文件传输到Ⅲ区指定服务器并解压到指定的目录; 4.使用Ⅲ区的同步服务调用bat文件进行数据插入 思路是没问题的,基本功能都实现了,但是问题就处在效率提不上去,同步2万数据需要一分钟以上,这显然是不符合需求的。
行不通就只能换一个方案了,不使用批处理来同步了,先代码来实现同步; 详细思路如下: 1.同样是使用存储过程生成指定格式的字符串(XML格式)保存到数据库中,代码如下: 1)既然是指定格式的话,肯定要模版才行,下面的代码是根据配置生成指定格式的模版
create or replace procedure P_pub_SyncData_InitTemplate is
--生成查询条件模版
2)根据生成的模版去生成数据,代码如下:
create or replace procedure P_PUB_JOB_DATA_SYNC_New
begin
IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE THEN
V_TEMP_SQL :=V_DataSyncCfg.data_xmltemplate;
-------------------------------BEGIN-------------------------------------------------------------------------------
----------------------------BEGIN-----------------------------------------------------------------------------------
IF V_END_DATE IS NULL OR V_END_DATE<=V_BEGIN_DATE THEN
V_TEMP_SQL :=V_DataSyncCfg.data_xmltemplate;
--dbms_output.put_line(V_TEMP_SQL);
END;
--插入表接点结束符
上面列出的就是数据库的两个核心存储过程,使用job定时去执行P_PUB_JOB_DATA_SYNC_New 然后使用查询是,用tb_pub_sync_data_sql表中的order_no字段 排序,这样能保证XML的完整性 3)使用后台服务定时读取tb_pub_sync_data_sql中的数据去生成文件,生成的文件格式如下: <?xml version="1.0" encoding="utf-8" ?> <XMLROOT> --该节点保存表的插入和删除模版以及更新方式 <表名 InsertTemplate="" DelteTemplate="" ExecType="" Order_No=""> --该节点保存,删除时需要用那些字段来做条件,多个字段用逗号隔开,该节点每张表只会写一次 <表名DelCol DeleteCol="col1,col2,col3"></表名DelCol DelCol> --该节点保存表中各字段的数据类型,用于程序进行数据转换,该节点每张表只会写一次 <表名ColType col1="Varchar2" col2="Number" col3="Varchar2"></表名ColType> --该节点用于保存数据,一条数据对应一个节点 <表名DataRow col1="12" col2="23" col3="45"></表名DataRow> </表名> </XMLROOT> 3)将文件压缩打包,然后通过传输服务将文件传送到Ⅲ区服务器指定的目录 4)Ⅲ区服务器上的数据同步服务,读取指定目录下的XML文件进行解析,转换成目标格式,下面贴出主要的代码 代码如下:
using System;
namespace DataSyncSvr 注意:使用了Oracle.DataAccess.dll动态库,这个可以去官网下载。 写了这么久也累了,下面就贴上我上传的源码地址吧,http://download.csdn.net/detail/yanxianbin1989/5970217 欢迎各位高手前来交流,相互学习,共同进步。 (责任编辑:IT) |