大数据与机器学习:实践方法与行业案例
上QQ阅读APP看书,第一时间看更新

第2章
数据体系

迟序之数,非出神怪,有形可检,有数可推。事类相推,各有攸归,故枝条虽分而同本干知,发其一端而已。又所析理以辞,解体用图,庶亦约而能周,通而不黩,览之者思过半矣。

——刘徽《九章算术注》

管理就是决策。

——赫伯特·西蒙


数据存在于生产环境、数据缓冲区以及分析环境的各个节点中,并且由各种技术手段支撑着数据的存储和计算。通常,在企业中,生产环境由开发部门负责,而分析环境和数据缓冲区则由数据部门负责,物理环境分离以及管理上的隔离会让人们产生一种错觉:数据是数据部门的事情,应用系统是开发部门的事情。这对数据的应用是非常不利的。

我们应该试图从更高层次上来对待数据,要打破管理和认知上的壁垒,就要让数据像金融系统中的资本那样运转起来。隔离的、静止的数据是乏味的,就如货币一样,需要流动才能增值。

数据的流动伴随着形态的变化(回忆数据的三种形态:生产数据、原始数据、分析数据),我们知道数据最终仍然要回归于生产系统(从生产中来,到生产中去),一切离开了生产应用的数据分析和处理都是徒劳无益的。

因此,要构建一个健康的数据体系,这个体系要像货币流通系统那样能够循环和增值,这是本章将要讨论的主题。

2.1 数据闭环

基于数据流动的理念,我们想象一个完美的数据闭环:数据在三种形态之间的循环转换,从生产系统产生,经过整个闭环后,最终仍然应用于生产系统。在这个数据闭环中,数据形态的每次转化,都伴随着数据的相应增值,如图2-1所示。

图2-1 数据闭环

处于数据闭环中的分析环境不再是“数据坟墓”,而是成为闭环中的一个节点。构建数据闭环的目的是让数据自动循环下去,为数据注入动力,避免数据“沉积”下来埋入坟墓,一个完善的数据闭环具有表2-1中的特征。

表2-1 数据闭环的基本特征

数据闭环成功的关键在于松耦合、易扩展,设立数据缓冲区可以以极低的成本达成这一目标。所有需要数据交互的系统,都要先将数据存储在数据缓冲区中,然后从数据缓冲区中选择需要的数据进行加载,这既避免了多个系统之间的直接耦合,同时也具备了易扩展的特性,新的系统只需要按照数据缓冲区的格式要求将数据存储在数据缓冲区中即可。

通过定制化的数据对接系统,实现数据的自动识别、加载,并结合周期性的ETL作业和调度工具,可以实现数据缓冲区数据的自动出入,是数据闭环中的数据“自动”流转。

此外,通过BI工具和日志收集监控工具,可实现整个数据闭环的可视化监控,并可以通过短信、邮件进行预警,这为数据闭环的持续健康运行提供了保障。

下面将从数据缓冲区、ETL作业、监控预警等方面进行介绍。

2.2 数据缓冲区

数据缓冲区是处于生产环境和分析环境之间的中间区域,它是数据闭环中各个系统间的数据中转站,从各个系统接收原始数据,并将其暂存在对应的目录中。其他系统可以从数据缓冲区中获取需要的数据文件。

为了便于管理和迁移数据,我们规定存入数据缓冲区中的数据使用文本文件的格式,这样一来,数据缓冲区就可以使用一台或几台文件服务器实现。几乎所有的应用系统都支持文本文件的数据交互,新的系统可以轻松加入数据闭环之中。

数据缓冲区的一端连接生产环境中的大量应用系统,另一端连接分析环境中的数据平台,避免了生产环境和分析环境的相互影响,同时也为系统之间数据文件的交互制定了统一标准(见图2-2)。

图2-2 数据缓冲区连接生产环境和分析环境

数据缓冲区的另一个优势在于方便自动化和数据管理,多个应用系统的文件存档在同一个文件服务器中,便于数据的统一管理和分发。比如,在一个多部门、跨地域的企业中,不同地区、不同部门的数据文件之间的交互,如果没有数据缓冲区的统一收集与分发,那将会形成一个复杂的交叉网络。

表2-2列举了数据缓冲区的一些主要优点,本书主要专注于系统解耦,并基于数据缓冲区完成原始数据的自动加载过程。

表2-2 数据缓冲区的优点

2.2.1 系统解耦

1.数据直连交互

数据直连的方式是将原始数据从生产系统直接导出至分析系统,是数据交互的紧耦合方式。这种方式在数据规模较小时并不会出现问题,因此很多企业搭建数据体系时采用了数据直连的方式,但随着数据规模的增大,数据直连的弊端会逐步展现,让我们看一下如下的场景:

某个星期一的早晨,数据工程师小王走入办公室,发现开发工程师小李、系统工程师小周、开发经理和产品经理正聚在一起商讨问题。

开发工程师小李:我昨晚上线完成后,业务人员验证通过,当时系统没有任何问题的,程序肯定没问题!

产品经理:可是现在系统反应奇慢无比,基本处于瘫痪状态,很多业务人员都等着开工呢,怎么办?

系统工程师小周:你自己看,数据库服务器磁盘I/O好大……这种情况之前可是没有的……这个数据库进程是怎么回事?磁盘I/O就是被它拖垮的。

开发经理(一脸黑线):赶紧查一查,看看谁干的!

系统工程师小周:这好像是分析数据库在抽数据。……小王你刚好来了,你看这个作业是你的吧?

数据工程师小王(紧张中):这个我看看……平时都是20分钟就抽取结束了啊!今天怎么还没有完成?怎么回事?

开发经理:那先赶紧停下来,解决了生产问题再说。小王,最近数据这块怎么老出问题啊!

数据工程师小王(委屈):ETL作业跑了半年多了,都没问题。开发昨晚上线,今天就出问题了……

开发工程师小李(打断小王):上线前都经过测试了,上线后也验证了,没有问题的。现在是数据库的问题,和系统没有关系好吧?

数据工程师小王:那为什么上线后,ETL作业就这么慢呢?!也不能怪我啊……

上面的场景是数据直连方式经常会遇到的问题,这种问题可能在生产系统上线后突然出现,也可能在平常的日子里莫名奇妙地发生。由于生产系统和分析系统之间的紧耦合,一旦出现问题,生产系统和分析系统都可能受到影响,而问题产生的原因却很难查清。表2-3总结了数据直连的弊端。

表2-3 数据直连的弊端

2.数据缓冲区交互

前面已经论述使用数据缓冲区进行交互的优点,本节将进一步研究数据缓冲区进行数据交互的详细流程,图2-3是整个过程的示意图。

图2-3 数据缓冲区进行数据交互的流程

在这个过程中,数据由生产环境流入分析环境,共经过以下四个步骤。

1)批量导出。数据从生产数据库批量导出为文本文件,该过程使用DBMS系统自带的批量导出命令,对于大数据平台数据库,使用对应的命令或者使用第三方插件。2.2.2节将对批量导出命令进行详细介绍。

2)FTP传输第一阶段。将步骤1)导出的数据文件通过FTP上传至数据缓冲区。步骤1)和步骤2)的自动化过程可以通过ETL定时作业完成,实现方法参阅ETL作业章节。

