sqlserver创建SSIS包,以增量抽为例,做数据分区

sqlserver创建SSIS包,以增量抽为例,

1.增量抽取前的准备工作

(1)增量抽取之前,先全量抽取数据,进行全部数据初始化

(2)创建表

2个表的字段结构一样

(3)创建分区-自动按月生成分区

2.创建SSIS包

3.创建控制流

(1)清空buff表 trancate buff table

(2)执行数据流任务

源-数据转换-目标,此数据流任务详见4.(1)

从源抽取最近2个月的数据回写到BUFF表

(3)执行存储过程exec proc

自动创建分区

begin

trun_part_into('info_center','kb_v_dm_ls_shop');

end;

(4)执行数据流任务1

源-数据转换-目标,此数据流任务详见4.(2)

数据回写到表ls表

4.创建数据流任务

(1)数据流任务, 3.(2)执行的

从源抽取最近2个月的数据回写到BUFF表

源-数据转换-目标

SELECT SHOP_CD,RLB_NUM,FIN_DATE,PROD_CD,COLOR_CD,SPEC,CUST_ID,CUST_FLAG,DJ_SS_FLAG,LS_QTY,LS_SS_AMT FROM dbo.DM_LS_SHOP T WHERE t.fin_date IS NOT NULL

AND T.fin_date >= convert(varchar(10),DATEADD(MM,-1,dateadd(day,-day(getdate())+1,getdate())),120)

AND T.fin_date <CONVERT(VARCHAR(10),GETDATE(),120)

(2)数据流任务2, 3.(4)执行的,取buff表回写ls表

注:存储过程 3.(3) trun_part_into('info_center','kb_v_dm_ls_shop')

create or replace procedure trun_part_into(v_schema_name varchar2,v_table_name varchar2) is

schema_name varchar2(50):=upper(v_schema_name);

table_name varchar2(50):=upper(v_table_name);

v_sql varchar2(200);

v_partition_name varchar2(50);

begin

declare

cursor c_job is select partition_name,high_value from (SELECT table_name, partition_name, partition_position,high_value

FROM USER_TAB_PARTITIONS where table_name = 'KB_V_DM_LS_SHOP' order by partition_position desc) where rownum <= 2;

--定义一个游标变量v_cinfo c_emp%ROWTYPE ,该类型为游标c_emp中的一行数据类型

c_row c_job%rowtype;

begin

for c_row in c_job loop

--dbms_output.put_line(c_row.shop_cd||'-'||c_row.rlb_num);

execute immediate 'select '|| c_row.high_value || 'from dual';

--dbms_output.put_line(c_row.partition_name);

--dbms_output.put_line(c_row.high_value);

--dbms_output.put_line(substr(c_row.high_value,9,12)||'''');

dbms_output.put_line(substr(c_row.high_value,11,10));

if (substr(c_row.high_value,11,10) = to_char(LAST_DAY(ADD_MONTHS(SYSDATE, -1))+1 ,'YYYY-MM-DD')) then

dbms_output.put_line(c_row.partition_name);

v_partition_name:=c_row.partition_name;

v_sql:='alter table '||schema_name||'.'||table_name||' drop partition '||v_partition_name||' update global indexes';

execute immediate v_sql;

commit;

elsif (substr(c_row.high_value,11,10) = to_char(LAST_DAY(ADD_MONTHS(SYSDATE,0))+1 ,'YYYY-MM-DD')) then

dbms_output.put_line(c_row.partition_name);

v_partition_name:=c_row.partition_name;

v_sql:='alter table '||schema_name||'.'||table_name||' drop partition '||v_partition_name||' update global indexes';

execute immediate v_sql;

commit;

end if;

end loop;

end;

end trun_part_into;

编辑于 2022-03-11 11:38