无锡百度正规代理商:再谈ETL之一(原)

来源:百度文库 编辑:中财网 时间:2024/05/07 06:55:57
再谈ETL之一(原) (2009-12-18 15:12) 分类: Data Warehouse

 ETL是DW的数据准备的前期过程,将花费DW生命周期的大约70%的时间,下面我再结合ORACLE BI谈一谈ETL的过程。一:Extraction
   1)直接提取
   2)增量提取:
 a)基于timestamp:看看下面的例子就明白了:得到订单生成当天的所有订单
  SELECT * FROM orders
  WHERE TRUNC(CAST(order_date AS date),'dd') =
    TO_DATE(SYSDATE,'dd-mon-yyyy');
 b)基于PARTITION:比如说:如果源表基于时间的WEEK做了PARTITION,那么很容易查询到某   一周的所有数据。比如:SELECT * FROM orders PARTITION (orders_jan1998);
 c)基于TRIGER:
       使用触发器,处理DML操作以后数据可以同时操作到目标数据库中。
 d)使用ORACLE CHANGE DATA CAPTURE.
 我比较推荐下面的方式提取
 CREATE TABLE country_city AS SELECT distinct t1.country_name, t2.cust_city
      FROM countries@source_db t1, customers@source_db t2
      WHERE t1.country_id = t2.country_id
      AND t1.country_name='United States of America';
 通过SQL语句和DBLINK的方式直接提取,同时也建立的表格。减少了繁琐的传输过程。  二:Transporation (如果通过分布式查询可以跳过此步)
    有3种传输的类型:
     A source system to a staging database or a data warehouse database
     A staging database to a data warehouse
     A data warehouse to a data mart
    以下传输方式:
 1)直接使用FLAT FILE 然后FTP。
 2)通过分布式查询,可以提取和传输一步到位,但是只适合ORACLE数据库之间,而且要成功配置DBLINK。
 3)Transportable tablespaces
    此方式是最快的在两个ORACLE之间传输大量数据的方式。但是有很多限制。  三:Transformations (转化过程可以使用SQL或PL/SQL实现,本文只是谈谈使用SQL实现的方式,使用PL/SQL实现将在后面的 文章中提到)
 一般有下列两种方式:
 Mutistage Transformation:
    这种方式每一步转化都生成一个临时表,转化过程用SQL或PL/SQL实现,但是比较消耗空间和时间。
 适合Transformation的SQL技巧有很多,最常用的有如下:
 1)CREATE TABLE ... AS SELECT
 2)INSERT /*+APPEND*/ ... AS SELECT
  INSERT /*+ APPEND NOLOGGING PARALLEL */ INTO sales
  SELECT product_id, customer_id, TRUNC(sales_date), 3,
       promotion_id, quantity, amount
  FROM sales_activity_direct;
 3) MERGE
  MERGE INTO products t USING products_delta s
  ON (t.prod_id=s.prod_id)
  WHEN MATCHED THEN UPDATE SET
    t.prod_list_price=s.prod_list_price, t.prod_min_price=s.prod_min_price
  WHEN NOT MATCHED THEN INSERT (prod_id, prod_name, prod_desc, prod_subcategory,
    prod_subcategory_desc, prod_category, prod_category_desc, prod_status,
    prod_list_price, prod_min_price)
  VALUES (s.prod_id, s.prod_name, s.prod_desc, s.prod_subcategory,
    s.prod_subcategory_desc, s.prod_category, s.prod_category_desc,
    s.prod_status, s.prod_list_price, s.prod_min_price);
 4) Unconditional Insert(注意insert到sales和costs的所有的值都要在后面的SELECT中查询出来,而且别   名要和insert的字段名称一致)
  insert all :对于每一行数据,对每一个when条件都进行检查,如果满足条件就执行插入操作。
  INSERT ALL
    INTO sales VALUES (product_id, customer_id, today, 3, promotion_id,
                       quantity_per_day, amount_per_day)
    INTO costs VALUES (product_id, today, promotion_id, 3,
                       product_cost, product_price)
  SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id,
    s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity)
    quantity_per_day, p.prod_min_price*0.8 AS product_cost, p.prod_list_price
    AS product_price
  FROM sales_activity_direct s, products p
  WHERE s.product_id = p.prod_id AND TRUNC(sales_date) = TRUNC(SYSDATE)
  GROUP BY TRUNC(sales_date), s.product_id, s.customer_id, s.promotion_id,
    p.prod_min_price*0.8, p.prod_list_price;
 5) Conditional ALL Insert(利用insert all使得INSERT语句可以同时插入多张表,还可以根据判断条件来决定  每条记录插入到哪张或哪几张表中。)
    insert all :对于每一行数据,对每一个when条件都进行检查,如果满足条件就执行插入操作。
  INSERT ALL
  WHEN promotion_id IN (SELECT promo_id FROM promotions) THEN
     INTO sales VALUES (product_id, customer_id, today, 3, promotion_id,
                         quantity_per_day, amount_per_day)
     INTO costs VALUES (product_id, today, promotion_id, 3,
                        product_cost, product_price)
  WHEN num_of_orders > 1 THEN
     INTO cum_sales_activity VALUES (today, product_id, customer_id,
       promotion_id, quantity_per_day, amount_per_day, num_of_orders)
  SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id,
     s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity)
     quantity_per_day, COUNT(*) num_of_orders, p.prod_min_price*0.8
     AS product_cost, p.prod_list_price AS product_price
  FROM sales_activity_direct s, products p
  WHERE s.product_id = p.prod_id
  AND TRUNC(sales_date) = TRUNC(SYSDATE)
  GROUP BY TRUNC(sales_date), s.product_id, s.customer_id,
   s.promotion_id, p.prod_min_price*0.8, p.prod_list_price;
  6)Conditional FIRST Insert(insert first:对于每一行数据,只插入到第一个when条件成立的表,不继续检查  其他条件。)
  INSERT FIRST WHEN (sum_quantity_sold > 10 AND prod_weight_class < 5) AND
  sum_quantity_sold >=1) OR (sum_quantity_sold > 5 AND prod_weight_class > 5) THEN
    INTO large_freight_shipping VALUES
        (time_id, cust_id, prod_id, prod_weight_class, sum_quantity_sold)
    WHEN sum_amount_sold > 1000 AND sum_quantity_sold >=1 THEN
    INTO express_shipping VALUES
        (time_id, cust_id, prod_id, prod_weight_class,
         sum_amount_sold, sum_quantity_sold)
  WHEN (sum_quantity_sold >=1) THEN INTO default_shipping VALUES
        (time_id, cust_id, prod_id, sum_quantity_sold)
  ELSE INTO incorrect_sales_order VALUES (time_id, cust_id, prod_id)
  SELECT s.time_id, s.cust_id, s.prod_id, p.prod_weight_class,
         SUM(amount_sold) AS sum_amount_sold,
         SUM(quantity_sold) AS sum_quantity_sold
  FROM sales s, products p
  WHERE s.prod_id = p.prod_id AND s.time_id = TRUNC(SYSDATE)
  GROUP BY s.time_id, s.cust_id, s.prod_id, p.prod_weight_class;
 但是在使用Insert All语句时要注意有如下限制:
  多表插入语句的限制条件
  只能对表执行多表插入语句,不能对视图或物化视图执行;
  不能对远端表执行多表插入语句;
  不能使用表集合表达式;
  不能超过999个目标列;
  在RAC环境中或目标表是索引组织表或目标表上建有BITMAP索引时,多表插入语句不能并行执行;
  多表插入语句不支持执行计划稳定性;
  多表插入语句中的子查询不能使用序列。  