3)FTP传输第二阶段。将数据文件从数据缓冲区下载至分析环境中。

4)批量加载。使用批量导入命令将文件加载至数据平台。

这种方式解决了职责不清的问题。从图2-3中可以发现,步骤1)和步骤2)处于数据缓冲区之前,它们属于生产环境的范畴,由开发工程师负责;步骤3)和步骤4)属于分析系统的范畴,由数据工程师负责。

这个框架还解决了数据直连方式面临的以下问题。

1)生产系统与分析系统的耦合问题。通过数据缓冲区实现了生产系统和分析系统的解耦,无论生产系统如何变更,只要传输至数据缓冲区中的文本文件格式不变,分析系统就不受影响;而分析系统在将数据加载至数据平台的时候,也不会影响到生产系统的性能。

2)数据权限的问题,让数据更加安全。数据缓冲区的左边完全由开发工程师负责,因此生产数据库权限不会流转至后端的数据工程师;而通过在数据批量导出过程中对敏感数据的屏蔽处理(如对手机号码加密等)后,后端数据平台无法看到敏感数据,提高了数据安全性。

3)增强了数据平台的可扩展性。由于各种数据平台,如传统数据仓库、Hadoop平台、MPP数据库等均对文本文件有良好的支持,不同平台之间的数据交互,均可以通过数据缓冲区实现数据交互。例如,Hadoop平台的Hive数据仓库可以通过数据缓冲区与传统数据仓库中的关系数据库实现交互。

2.2.2 批量导出

批量导出是将数据库中的数据一次性导出至文本文件中,导出的文件有固定的列分隔符和行分隔符,或者有固定的字段长度。批量导出的方法大致可以分为以下两种(见表2-4)。

表2-4 常用数据库批量导出方法

1)使用ODBC或JDBC接口,如ETL工具或定制的Java程序等。

2)使用批量导出命令,一般是数据库自带的命令。

表2-5中列出了常用数据库的批量导出命令,这些命令将在第3章详细介绍。

表2-5 常用数据库批量导出命令

1. SQL Server : bcp out

bcp是SQL Server自带的批量导出/导入命令,它包括批量导出命令bcp out和批量导入命令bcp in。bcp out命令的语法如代码清单2-1所示(表2-6为参数说明)。

代码清单 2-1

        bcp table_name  out data_file
          [-f format_file]
          [-U login_id]
          [-P password]
            [-S [server_name[\instance_name]]

表2-6 参数说明

例如,从SQL Server数据库表“dbo.巡检商户明细”中批量导出数据到文件“巡检商户明细.dat”中的bcp out命令如代码清单2-2所示。

代码清单 2-2

        bcp dbo.巡检商户明细out d:\巡检商户明细.dat -fd:\巡检商户明细.fmt -U test -P 123-S
    localhost

*注:实际输入命令时,应在一行,中间不能有换行。

代码清单2-1中各参数说明如下。

1)dbo.巡检商户明细是需要导出数据的表名称。

2)d:\巡检商户明细.dat为导出的文件路径及文件名称。

3)-f d:\巡检商户明细.fmt指明格式文件的路径及名称。

4)-U test指明登录数据库所用的用户名称为“test”。

5)-P 123指明数据库用户“test”的密码为“123”。

6)-S localhost指明登录的数据库为“localhost”。

bcp out命令使用了格式文件选项“-f format_file”,用来指明数据文件的格式。格式文件是用来描述数据文件格式的文件,在格式文件中需指明要导出的字段名称、长度、列分割符、行分隔符、排序方式等,可以用在bcp out和bcp in命令中。用在bcp out命令中时,它用来定义导出文件的格式;用在bcp in命令中时,它用来描述待导入文本文件的格式。图2-4所示为格式文件的式样。

图2-4 SQL Server控制文件格式

对图2-4中涉及参数的说明如下:

1)Version是微软公司对SQL Server系列产品的版本编号,如表2-7所示。

表2-7 SQL Server版本号对照表

2)Number of columns是所要导出的数据库表所含的字段个数。

3)Host file field order是输出的数据文件字段列的序号,与Server column order对应,一般与Server column order保持一致。

4)Host file data type保持默认值“SQLCHAR”即可。

5)Prefix length(前缀长度),一般导出的数据文件字段都不填充前缀,因此此处为0。

6)Host file data length为输出文件字段的长度,此处如果Terminator指明的分隔符不为“”,则该参数并不起实际作用。

7)Terminator指明数据文件的列分隔符和行分隔符(格式文件最后一行该字段为行分隔符)。

8)Server column order为数据表中字段的数据,与Host file field order保持一致。

9)Server column name是数据库中的字段名称。

10)Column collation指明字段的collation,一般仅为字符格式的字段使用,保持默认即可。

格式文件中应重点关注的部分为Number of columns、Host file field order、Terminator、Server column order、Server column name,其余保持默认即可。

如何制作格式文件呢?下面使用一个具体的例子说明如何创建格式文件。假设在数据库中创建表“dbo.巡检商户明细”,其创建的表脚本如代码清单2-3所示。

代码清单 2-3

        CREATE TABLE [dbo].[巡检商户明细]
            (
            [商户代码]     [varchar](30)    NULL,
            [商户名称]     [nvarchar](100)  NULL,
            [机构号]       [varchar](20)    NULL,
            [合作方名称]   [nvarchar](100)  NULL,
            [分公司]       [nvarchar](50)   NULL
                );

使用bcp format命令,可得到该表的控制文件模板“巡检商户明细.fmt”,命令如代码清单2-4所示。

代码清单 2-4

        bcp dbo.巡检商户明细format nul -c -f巡检商户明细.fmt -Utest -P123-Slocalhost

巡检商户明细.fmt文件中默认的列分隔符为“\t”(制表符)、行分隔符为“\r\n”(回车换行符),其内容如代码清单2-5所示。

代码清单 2-5

        9.0
        5
        1   SQLCHAR 0       30      "\t"           1       [商户代码]
            SQL_Latin1_General_CP1_CI_AS
        2   SQLCHAR 0       100     "\t"           2       [商户名称]
            SQL_Latin1_General_CP1_CI_AS
        3   SQLCHAR 0       20      "\t"           3       [机构号]
            SQL_Latin1_General_CP1_CI_AS
        4   SQLCHAR 0       100     "\t"           4       [合作方名称]
            SQL_Latin1_General_CP1_CI_AS
        5   SQLCHAR 0       50      "\r\n"         5       [分公司]
            SQL_Latin1_General_CP1_CI_AS

注:格式文件的最后一行必须为空白行,否则在使用bcp命令时会报格式错误。

得到格式文件模板之后,就可以在此基础上进行修改了,比如使用“#”作为列分隔符,使用“\n”作为行分隔符,修改后的格式文件内容如代码清单2-6所示。

代码清单 2-6

        9.0
        5
        1   SQLCHAR    0   30    "#"   1   [商户代码]       SQL_Latin1_General_CP1_CI_AS
        2   SQLCHAR    0   100   "#"   2   [商户名称]       SQL_Latin1_General_CP1_CI_AS
        3   SQLCHAR    0   20    "#"   3   [机构号]         SQL_Latin1_General_CP1_CI_AS
        4   SQLCHAR    0   100   "#"   4   [合作方名称]     SQL_Latin1_General_CP1_CI_AS
        5   SQLCHAR    0   50    "\n" 5    [分公司]         SQL_Latin1_General_CP1_CI_AS

