sqlserver创建SSIS包,以增量抽为例,做数据分区
sqlserver创建SSIS包,以增量抽为例,
1.增量抽取前的准备工作
(1)增量抽取之前,先全量抽取数据,进行全部数据初始化
(2)创建表
2个表的字段结构一样
(3)创建分区-自动按月生成分区
2.创建SSIS包
3.创建控制流
(1)清空buff表 trancate buff table
(2)执行数据流任务
源-数据转换-目标,此数据流任务详见4.(1)
从源抽取最近2个月的数据回写到BUFF表
(3)执行存储过程exec proc
自动创建分区
begin
trun_part_into('info_center','kb_v_dm_ls_shop');
end;
(4)执行数据流任务1
源-数据转换-目标,此数据流任务详见4.(2)
数据回写到表ls表
4.创建数据流任务
(1)数据流任务, 3.(2)执行的
从源抽取最近2个月的数据回写到BUFF表
源-数据转换-目标
SELECT SHOP_CD,RLB_NUM,FIN_DATE,PROD_CD,COLOR_CD,SPEC,CUST_ID,CUST_FLAG,DJ_SS_FLAG,LS_QTY,LS_SS_AMT FROM dbo.DM_LS_SHOP T WHERE t.fin_date IS NOT NULL
AND T.fin_date >= convert(varchar(10),DATEADD(MM,-1,dateadd(day,-day(getdate())+1,getdate())),120)
AND T.fin_date <CONVERT(VARCHAR(10),GETDATE(),120)
(2)数据流任务2, 3.(4)执行的,取buff表回写ls表
注:存储过程 3.(3) trun_part_into('info_center','kb_v_dm_ls_shop')
create or replace procedure trun_part_into(v_schema_name varchar2,v_table_name varchar2) is
schema_name varchar2(50):=upper(v_schema_name);
table_name varchar2(50):=upper(v_table_name);
v_sql varchar2(200);
v_partition_name varchar2(50);
begin
declare
cursor c_job is select partition_name,high_value from (SELECT table_name, partition_name, partition_position,high_value
FROM USER_TAB_PARTITIONS where table_name = 'KB_V_DM_LS_SHOP' order by partition_position desc) where rownum <= 2;
--定义一个游标变量v_cinfo c_emp%ROWTYPE ,该类型为游标c_emp中的一行数据类型
c_row c_job%rowtype;
begin
for c_row in c_job loop
--dbms_output.put_line(c_row.shop_cd||'-'||c_row.rlb_num);
execute immediate 'select '|| c_row.high_value || 'from dual';
--dbms_output.put_line(c_row.partition_name);
--dbms_output.put_line(c_row.high_value);
--dbms_output.put_line(substr(c_row.high_value,9,12)||'''');
dbms_output.put_line(substr(c_row.high_value,11,10));
if (substr(c_row.high_value,11,10) = to_char(LAST_DAY(ADD_MONTHS(SYSDATE, -1))+1 ,'YYYY-MM-DD')) then
dbms_output.put_line(c_row.partition_name);
v_partition_name:=c_row.partition_name;
v_sql:='alter table '||schema_name||'.'||table_name||' drop partition '||v_partition_name||' update global indexes';
execute immediate v_sql;
commit;
elsif (substr(c_row.high_value,11,10) = to_char(LAST_DAY(ADD_MONTHS(SYSDATE,0))+1 ,'YYYY-MM-DD')) then
dbms_output.put_line(c_row.partition_name);
v_partition_name:=c_row.partition_name;
v_sql:='alter table '||schema_name||'.'||table_name||' drop partition '||v_partition_name||' update global indexes';
execute immediate v_sql;
commit;
end if;
end loop;
end;
end trun_part_into;