四:LOAD
 LOAD的过程是把stageing area中的flat文件导入到ROLAP模型的DW中的过程。
 1)Oracle Sql Loader是一个选择。例如:
 先写控制文件:sh_sales.ctl
 LOAD DATA INFILE sh_sales.dat APPEND INTO TABLE sales
 FIELDS TERMINATED BY "|"
 (PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD)
 运用下列命令执行:
 $  sqlldr sh/sh control=sh_sales.ctl direct=true  2)External Table
 先建立外部表,例如:
 CREATE TABLE sales_transactions_ext
 (PROD_ID NUMBER, CUST_ID NUMBER,
  TIME_ID DATE, CHANNEL_ID NUMBER,
  PROMO_ID NUMBER, QUANTITY_SOLD NUMBER,
  AMOUNT_SOLD NUMBER(10,2), UNIT_COST NUMBER(10,2),
  UNIT_PRICE NUMBER(10,2))
 ORGANIZATION external (TYPE oracle_loader
   DEFAULT DIRECTORY data_file_dir ACCESS PARAMETERS
   (RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII
     BADFILE log_file_dir:'sh_sales.bad_xt'
     LOGFILE log_file_dir:'sh_sales.log_xt'
     FIELDS TERMINATED BY "|" LDRTRIM
     ( PROD_ID, CUST_ID,
       TIME_ID         DATE(10) "YYYY-MM-DD",
       CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD,
       UNIT_COST, UNIT_PRICE))
   location ('sh_sales.dat')
 )REJECT LIMIT UNLIMITED;
 然后直接把数据LOAD到事实表中:
 INSERT /*+ APPEND */ INTO COSTS
 (TIME_ID, PROD_ID, UNIT_COST, UNIT_PRICE)
 SELECT TIME_ID, PROD_ID, AVG(UNIT_COST), AVG(amount_sold/quantity_sold)
 FROM sales_transactions_ext GROUP BY time_id, prod_id;