这样,将修改后的格式文件用于bcp out命令,则输出的数据文件将使用“#”作为列分隔符,使用“\n”作为行分隔符。作为验证,可以执行如代码清单2-7所示的bcp out命令。

代码清单 2-7

        bcp dbo.巡检商户明细out D:\巡检商户明细.dat -f D:\巡检商户明细.fmt -Utest -P123
        -Slocalhost

查看输出的数据文件“巡检商户明细数据.dat”,其内容如代码清单2-8所示。

代码清单 2-8

        000391952#新白鹿餐厅(百联中环店)#0880#安智餐饮有限公司#安徽分公司
        000472873#颐和四季体验馆#0880#万源城娱乐城#北京分公司
        000901032#书院人家#0880#万家灯火餐饮文化传播公司#北京分公司
        000900109#三阳湾食府#0880#三阳众城文化管理有限公司#北京分公司
        ......

从以上代码清单中可以看到,输出的文本文件使用“#”作为列分隔符,使用“\n”作为行分隔符。可以根据需要修改格式文件,从而得到满足要求的数据文件输出。更详细的bcp命令可参阅微软官方帮助文档。

2. DB2 : export

IBM DB2数据库中数据的批量导出可以使用export命令,其基础语法如代码清单2-9所示(表2-8为参数说明)。

代码清单 2-9

        export to filename of {ixf |del | wsf }
            [ modified by {filetype-mod …} ]        { select-statement |[ where … ]}

表2-8 DB2 export命令的参数说明

例如,在DB2中自带sample数据库中的一张表,其创建的表脚本如代码清单2-10所示。

代码清单 2-10

        CREATE TABLE HB_STATIC (
            STA_MTH        VARCHAR(8)     DEFAULT NULL,
            FACE_VALUE     INTEGER        DEFAULT 5,
            TOTAL_AMT      BIGINT         DEFAULT 0
        );

现在需要把这张表中的数据导出成文本文件,运行db2cmd,依次运行如代码清单2-11所示的命令。

代码清单 2-11

        db2 connect to sample
        db2 export to d:\\hb_static.del of del modified by chardel'' coldel; select *
        from hb_static

export命令指明导出文件路径及名称“d:\\hb_static.del”, chardel“指明使用”作为字符串定界符(数据库表中的字符类型的数据使用该字符串包裹), coldel;指明使用;作为列分隔符。最终得到的文本文件d:\\hb_static.del的内容(部分)如代码清单2-12所示。

代码清单 2-12

        '201402';20;480
        '201402';5;165
        '201402';10;200
        '201402';50;1100
        '201403';5;915

由于DB2的export命令没有提供形如SQL Server格式文件之类的控制文件,而仅通过命令选项指定列分隔符、字符串界定符,使得DB2的export命令导出的文本文件格式较为单一(例如,只能使用一个字符作为列分隔符),但在大多数场景中,export可以满足要求。

3. Oracle : sqluldr2

Oracle数据库未提供批量导出命令,一般采用第三方工具进行批量导出。SQLPLUS提供的Spool工具虽然可以进行数据导出,但是它并不适合大量数据的快速导出,主要原因是其导出效率很低,大量数据导出会非常耗时。

另一款批量导出工具sqluldr2适用于大批量数据的导出,速度非常快,可以将数据以csv、txt等格式导出。

首先需要下载sqluldr2.exe(可上网搜索),如果安装的是64位的Oracle,则需要下载sqluldr264.exe,然后将sqluldr2.exe复制到$ORACLE_HOME的BIN目录(该目录中有Oracle自带的sqlldr.exe,这是Oracle的批量导入工具。没错,Oracle提供了批量导入工具。却没有提供批量导出工具)中。现在就可以开始使用sqluldr2.exe了,sqluldr2的命令格式如代码清单2-13所示(表2-9为其参数说明)。

代码清单 2-13

        sqluldr2 logon_str {query="select_statement" | sql=sql_file }
        [file=output_file]
        [field=col_del]
        [record=row_del]
        [quote=quote]

表2-9 sqluldr2的参数说明

例如,在Oracle数据库中有一张表,其创建表的脚本如代码清单2-14所示。

代码清单 2-14

        CREATE TABLE “ODS"."DP_COMMENT"
        (
            "MEMBERID"             VARCHAR2(50),
            "TASTE"                VARCHAR2(50),
            "ENVIRONMENT"          VARCHAR2(50),
            "SERVICE"              VARCHAR2(50),
            "LEVEL_SCORE"          VARCHAR2(50),
            "CONTENT"              VARCHAR2(2000),
            "SHOPID"               VARCHAR2(50)
        );

使用sqluldr2将表中的数据导出至文本文件DP_COMMENT.txt中,如代码清单2-15所示。

代码清单 2-15

        sqluldr2 ods/ods@yfb_orc query="select * from ODS.DP_COMMENT"
              file=d:\\DP_COMMENT.txt field=#$

sqluldr2命令将表中的数据导出至DP_COMMENT.txt, field=#$指明导出的文本文件中的列使用#$进行分割。查看文件DP_COMMENT.txt,其内容如代码清单2-16所示(部分)。

代码清单 2-16

        195084790#$3#$3#$3#$40#$菜品味道中规中矩,价钱稍贵!#$17222108
        630568#$4#$4#$4#$50#$很满意 这是很好的一次体验 谢谢#$17222108
        178216498#$3#$3#$3#$50#$尝试了真不错#$17222108

该命令的一个很有用的选项为table选项,该选项可以生成一个默认的控制文件,该控制文件可以用于Oracle的sqlldr命令进行数据批量导入。在上述导出的命令中加入table选项,如代码清单2-17所示。

代码清单 2-17

        sqluldr2 ods/ods@yfb_orc query="select * from ods.dp_comment"
        file=d:\\dianping_comment.txt field=#$ table=dp_comment

执行上述命令后,除了输出文件d:\\DP_COMMENT.txt外,还生成了控制文件dp_comment_sqlldr.ctl,代码清单2-18是dp_comment_sqlldr.ctl的内容(略做了修改,删除了注释部分)。

代码清单 2-18

        load data
        infile 'd:\\dp_comment.txt'
        insert into table dp_comment
        fields terminated by x'2324' trailing nullcols
        (
          "memberid"       char(50)               nullif  "memberid"=blanks,
          "taste"          char(50)               nullif  "taste"=blanks,
          "environment"    char(50)               nullif  "environment"=blanks,
          "service"        char(50)               nullif  "service"=blanks,
          "level_score"            char(50)               nullif  "level_score"=blanks,
          "content"                char(2000)             nullif  "content"=blanks,
          "shopid"                 char(50)               nullif  "shopid"=blanks
        )

代码清单2-18中,x'2324’是十六进制的字符#$(由上文中sqluldr2的field选项指明)。上述控制文件可以用于数据批量导入Oracle数据库中,控制文件用于批量导入的方法请参阅批量导入章节的内容。

4. Hive : hadoop fs

