數據庫工程師快速上手MaxCompute進行ETL
案例說明
本案例主要是介紹如何通過數加MaxCompute+大數據開發套件兩個產品實現簡單的網站數據統計分析。
適用人群
MaxCompute初學者,特別是無大數據開發基礎但有數據庫使用基礎。
案例側重
數據庫工程師快速上手MaxCompute進行大數據開發,簡單了解在MaxCompute做大數據ETL過程,同時了解一些MaxCompute SQL和常用數據庫SQL的基本區別。
示例介紹
房產網上經常會看到一些排行榜,如最近30日簽約的樓盤排行、簽約金額的樓盤排行等,本示例我們簡單介紹通過對二手房產數據信息表(house_basic_info)統計分析出每個城市二手房均價top 5的樓盤並且給出該樓盤所在城區,最後需要讓這些數據在房產網上呈現。
數據說明
二手房網產品數據信息表house_basic_info存儲於RDS-MySQL(區域:阿裏雲華南1可用區A;網絡:專有網絡),表數據每天全量更新。
“二手房網產品數據信息表”在數加平台公開數據集-二手房產數據集上有,可以直接使用,不過數據量會與本案例呈現的可能不完全一致。
具體表信息如下:
字段 | 字段類型 | 字段說明 |
---|---|---|
house_id | varchar | 房產 ID |
house_city | varchar | 房產所在城市 |
house_total_price | Double | 房產總價 |
house_unit_price | Double | 房產均價 |
house_type | varchar | 房產類型 |
house_floor | varchar | 房產樓層 |
house_direction | varchar | 房產方向 |
house_deckoration | varchar | 房產裝修 |
house_area | Double | 房產麵積 |
house_community_name | varchar | 房產所在小區 |
house_region | varchar | 房產所在地區 |
proj_name | varchar | 樓盤名稱 |
proj_addr | varchar | 項目地址 |
period | bigint | 產權年限 |
property | varchar | 物業公司 |
greening_rate | varchar | 綠化率 |
property_costs | varchar | 物業費用 |
datetime | varchar | 數據日期 |
數據樣例(英文逗號分隔):
000404705c6add1dc08e54ba10720698,beijing,8000000,72717,3室1廳,低樓層/共24層,南,平層/精裝,137,璽萌麗苑,豐台 草橋 三至四環,null,null,null,null,null,null,20170605
RDS-MySQL上house_basic_info表的建表語句,如:
CREATE TABLE `house_basic_info` (
`house_id` varchar(1024) NOT NULL COMMENT '房產 ID',
`house_city` varchar(1024) NULL COMMENT '房產所在城市',
`house_total_price` double NULL COMMENT '房產總價',
`house_unit_price` double NULL COMMENT '房產均價',
`house_type` varchar(1024) NULL COMMENT '房產類型',
`house_floor` varchar(1024) NULL COMMENT '房產樓層',
`house_direction` varchar(1024) NULL COMMENT '房產方向',
`house_deckoration` varchar(512) NULL COMMENT '房產裝修',
`house_area` double NULL COMMENT '房產麵積',
`house_community_name` varchar(1024) NULL COMMENT '房產所在小區',
`house_region` varchar(1024) NULL COMMENT '房產所在地區',
`proj_name` varchar(1024) NULL,
`proj_addr` varchar(1024) NULL,
`period` int(11) NULL,
`property` varchar(1024) NULL,
`greening_rate` varchar(1024) NULL,
`property_costs` varchar(1024) NULL,
`datetime` varchar(512) NULL COMMENT '數據日期'
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
COMMENT='二手房網產品數據信息表';
需求分析
核心目標:統計分析出每個城市二手房均價top 5的樓盤並且給出該樓盤所在城區,即[城市,樓盤、均價、排名、所在城區]。
數據現狀:
- 信息表中,每個樓盤可能有多條記錄,多個均價信息,本案例為簡單起見我們隻針對整個樓盤的均價求平均;
- 信息表中,house_region中包含城區、街道地址信息,需要拆分出城區信息。
- 每天都數據都有變化,每個數據日期的數據都是全量數據。
所需操作:
- 數據從RDS導入MaxCompute;
- MaxCompute上對數據進行統計分析,並得出結果表;
- 將結果表回流到網站業務係統,以便網站直接調用數據進行前端顯示。
需求實現
前置說明
大數據開發套件是架構在MaxCompute上的一站式大數據開發管理工具,大數據開發套件是免費使用;MaxComput主要提供計算和存儲能力,關於MaxCompute入門通常需要了解的信息可參考該文章。
前提條件
開通MaxCompute,創建MaxCompute項目(如項目名:house_web),具體操可參考通過大數據開發套件創建MaxCompute的介紹。
RDS數據導入MaxCompute
步驟一 配置RDS數據源
前置條件:
因RDS數據安全限製,大數據開發套件的數據同步任務要與RDS數據庫進行聯通,必須將執行數據同步任務的機器ip添加到RDS的白名單中,具體的ip請點擊此文檔查看,或者在配置數據源界麵有ip查看入口。
具體操作:
進入大數據開發套件-數據集成大數據開發套件-數據集成-**數據源**,點擊新增數據源,彈框裏配置數據源信息,測試連通性正常後,確定添加即可。
所需RDS-MySQL實例ID即下圖中的實例名稱:
注意:本示例RDS實例所在區域為華南1,網絡類型為專有網絡,通過大數據開發套件進行數據同步時,屬於跨region走專有網絡方式導數據。大數據開發套件的數據集成針對RDS通過反向代理自動檢測使得網絡能夠互通,無需其他特殊處理即可保證數據同步正常聯通。
步驟二 配置數據同步任務
進入大數據開發套件-數據集成大數據開發套件-數據集成-**數據同步**,工作台上點擊“向導模式”新建一個同步任務。
選擇來源:
表每天全量更新,每次統計的數據時隻需統計數據日期為昨天完整一天數據即可,因此數據過濾時,每天自動調度取datatime為昨天日期,可以使用係統參數${bdp.system.bizdate}代替,使得任務每天調度執行自動替換字段值,係統參數說明請看係統調度參數文檔說明。
選擇目標:
本案例是將數據導入到本項目,所以目標選擇默認的數據源odps_first(odps),這時發現一個問題,目標表沒創建,所以我們需要先創建目標表,點擊快速建表(更多建表方式可參考文檔創建表)。
彈框裏顯示係統自動根據源表結構生成對應的MaxCompute建表語句:
CREATE TABLE IF NOT EXISTS your_table_name (
house_id STRING COMMENT '*',
house_city STRING COMMENT '*',
house_total_price DOUBLE COMMENT '*',
house_unit_price DOUBLE COMMENT '*',
house_type STRING COMMENT '*',
house_floor STRING COMMENT '*',
house_direction STRING COMMENT '*',
house_deckoration STRING COMMENT '*',
house_area DOUBLE COMMENT '*',
house_community_name STRING COMMENT '*',
house_region STRING COMMENT '*',
proj_name STRING COMMENT '*',
proj_addr STRING COMMENT '*',
period BIGINT COMMENT '*',
property STRING COMMENT '*',
greening_rate STRING COMMENT '*',
property_costs STRING COMMENT '*',
datetime STRING COMMENT '*'
)
COMMENT '*'
PARTITIONED BY (pt STRING);
注意:
- 自動生成的代碼裏,表名需要修改成真正的目標表表名,可以與源表表名一致house_basic_info;
- 自動生成的代碼裏,源表中varchar類型會對應string類型,int類型會對應bigint類型。MaxCompute目前隻支持6種數據類型,與常用數據庫數據類型有所差異。
- 自動生成的代碼裏,字段不能指定默認值、不能指定是否非空默認都是可空、不能指定長度默認每個字段長度上限為8M。
- 自動生成的代碼會是創建分區表,且分區名稱為pt。MySQL數據庫中沒有分區概念,MaxCompute的分區概念與hadoop分區概念類似,具體可以參考分區介紹。本案例中目標表可以保留分區設置,以時間作為分區。
- 既然已經有時間分區,那麼源表的datetime字段可以不需要同步到目標表,表也可以不需要創建該字段。
- 常用數據庫sql與MaxCompute sql更多差異請看文檔——與主流SQL差異。
綜上所述,修改後的建表語句,並點擊提交:
CREATE TABLE IF NOT EXISTS house_basic_info (
house_id STRING COMMENT '*',
house_city STRING COMMENT '*',
house_total_price DOUBLE COMMENT '*',
house_unit_price DOUBLE COMMENT '*',
house_type STRING COMMENT '*',
house_floor STRING COMMENT '*',
house_direction STRING COMMENT '*',
house_deckoration STRING COMMENT '*',
house_area DOUBLE COMMENT '*',
house_community_name STRING COMMENT '*',
house_region STRING COMMENT '*',
proj_name STRING COMMENT '*',
proj_addr STRING COMMENT '*',
period BIGINT COMMENT '*',
property STRING COMMENT '*',
greening_rate STRING COMMENT '*',
property_costs STRING COMMENT '*'
)
COMMENT '*'
PARTITIONED BY (pt STRING);
配置目標如下:
- 分區值保留默認的${bdp.system.bizdate},與來源表的過濾條件取的datetime數據日期對應,表示該分區存放的數據為源表中datetime=${bdp.system.bizdate}的數據。
- 清理規則保留默認選項,寫入前清理已有數據(分區表時隻清理(若有)當前分區數據)。
字段映射
直接保留默認即可。源表和目標表字段名都一致會自動對應好,源表datetime字段無對應目標字段且不用同步因而無需任何處理。
通道控製
本案例中都保留默認即可,具體通道控製各項配置說明請看文檔——數據同步通道控製參數設置。
保存、提交
- 保存任務時可以創建專門的目錄存放,本案例我們接用目標表名稱作為任務名稱;
- 提交任務主要是將任務提交到調度係統,使得任務可以按照調度配置進行自動運行。本案例調度配置保留默認配置,調度周期為“天”調度。
步驟三 執行數據導入任務
在大數據開發套件中,切換到“運維中心-任務管理”找到任務house_basic_info,在任務視圖上右鍵-測試節點:
等待任務執行成功後,可以到“大數據開發套件——數據開發”的"腳本開發"中創建一個sql腳本文件,執行select語句查看表house_basic_info數據是否真的同步成功:
數據統計分析
數據導入到MaxCompute後,我們將通過MaxCompute SQL、MR等對數據進行加工處理。
創建目標表
前麵“需求分析”的目標分析(統計分析出每個城市二手房均價top 5的樓盤並且給出該樓盤所在城區,即[城市,樓盤、均價、排名、所在城區])得出表5個字段。
進入“大數據開發套件——數據開發”,工作區的工具欄上點擊“新建”選擇新建表,輸入建表語句並提交。
CREATE TABLE IF NOT EXISTS house_unit_price_top5 (
house_city STRING,
house_community_name STRING,
house_unit_price_all DOUBLE,
area STRING,
tops BIGINT
)
PARTITIONED BY (
pt STRING
);
創建任務進行數據統計分析
進入“大數據開發套件——數據開發”的"任務開發"中創建一個sql任務
編輯SQL代碼
--產出每個城市每個樓盤的均價臨時表
--分區值是對應數據導入任務配置的分區值,保證每天運行都是取當天導入的最新分區。
DROP TABLE IF EXISTS t_house_unit_price_info;
CREATE TABLE IF NOT EXISTS t_house_unit_price_info
AS
SELECT house_city
, house_community_name
, AVG(house_unit_price) AS house_unit_price_all
FROM house_basic_info
WHERE pt = '${bdp.system.bizdate}'
GROUP BY house_city,
house_community_name;
--拆分house_region字段隻取城區名稱輸出字段為area,並存儲到一個臨時表。
--分區值是對應數據導入任務配置的分區值,保證每天運行都是取當天導入的最新分區。
DROP TABLE IF EXISTS t_house_area;
CREATE TABLE IF NOT EXISTS t_house_area
AS
SELECT distinct house_city
,house_community_name
,split_part(house_region, ' ', 1) AS area
FROM house_basic_info
WHERE pt = '${bdp.system.bizdate}';
--產出最終目標表:每天每個城市二手房均價top 5的樓盤並且給出該樓盤所在城區。
--分區值是對應數據導入任務配置的分區值,保證每天運行產出的日期分區值與源表數據日期一致。
INSERT OVERWRITE TABLE house_unit_price_top5 PARTITION (pt='${bdp.system.bizdate}')
SELECT a.house_city
, a.house_community_name
, a.house_unit_price_all
, b.area
, a.tops
FROM (
SELECT house_city
, house_community_name
, house_unit_price_all
, ROW_NUMBER() OVER (PARTITION BY house_city ORDER BY house_unit_price_all DESC) AS tops
FROM t_house_unit_price_info
) a
JOIN t_house_area b
ON a.house_city = b.house_city
AND a.house_community_name = b.house_community_name
AND a.tops < 6;
注意
MaxCompoute SQL語法采用類似於常用SQL語法,可以看作是標準SQL的子集,但MaxCompute在很多方麵並不具備常用數據庫的特征,如事務、主鍵約束、索引等都不支持,因而SQL也有一定的差異。前麵介紹數據導入創建目標表時已經簡單的介紹了一些DDL語法的差異,針對這裏DML語句,我們也做簡單介紹:
"產出每個城市每個樓盤的均價臨時表"語句,整個語句隻需要修改where條件中pt條件,即可直接在mysql上執行。
“拆分house_region字段”的語句中“split_part()”函數是MaxCompute內置的字符串函數,可以直接在SQL中使用,對應MySQL上substring_index()或其他。
產出目標表語句中,ROW_NUMBER()是MaxCompute內置的窗口函數,在本案例中主要作用於計算排行,可在SQL中直接使用,MySQL上沒有可直接對應的函數。
產出目標表語句中,insert overwrite(或insert into) 後要加 table 關鍵字,MySQL或Oracle不需要table關鍵字。
MaxCompute SQL和常用SQl更多差異請看文檔——與主流SQL差異。
調度配置和參數配置
代碼編輯好後,可以點擊工具欄執行按鈕執行sql語句,對sql進行探查。確定無誤後進行調度配置主要包括調度屬性和依賴屬性:
- 調度屬性:由於每天調度一次,直接保留默認配置即可。
- 依賴屬性:由於本任務處理的數據來源是數據導入任務"house_basic_info"產出大數據,為了保證本任務執行時,數據導入已經完成,我們需要將導入任務設置為本任務的上遊任務(即父任務)。
至於“參數配置”由於本任務中隻用到係統參數${bdp.system.bizdate},這個參數在係統調度任務時會自動替換,所以無需再進行其他配置。(詳情請看係統參數說明)
保存提交
所有配置項都配置完成後點擊工具欄上的“保存”、“提交”按鈕,將任務提交到調度係統。點擊工作區右上角“前往運維”按鈕可以到運維中心查看工作流形態:
執行任務
與前麵數據導入任務執行操作類似。執行成功後可以在“數據開發”模塊sql腳本中查看目標表數據:
到目前為止,我們的目標表已經正常產出了,但是MaxCompute SQL在執行時會有一定的等待調度時間,適合做大數據批處理,網站前端讀取數據就不適合直接讀MaxCompute的數據,所以接下來我們需要把目標表回流到網站業務庫。
數據回流
與數據導入一樣,需要配置數據同步任務,不一樣的是回流任務來源是MaxCompute的表,目標庫是業務庫,如還是用本示例中的RDS-MySQL的house_web_master 數據庫中。
操作步驟:
- RDS-MySQL中創建好對應的表,若需要保留每天都數據,可以加一個字段保存日期信息;
- 在導數據開發套件的數據集成裏配置新數據源,參考前麵數據導入時配置數據源的方式;
-
創建並配置數據同步任務,假設命名為 house_unit_price_top5_2_mysql,主要用MaxCompute讀插件和RDS-MySQL寫插件,大致配置如下:
字段配置如果想直接把源表的分區字段同步到MySQL的日期信息字段
依賴屬性中,為了保證每次回流都是最新的數據,將數據加工任務house_unit_price_top5設置為父任務
保存提交任務後在運維管理可以看到工作流形態:
執行回流任務,參考前麵的任務執行方式。執行成功後,可以到RDS-MySQL上查看表數據是否正常導入。
總結
到此,我們整個需求目標都完成了,本案例在MaxCompute隻是實現一個非常簡單的統計分析,更多的高級功能組件(MapReduce、Graph等)沒有用到。通過本案例我們可以了解到:
大數據開發套件是架構在MaxCompute的web工具,提供界麵操作以及數據集成和任務調度功能,而MaxCompute提供計算和存儲服務。
MaxCompute SQL作業提交後會有幾十秒到數分鍾不等的排隊調度,所以適合處理跑批作業,一次作業批量處理海量數據,不適合直接對接需要每秒處理幾千至數萬筆事務的前台業務係統。
MaxCompute SQL采用的是類似於SQL的語法,可以看作是標準SQL的子集,但不能因此簡單的把MaxCompute 等價成一個數據庫,它在很多方麵並不具備數據庫的特征,如事務、主鍵約束、索引等(更多差異請點擊進入查看)。
大數據開發套件裏的數據同步可以實現跨region的RDS與MaxCompute的數據互傳,無需特殊處理。
最後更新:2017-06-09 10:01:56