Hive是Hadoop平台上一款非常流行的数据仓库分析工具,由于类似SQL的语言风格,使得其学习成本很低,所以是大数据分析的必学工具。

Hive数据导出,主要的应用场景是将Hive表中的数据导出到Linux操作系统中,然后供其他数据产品使用。该导出过程可以使用ETL工具(参阅第2.3节),也可以使用hadoop shell命令完成。

例如,现在有一张Hive表,其创建表的脚本如代码清单2-19所示(未列出全部字段)。

代码清单 2-19

        create table adobe_nwd_prd
        (
            accept_language        string  comment        ’浏览器中可接受的语言标题’,
            browser                bigint  comment        ’实际用于单击的浏览器ID' ,
            domain                 string  comment        ’用户ISP域’           ,
            duplicate_events       string  comment        ’列出计为重复的每个事件’ ,
            ......
        )
        comment 'adobe pc端原始数据’
        partitioned by(load_day string)
        row format delimited
        fields terminated by '\t'
        stored as textfile;

Hive表使用了load_day字段作为partition字段,即每天的数据存放在一个partition中,通过hive shell命令可以查看当前表中的partition情况,如代码清单2-20所示。

代码清单 2-20

        hive> show partitions adobe_nwd_app;
        OK
        load_day=20150908
        load_day=20150909
        load_day=20150910
        load_day=20150911
        load_day=20150912
        load_day=20150913
        load_day=20150914
        Time taken: 0.379 seconds, Fetched: 7 row(s)

每个partition对应一个hdfs目录,按照1.2.3节中“Hive分区表与增量更新”中的规则,Hive表的数据存放路径如代码清单2-21所示。

代码清单 2-21

        [root@qzy ~]# hadoop fs -ls /data/adobe.ADOBE_NWD_APP
        Found 7 items
        drwxr-xr-x   -root supergroup     02015-09-09 00:40
        /data/adobe.ADOBE_NWD_APP/20150908
        drwxr-xr-x   - root supergroup    02015-09-10 00:39
        /data/adobe.ADOBE_NWD_APP/20150909
        drwxr-xr-x   - root supergroup    02015-09-11 06:18
        /data/adobe.ADOBE_NWD_APP/20150910
        drwxr-xr-x   - root supergroup    02015-09-12 00:33
        /data/adobe.ADOBE_NWD_APP/20150911
        drwxr-xr-x   - root supergroup    02015-09-13 00:33
        /data/adobe.ADOBE_NWD_APP/20150912
        drwxr-xr-x   - root supergroup    02015-09-14 00:38
        /data/adobe.ADOBE_NWD_APP/20150913
        drwxr-xr-x   - root supergroup    02015-09-14 10:33
        /data/adobe.ADOBE_NWD_APP/20150914

使用代码清单2-22所示命令将partition(load_day=20150908)的所有数据导出至Linux系统的本地目录/tmp/datafiles中。

代码清单 2-22

        hadoop fs -copyToLocal /data/adobe.ADOBE_NWD_APP/20150908 /tmp/datafiles

通过程序可以循环导出指定时间范围内的数据,第3章将提供一种Java批量导出Hive数据的多线程实现。

上述方式实现的是不含条件参数的批量导出,如果希望导出where条件限定的数据,则需要将数据事先生成一张中间表,然后将此中间表的全部数据导出,此处不再赘述。

2.2.3 FTP传输

由于数据缓冲区实际上是文件服务器,在内网环境中使用FTP进行传输是一种很方便的方式。在数据闭环中,FTP传输连接了数据缓冲区的上游和下游,稳定高效的文件传输对整个数据闭环起重要作用。

对数据工程师来说,FTP自动传输可以通过ETL工具、命令行、定制程序等方式实现。

ETL工具一般都自带文件传输模块,可以直接使用FTP文件传输功能。例如,开源ETL工具Pentaho Kettle的作业功能中,即有文件传输模块,包含FTP上传、FTP下载等组件,图2-5所示的为Kettle的文件传输组件。

图2-5 Kettle中的文件传输模块

但ETL工具提供的FTP传输模块有一个局限,就是待传输的文件名如果是动态的,例如文件名称以日期作为后缀,或者根据条件选择不同的文件进行传输,则使用ETL工具实现起来会比较困难(虽然ETL工具本身也提供简单的参数输入及文件名称表达式匹配,但总体实现成本比其他两种方式要高很多)。

一种替代的方案是使用脚本语言,也就是通过脚本将FTP命令包装起来,以实现参数的传入,解决动态文件名问题。例如在Linux系统上,使用shell脚本或Python脚本,调用FTP命令来实现文件的FTP上传或FTP下载。

本书推荐的方式是使用高级程序语言进行FTP文件传输的包装,因为高级程序语言一般都提供FTP文件传输的程序包,可以根据需要记录传输日志,并且可以方便实现多线程FTP文件传输,提高传输效率。比如Java的commons-net-x.x.x.jar(x.x.x代表版本号)包就提供了FTP命令的API接口,代码清单2-23是该接口的调用示例(具体实现请参阅第3章)。

代码清单 2-23

        import org.apache.commons.net.ftp.FTPClient;
        FTPClient ftpClient = new FTPClient();
        ftpClient.connect(serverIP);
        ftpClient.login(user, passsWord));
        ftpClient.retrieveFile(remoteFileName, new FileOutputStream(localFilePath));

2.2.4 批量导入

1. SQL Server:bcp in

SQL Server提供的批量导入命令bcp in是bcp out的反向操作。其命令的格式与bcp out的也基本一致,相比bcp out, bcpin增加了几个重要选项,即错误文件-e选项、最大允许错误行数-m选项,如代码清单2-24所示(各选项的详细说明见表2-10)。

代码清单 2-24

        bcp table_name  in data_file
        [-f format_file]
        [-e err_file]
        [-m max_errors]
        [-U login_id]
        [-P password]
        [-S server_name]

表2-10 bcp in的选项说明

-m选项在批量导入大文件时是非常有用的,由于一些数据会含有少量的错误数据,而这些错误数据并不影响整体的数据效果,这些场景本身对数据的完整性要求并不高(不同于交易明细数据),这时使用-m选项指定一个阈值,当错误行数小于阈值的时候,bcp继续执行,可以将正确的数据导入,确保业务的正常进行。

-m选项通常与-e选项配合使用,可以从-e选项指定的err_file中查看出现错误的数据行。

目前许多互联网公司网站中会嵌入网站日志分析工具,实时收集用户的单击行为,如Adobe omniture、WebTrends等,这些工具会产生大量日志数据,而这些日志数据中不可避免地存在部分无法正常导入数据库的记录,通过-m选项可以跳过这些错误数据,从而保证绝大部分数据可用。

2. DB2:import

DB2批量数据导入可使用db2 import命令,该命令是db2 export命令的反向命令,其格式与db2 export命令的一致。例如将第2.2.2节中“DB2:export”导出的文本文件“d:\\hb_static.del”再导入至表HB_STATIC2(该表结构与表HB_STATIC的相同)中,方法如代码清单2-25所示。

代码清单 2-25

        db2 import from d:\\hb_static.del of del modified by chardel'' coldel; insert
        into HB_STATIC2

上述命令成功执行后,文件中的数据即被导入至数据库中。该命令执行后的反馈信息如代码清单2-26所示。

代码清单 2-26

        SQL3109N  实用程序正在开始从文件 "d:\\hb_static.del" 装入数据中。
        SQL3110N  实用程序已完成处理。从输入文件读了 "115" 行。
        SQL3221W  ...开始COMMIT WORK。输入记录计数 = "115"。
        SQL3222W  ...对任何数据库更改的COMMIT都成功。
        SQL3149N  处理了输入文件中的 "115" 行。已将 "115" 行成功插入表中。拒绝了 "0"行读取行数         = 115
        跳过行数         = 0
        插入行数         = 115
        更新行数         = 0
        拒绝行数         = 0
        落实行数         = 115

3. Oracle:sqlldr

Oracle自带的批量导入工具sqlldr可以实现数据的快速批量导入,其命令格式如代码清单2-27所示(表2-11为其参数说明)。

代码清单 2-27

        sqlldr logon_str control=ctr_file log=log_file bad=bad_file errors=max_errors

表2-11 sqlldr的参数说明

其中控制文件的作用类似于SQL Server中的格式文件,可以通过此控制文件指明输入文本的格式,以及导入数据库时的加载方式,图2-6所示的为Oracle控制文件的格式说明。

图2-6 Oracle控制文件的格式

Oracle sqlldr批量导入有多种方式,表2-12对此做了总结。

表2-12 sqlldr的四种导入方式

下面尝试将Adobe omniture对某网站的监控日志导入Oracle的数据库中,命令如代码清单2-28所示。

代码清单 2-28

        sqlldr ods/ods@yfb_orc control=e:\Adobe\adobe_src_data.ctl
        log=e:\Adobe\log.txt bad=e:\Adobe\error_record.txt errors=1000000

由于该文件是对网站单击行为的日志记录,因此对错误行的容忍度是比较大的,为了保证能够将正确的数据导入,我们设置errors=1000000,即允许导入过程出现1000000条错误记录,这些错误记录会被记录到bad选项指定的“e:\Adobe\error_record.txt”中。

控制文件“e:\Adobe\adobe_src_data.ctl”指明了数据文件、文件分隔符信息、加载方式等内容,如代码清单2-29所示。

代码清单 2-29

        load data infile 'E:\Adobe\01-niwodai-prd_2015-07-26.tsv'
        append into table adobe_src_data
        fields terminated by '\t'
        trailing nullcols
        (
          accept_language
        ,browser
        ,browser_height
        ......
        )

上述sqlldr命令在Windows的cmd窗口中执行后,查看log文件“e:\Adobe\log.txt ”的内容,如代码清单2-30(仅列出部分内容)所示。

代码清单 2-30

        SQL*Loader: Release 11.2.0.1.0- Production on星期五 7月 31 09:22:222015
        Copyright (c) 1982, 2009, Oracle and/or its affiliates.  All rights reserved.
        控制文件:      e:\Adobe\adobe_src_data.ctl
        数据文件:      e:\Adobe\01-niwodai-prd_2015-07-26.tsv
        错误文件:      e:\Adobe\error_record.txt
        废弃文件:      未作指定
          (可废弃所有记录)
        要加载的数: ALL
        要跳过的数: 0
        允许的错误: 1000000
        绑定数组: 64 行,最大 256000 字节
        继续:    未作指定
        所用路径:       常规
        表ADOBE_SRC_DATA,已加载从每个逻辑记录
        插入选项对此表APPEND生效
        TRAILING NULLCOLS选项生效
            列名                         位置        长度        中止      包装数据类型
        ------------------------------ ---------- ----- ---- ---- ---------------------
        ACCEPT_LANGUAGE                  FIRST       *          WHT       CHARACTER
        BROWSER                          NEXT        *          WHT       CHARACTER
        ……
        记录 4245: 被拒绝 - 表ADOBE_SRC_DATA的列USER_AGENT出现错误。
        数据文件的字段超出最大长度
        ......
        ADOBE_SRC_DATA:
          417707 行 加载成功。
          由于数据错误,所以913 行 没有加载。
          由于所有WHEN子句失败,所以 0 行 没有加载。
          由于所有字段都为空的,所以 0 行 没有加载。
        为绑定数组分配的空间:       168216 字节 (1 行)
        读取缓冲区字节数:           1048576
        跳过的逻辑记录总数:         0
        读取的逻辑记录总数:         418620
        拒绝的逻辑记录总数:         913
        废弃的逻辑记录总数:         0
        从 星期五 7月   31 09:22:222015 开始运行,
        在 星期五 7月   31 10:19:562015 处运行结束
        经过时间为: 00: 57: 34.13
        CPU时间为: 00: 03: 21.71

从以上代码中可以看到,日志文件记录了很多重要信息,如sqlldr命令中指定的控制文件(control=e:\Adobe\adobe_src_data.ctl)、错误文件(bad= e:\Adobe\ error_record.txt)、允许的错误条数(errors=1000000)。

日志还记录了控制文件指定的数据文件(e:\Adobe\01-niwodai-prd_2015-07-26.tsv)、插入选项对表APPEND生效(即加载方式,append into)、TRAILING NULLCOLS选项生效(trailing nullcols,该选项生效时,当数据文件中出现连续两个列分隔符时,对应字段值将被置为null)。

在罗列了表中所有字段信息之后,日志文件记录了被拒绝的记录在数据文件中所在的行,以及被拒绝的原因,之后的信息还展示了加载成功的行数,以及由于错误被拒绝的行数,最后记录了导入该文件的耗时为57分钟34秒,之所以消耗这么长时间,是由于数据文件每行包含了265个字段,且字段长度都比较大。

日志文件有很多潜在的用途,通过程序读取日志文件可以将其中的重要信息展示在页面上,或写入日志数据库,从而便于作业的管理和监控。对日志文件的处理是数据闭环监控中的重要手段之一。

4. Hive:add partition

Hive有多种批量加载方式,根据数据文件存放的位置不同,Hive加载数据面临两种情形:从本地文件系统加载数据以及从HDFS中加载数据。Hive shell提供load data命令可以完成上述两种情形下的数据批量导入,如代码清单2-31所示。

代码清单 2-31

        load data [local] inpath 'data_file' into table table_name;

当导入本地文件至Hive表中时,需要指明local关键字,并且随后的data_file参数用于指明基于本地文件系统的完整文件路径;当导入hdfs文件至Hive表中时,不需要local关键字,且data_file为hdfs文件系统的文件路径或hdfs文件url。

为了便于程序化实现,这里采用Hadoop shell结合Hve shell的方式实现Hive表的批量导入。

根据第1章的Hive表更新规则,我们通过两个步骤完成数据的批量导入。首先通过Hive shell为Hive表增加一个partition,并指定location;然后使用Hadoop shell将文件copy至该partition对应的location目录,图2-7展示了这个过程。

图2-7 Hive shell + Hadoop shell批量导入数据至Hive表

按照图2-7所示的方式,我们尝试将上述Adobe数据文件“e:\Adobe\01-niwodai-prd_2015-07-26.tsv”导入至Hive表中。

为了确保location指定的hdfs目录存在,先执行hadoop shell命令,创建一个目录,如代码清单2-32所示。

代码清单 2-32

        hadoop fs -mkdir /data/adobe_log_app/20150726

下面就可以分两步完成数据的导入。首先为表adobe_log_app增加一个新的partition,在hive shell环境中,执行如代码清单2-33所示的命令。

代码清单 2-33

        hive>alter table adobe_log_app add partition(load_day='20150726') location
        '/data/adobe_log_app/20150726'

然后,使用hadoop shell将文件复制到hdfs目录“/data/adobe_log_app/20150726”中,如代码清单2-34所示。

代码清单 2-34

        hadoop fs -copyFromLocal
        /usr/queziyang/data/01-niwodai-prd_2015-07-26.tsv
        /data/adobe_log_app/20150726

至此,即完成了数据文件从本地文件系统批量导入至Hive表中,整个过程相对比较耗时的操作仅出现在数据文件复制的过程中,且复制效率要比hive shell的load data的高。上述的各个命令,可以通过编程语言调用,从而实现自动化。

5. Hbase:bulk load

Hbase的数据加载方式也有很多种,但最高效的方式是使用Hbase的bulk load命令。Hbase的bulk load命令分为两步:

1)使用一个MapReduce作业将数据转换为Hbase的内部数据格式。

2)将生成的StoreFiles直接加载到Hbase集群中。

代码清单2-35展示了MapReduce生成HFile的过程,这个过程仅包含Map,不需要Reduce。

代码清单 2-35

        public class HFileGenerator {
            public static class HFileMapper extends Mapper<LongWritable, Text,
                ImmutableBytesWritable, KeyValue> {
                    @Override
                    protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                            // 获取RowKey
                            ImmutableBytesWritable rowkey = new
                            ImmutableBytesWritable(items[0].getBytes());
                            //此处添加处理输入数据文件的代码……
                            //按照org.apache.hadoop.hbase.KeyValue的格式输出
                            KeyValue kv = new KeyValue(Bytes.toBytes(items[0]),
                            Bytes.toBytes("item"), Bytes.toBytes(column), System.currentTimeMillis(),
                            Bytes.toBytes(prefValue));
                            context.write(rowkey, kv);
                    }
            }
            public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
                    Configuration conf = new Configuration();
                    String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                    Job job = new Job(conf, "HFile bulk load Job");
                    job.setJarByClass(HFileGenerator.class);
                    job.setMapperClass(HFileMapper.class);
                    job.setReducerClass(KeyValueSortReducer.class);
                    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                    job.setMapOutputValueClass(KeyValue.class);
                    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
                    FileInputFormat.addInputPath(job, new Path(dfsArgs[0]));
                    FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
                    HFileOutputFormat.configureIncrementalLoad(job, hbaseTableName);
                    System.exit(job.waitForCompletion(true) ? 0 : 1);
            }
        }

代码主要包含HFileMapper和一个main()函数,在HFileMapper的map方法中处理输入数据文件,并按照org.apache.hadoop.hbase.KeyValue的格式输出。

在main()函数中,FileInputFormat.addInputPath用于指定输入文件的路径(HDFS路径), FileOutputFormat.setOutputPath则用于指定MapReduce作业的输出路径,即生成的HFile最终的存放路径。

最后通过HFileOutputFormat.configureIncrementalLoad(job, hbaseTableName)导入hbase表的相关信息,从而使得Map的输出最终与hbaseTableName相匹配。

上述过程在指定的输出路径中生成HFile文件,可以通过org.apache.hadoop.hbase. mapreduce.LoadIncrementalHFiles的doBulkLoad方法将其挂载到对应的hbase表中,代码清单2-36展示了这个过程。

代码清单 2-36

        public class HFileLoader {
            public static void main(String[] args) {
                String[] dfsArgs = null;
                try {
                    dfsArgs = new
                    GenericOptionsParser(HbaseUtils.getConfiguration(),
                    args).getRemainingArgs();
                    LoadIncrementalHFiles loader = new
                    LoadIncrementalHFiles(HbaseUtils.getConfiguration());
                    loader.doBulkLoad(new Path(dfsArgs[0]),
                    HbaseUtils.getTable(hbaseTableName));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

这个过程非常简单,只需要指明已经生成的HFile的路径,其他的工作交给doBulkLoad方法即可。加载非常快,因为它实际上是执行类似hadoop的mv操作。

bulk load主要的耗时阶段在于生成HFile,而在加载阶段则非常迅速。这种方式比较适用于往Hbase空表中加载数据的情况,因此当对Hbase进行全量更新时,这是首选方式。

但这种方式在增量加载时,就没有那么高效了,因为新的HFile的加入,会触发Hbase的split和rebalance操作,这会使doBulkLoad的过程非常缓慢。所以在对Hbase进行批量加载的时候,应该尽量使用全量更新的方式,如果增量更新不可避免,则使用原生的API接口逐条put入库将是最后的选择。

2.3 ETL

ETL是Extract-Transform-Load的缩写,是数据的抽取、转换、加载过程,当需要将数据从一个环境转移到另一个环境时(例如从生产环境到分析环境),或者需要对数据进行进一步加工处理时(例如在分析环境中,在数据仓库基础上产出每日交易量指标),即需要借助ETL过程。

ETL是构建数据闭环自循环过程的重要途径,几乎每个环节均可以通过ETL来完成。通过专门的ETL工具,定制满足业务要求的ETL作业,并结合自动调度工具,即可以实现数据的自动循环。

2.3.1 ETL工具

目前国内商用ETL工具以IBM的Datastage为代表,该ETL工具在金融行业有广泛的应用,但Datastage价格昂贵,许多公司从成本上考虑,采用了Pentaho的开源ETL工具Kettle。

在本书成书时,尽管Kettle本身仍然存在一些Bug,但由于部署简单、使用方便并且完全免费的特点,使其成为越来越多用户的首选。

1.开源ETL工具:Kettle

Kettle是国外的一款开源ETL工具,中文名称为水壶。在写作本书的时候,官网上最新的发布版本为Data Integration 5.4.0,下载后会得到一个pdi-ce-5.4.x.x-xxx.zip的压缩包,使用解压缩工具解压该压缩包,便可以开始使用Kettle了。

Kettle可以运行在Windows环境或者Linux环境,如果运行在Windows环境,则进入解压后的data-integration目录,可以看到Spoon.bat(Linux环境为Spoon.sh),双击Spoon. bat打开Kettle的图形界面,如图2-8所示。

图2-8 Kettle的图形界面

图2-8展示了Kettle的两个基本组件:转换(transformation)和作业(job)。转换用来定义数据处理的一个或多个步骤(step),如读取文件、过滤输出行、数据清洗、数据加载至目的数据库等。作业用来将多个定制完成的转换串接起来,使转换能够按照一定的顺序和规则执行。

定义完成的转换和作业可以使用程序或者脚本进行调用,首先将定义的转换或者作业存储在Kettle的资源库(Repository)中,然后通过Kettle提供的Pan和Kitchen组件分别进行调用(Pan用来调用transformation, Kitchen用来调用job),我们将在随后的内容介绍调用方法。

通常,首次接触“资源库”这个词汇会让人感觉难以理解,但撇开这个名词本身,它本质上就是创建关系数据库中的一些配置表,这些配置表用来存储转换或者作业的相关信息(如转换的名称、数据库连接的字符串等), Pan和Kitchen组件可以根据资源库里的这些信息来调用对应的转换或者作业。

在使用资源库之前,需要先创建一个资源库,在图形界面中进入工具→资源库→连接资源库,即可以出现相应的创建向导,通过向导可以轻松完成资源库的创建。

一旦成功创建并连接了资源库,随后进行的文件读取和保存操作就可以对应到该资源库。例如,作者在自己的测试环境中连接资源库后,单击菜单文件→打开,Kettle会自动读取资源库中保存的转换和作业信息,并将所有的转换和作业展示出来以供选择,如图2-9所示。

图2-9 Kettle资源库中的转换和作业

这些存放在资源库中的转换和作业,可以通过Pan和Kitchen组件进行调用,Pan是Kettle提供的用于批量调用转换的工具,Kitchen是用于批量调用作业的工具。在data-integration目录中可以找到Pan.bat和Kitchen.bat(Windows环境对应Pan.sh, Linux环境对应Kitchen.sh)。

在Linux环境中可以使用如代码清单2-37所示的脚本调用转换。

代码清单 2-37

        pan.sh -rep=kettle_rep_test -trans="batch-into-table" -dir=/ -user=admin
        -pass=admin
            -level=Basic

调用作业可以使用如代码清单2-38所示的命令。

代码清单 2-38

        kitchen.sh -rep=kettle_rep_test -job="hive-oracle-test" -dir=/ -user=admin
        -pass=admin
        -level=Basic

如果有很多转换和作业需要运行,那么可以将这些命令写在一个shell脚本中,然后通过Linux系统自带的Crontab进行调度,或者通过专门的调度工具进行调度(请参阅2.4节)。

Kettle方面的推荐书籍:《Pentaho Kettle解决方案:使用PDI构建开源ETL解决方案》(作者:Matt Casters、Roland Bouman、Jos van Dongen著,初建军、曹雪梅译,电子工业出版社)。

2.商用ETL工具:DataStage

DataStage是IBM InfoSphere开发的一款商用ETL工具,是IBM InfoSphere Information Server套件的简称。该套件包含三个组件:InfoSphere DataStage and QualityStage Designer、InfoSphere DataStage and QualityStage Director、InfoSphere DataStage and QualityStage Administrator。

InfoSphere DataStage and QualityStage Designer用于创建DataStage作业。

InfoSphere DataStage and QualityStage Director用于验证、调度、运行和监视DataStage作业。

InfoSphere DataStage and QualityStage Administrator用于系统管理(例如设置IBM InfoSphere Information Server用户,记录、创建和移动项目,设置清除记录的条件等)。

显然,相对于Kettle的轻量级部署,DataStage本身的架构已经非常复杂,相应的部署要求也比较高。DataStage作为IBM公司的一款产品,其目标客户群为大型企业,它甚至支持在大型机上运行ETL作业(能够生成可在大型机上运行的COBOL代码)。由于目前国内的银行仍然沿用IBM公司的大机系统,所以DataStage在国内的客户多存在于金融行业。

DataStage价格昂贵,一般需要支付年服务费、购买License等。同样,由于收取年服务费,所以能够提供很好的培训和技术支持。因此,需要根据企业自身特点选择商用ETL工具或者开源ETL工具,对于小型公司而言,开源工具仍是首选。

2.3.2 ETL作业

ETL作业是按照一定顺序组织的数据处理过程,它将数据处理的各个环节关联起来,并定义各个环节的触发规则,从而完成整个数据处理流程。

以Kettle为例,ETL作业由多个步骤(或称为作业项)组成,如图2-10所示。该作业除了开始的“START”与最后的“成功”步骤外,还包含以下三个实体作业项:

图2-10 ETL作业示例

1)检测昨日交易明细文件是否存在。

2)SQL Server批量加载。

3)统计昨日交易。

作业项1)负责检测昨日的交易明细文件是否存在,如果该步骤返回“true”,则进行下一个作业项,否则退出作业。

作业项2)将昨日的交易明细文件批量加载至SQL Server数据库中,该作业项需要指定文件名称、格式文件等相关信息,使用的命令即是“SQL Server :bcp in”章节中讲述的方式。如果该作业项执行成功,那么昨日交易明细数据将增量更新至SQL Server的数据库表中;如果该作业项执行失败,则退出作业。

作业项3)对作业项2)中加载的交易数据进行统计,该作业项执行一段SQL脚本,并将计算结果存储在对应的结果表中。

图2-10中的ETL作业按照预定的顺序将多个作业项串联起来,完成一个完整的数据加载和统计过程,该过程的每个步骤作为一个作业项独立存在,仅当上游的作业项执行成功后,才开始下一个作业项的执行。

需要注意的是,ETL工具仅用于作业的创建和简单调度,如果需要周期性地执行ETL作业,则需要使用专门的调度工具。

为了使ETL作业便于调度和监控,为ETL作业制定规范是一项非常重要的工作,良好的ETL作业命名规范和日志规范可以极大地方便作业监控和错误排查。接下来深入介绍这两个实用性的操作规范:ETL作业命名规范和ETL作业日志规范。

1. ETL作业命名规范

ETL作业命名规范主要是为了通过作业名称来标识作业的归属、重要程度、主要用途等,以便于作业的自动调度和监控,它不是ETL工具的强制要求。

通常需要根据企业具体的管理要求为ETL作业制定命名规范,该规范要尽可能地反应作业的归属用途等,并且长度不能太长,下面给出一个范例:

[员工编号].[作业类型].[作业描述]

该命名规范包含三个部分,用“. ”分割:

1)员工编号,用于描述作业的归属,一般使用作业创建人或者负责人的员工编号。

2)作业类型,用于描述作业的重要程度,比如将作业类型定义为analysis、report、product等,分别对应分析、报表、生产。不同的作业类型的作业出现错误时,可以根据重要程度进行不同等级的报警通知。

3)作业描述,用于描述作业的主要功能,比如图2-10中的作业可以描述为trx_load_and_static,或者使用中文描述(如果ETL工具支持中文名称)。

命名规范同样可以规定ETL作业中出现的字母统一使用大写或者小写,本书采用小写的方式。按照这个规范,图2-10中的ETL作业将命名为:z06837. analysis.trx_load_and_static,其中z06837是员工编号,analysis说明该作业属于分析型的作业。

按照规范进行作业命名后,作业监控进程便可以自动发现运行失败的作业,并且根据作业名称中的员工编号找到该员工的邮箱地址和手机号码(需预先在数据库中保存员工编号与邮箱地址和手机号码的对应关系),并发送邮件通知和短信提醒,还可以根据作业类型在邮件中标记紧急程度,这部分内容将在2.5节进一步展开。

2. ETL作业日志规范

ETL作业一般包含多个步骤(作业项),作业运行中某些步骤可能运行失败,记录下失败原因对于错误排查非常重要。

虽然ETL工具都自带日志记录功能,但系统自动记录的日志信息一般可读性很差且缺乏灵活性。ETL作业日志规范就是要自定义一个统一且灵活的日志记录方式,以便于作业的监控和错误排查。下面给出一个ETL作业日志规范的范例:

1)ETL作业中需包含记录作业开始和作业完成的作业项。

2)每个作业项均需增加作业项运行失败分支,并发送邮件通知。

3)日志记录统一记录在数据库表etl_job_log中。

4)日志记录中的状态在作业状态表etl_job_status中统一定义。

5)使用统一的存储过程进行日志记录。

根据规范1、2的要求,图2-10中的ETL作业将修改为图2-11所示的样子。

图2-11 满足规范1和规范2的ETL作业

图2-11中,作业开始后增加了一个作业项“记录日志:作业开始”,这个作业项往etl_job_log表中插入一条新记录,记录今日该作业的开始时间等相关信息。另外,在作业的最后加入了作业项“记录日志:作业完成”,用于更新作业的最终状态,图2-12是表etl_job_log中记录的部分ETL作业日志。

图2-12 作业日志表中的部分记录

在其余的三个作业项上面,分别增加了运行错误分支。这些错误分支分别记录对应的作业项出错信息,同时记录下作业项出错时的系统时间,并在错误日志记录完成后,发送失败通知邮件。

修改后的ETL作业会在运行过程中将作业状态自动记录到数据库中,随后BI工具可以根据数据库中的日志记录展示监控报表或者进行错误报警。

日志记录表etl_job_log创建表脚本如代码清单2-39所示。

代码清单 2-39

        create table etl_job_log
        (
        id                 bigint          not null auto_increment comment ’自增长id',
        job_name   varchar(100)   not null comment ’作业名称’,
        run_date   varchar(20)    comment ’运行日期’,
        start_time         datetime        comment ’作业开始时间’,
        end_time   datetime       comment ’作业结束时间’,
        upt_time   datetime       default current_timestamp comment ’更新时间’,
        job_status         int             not null comment ’作业状态id',
        remark             varchar(1000)   comment ’作业状态补充说明’,
        primary key (id)
        );

其中,remark字段的记录原则为:作业项名称+错误说明,例如,“SQL Server批量加载:出错。”可以方便追踪到作业出错的作业项。

job_status字段是表etl_job_status的外键,记录的是状态id,其对应的状态描述可以通过关联表etl_job_status得到。

作业状态表etl_job_status创建表脚本如代码清单2-40所示。

代码清单 2-40

        create table etl_job_status
        (
        id                 bigint         not null auto_increment comment ’自增长id',
        status_desc        varchar(1000)  not null comment ’状态描述’,
        primary key (id)
        );

2.4 作业调度

调度工具用来对作业进行调度,通过ETL工具创建的作业如果需要周期性运行,就需要使用调度工具来完成。调度工具是一个相对复杂的系统,尤其是在跨操作系统、跨应用平台的作业环境中更是如此。

在复杂的作业环境中,需要使用商用调度工具,目前国内使用较多的商用调度工具为Control-M。该工具是BMC Software提供的企业级集中作业调度管理解决方案,能够集中管理跨平台、跨应用的生产控制和调度过程,因此适用于大型复杂的ETL调度场景。

一些相对简单的调度场景可以不使用专门的调度工具实现。比如,如果公司的作业环境全部是Linux系统,则可以使用系统自带的crontab进行调度。

例如,图2-11中的作业配置为每日凌晨1点10分开始运行,那么可以将代码清单2-41的内容保存在文件run_trx_load_and_static.sh中。

代码清单 2-41

        kitchen.sh -rep=kettle_rep_test -job="z06837. analysis.trx_load_and_static"
        -dir=/ -user=admin -pass=admin -level=Basic

然后编辑Linux系统的crontab文件(通过crontab -e命令),再在该文件中加入如代码清单2-42所示的内容。

代码清单 2-42

        10 01 * * * /home/queziyang/shell/run_trx_load_and_static.sh

crontab会在每天凌晨1点10分运行指定的脚本run_trx_load_and_static.sh,这样便可以完成每日定时调度该ETL作业。显然,使用crontab调度作业,本身是没有作业日志记录的,这也是为什么需要遵循ETL作业日志规范的原因之一。

2.5 监控和预警

监控和预警存在于数据闭环的各个阶段,在所有的自动执行环节均可以植入监控和预警点。前期对ETL所做的规范,现在是体现其应用价值的时候了。可以利用这些满足规范的日志记录进行自动监控和预警。

如果有专职的运维人员负责作业的运行监控,使用专门的监控工具,运维工程师可以监控各个服务器的运行信息,并通过监控工具发送预警邮件。

如果仅仅是ETL工程师负责监控自己的ETL作业,那么可以使用BI工具进行监控和预警。

2.5.1 使用监控工具进行监控

有众多的开源监控工具可供使用,如Zipkin、Ramona、zabbix、Ganglia、Nagios等,这些监控工具提供了许多定制的监控和预警服务,但它们通常比较偏于底层日志,如Zabbix主要用来监控CPU负荷、内存使用、磁盘使用、网络状况、端口监视和日志监视。这些监控信息对于保证数据环境的健康运行至关重要,可以根据CPU负荷、内存和磁盘的使用情况进行预警,比如在CPU负荷持续达到90%时进行预警,或者在磁盘使用90%时进行预警等。

监控工具专注于系统可用性方面的监控,如果要专注于ETL作业的运行情况,那么可以使用BI报表工具进行监控。

2.5.2 使用BI工具进行监控

BI(business intelligence)工具是企业环境中广泛使用的数据可视化工具,它可提供丰富的数据可视化能力,同时可提供短信、邮件等通知服务。

鉴于数据的监控和预警本身是基于日志信息的,因此可以使用BI工具丰富的展示和通知服务进行数据系统的监控和预警。

基于图2-12中的ETL作业日志表,BI工具可以定制图形化监控报表,并以Web页面的形式展示出来。作业负责人或者运营人员可以登录该BI系统,查看监控相应的页面,便可以监控作业是否正常。

例如,近期表现抢眼的BI工具Tableau,可以设置每15分钟扫描一下ETL作业日志表,一旦发现有作业异常,就自动发送邮件通知作业负责人。

通过BI工具实现ETL作业的监控和预警,这种方式可以推广到整个数据闭环,其图形化的界面让监控变得简单明了。

2.6 本章小结

本章围绕数据体系提出了数据闭环的概念,对数据闭环的特征进行了描述,并且进一步介绍了构建数据闭环所涉及的方法和技术。

这里着重介绍了在数据闭环中扮演重要作用的“数据缓冲区”的设立理念和实现方式。通过设立数据缓冲区,可以实现系统解耦,让数据闭环具备良好的扩展性,让公司组织间职责更加分明,使数据环境更加安全等。

数据缓冲区中涉及的数据批量导出/导入技术,需要使用到各个RDMS系统的批量操作命令。另外在大数据平台中,需要使用hadoop shell和hive shell脚本来实现批量操作,对于Hbase,则提供了Java实现的bulk load批量导入方式。

ETL作业为数据闭环中定义数据流转方式的环节。为了实现ETL作业的自动化运行和监控,需要引入ETL作业规范:命名规范和日志规范。

作业调度则是保证ETL作业能够实现自动化的手段,监控和预警则进一步保证了ETL作业能够正常运行。

第3章将根据这两章提出的数据理念,通过实战的方式完成数据闭环中关键环节的构建。