日誌服務ETL(1):基於函數服務完成自定義計算
概述
ETL(Extract-Transform-Load)用來描述將數據從來源端經過抽取(Extract)、轉換(Transform)、加載(Load)至目的端的過程。
傳統ETL是構建數據倉庫的重要一環,用戶從數據源抽取出所需的數據,經過數據清洗,最終按照預先定義好的數據倉庫模型,將數據加載到數據倉庫中去。
在今天,隨著業務需求的日益增加,不同係統的相互大批量數據交互也已成為常態。數據在不同係統中流動起來,有助於充分發掘日誌大數據的價值。
在雲上,AWS Glue是一個功能完備的ETL產品,通過DevEndpoint、Crawler、MetaStore等模塊,分別在ETL函數編寫、數據識別加載、meta管理等方向做到了不錯的用戶體驗。如同大部分業界方案,Glue也是batch觸發的ETL模型,建議數據源是S3。但對於流式數據(如Kinesis Stream)則不能直接使用,提供的方案是:用戶先將數據導入到S3,再使用Glue分析S3上的數據。
日誌服務的LogHub是流式的數據中心,從日誌寫入到可消費延遲在1秒。日誌服務ETL麵向的正是這些流式寫入的數據,提供準實時(1分鍾級別)的ETL作業。
日誌服務ETL
兩個場景
- 一站式建模分析
通過日誌服務,快速完成日誌采集、加工、查詢、分析。
- 數據交換
為數據的目的端落地提供支撐,構建雲上大數據產品間的數據管道。
ETL模型
實時數據流處理,基於流的模型。ETL Trigger輪詢源logsotre下各shard的寫入位置,並定時生成三元組信息觸發函數執行,該三元組用於標識本次ETL任務所應該處理的數據範圍。
通過shard的並發做到水平擴展,shard彈性伸縮保證了ETL的動態伸縮,通過定時器觸發作業完成持續的數據加載。
在ETL任務執行層麵,考慮UDF的靈活性,加工邏輯會跑在函數服務的函數上,而函數服務提供了按需付費、彈性伸縮能力以及自定義代碼執行功能,正是很多雲上用戶所需要的。另一方麵,從用戶數據端到端延時、大數據吞吐、SQL易用性角度,日誌服務未來也考慮把ETL的runtime擴展到流計算引擎(例如阿裏雲流計算)上,去服務更多的用戶場景。
ETL日誌
- ETL過程日誌
這是一類是執行過程日誌,這一部分日誌是在ETL執行過程中每執行一步的記錄關鍵點和錯誤,包括某一步驟的開始、結束時間、初始化動作完成情況,模塊出錯信息等。記錄日誌的目的是隨時可以知道ETL運行情況,如果出錯了,可以知道哪裏出錯。
函數運行產生的日誌記錄了數據加工過程中關鍵點、異常:
- ETL調度日誌
調度日誌隻記錄ETL任務開始的時間、結束時間,任務是否成功以及成功返回的信息。如果ETL任務出錯了,不僅要形成ETL出錯日誌,而且要向係統管理員發送報警郵件或短信。
在調度日誌的基礎上,可以構建出報表統計ETL的總體運行狀況,會在下文實踐部分介紹。
“日誌服務+函數服務”ETL的優勢
- 一站式采集、存儲、加工、分析
- 全托管加工任務,按時間觸發,自動重試
- 資源按shard水平擴展,滿足大數據需求
- 基於函數服務提供數據加工,彈性資源,按需付費
- ETL對用戶透明,提供日誌、報警功能
- 持續增加內置函數模板,降低主流需求下的函數開發代價
日誌服務ETL實戰
對於數據分析工程師而言,ETL過程往往占據整個項目工作60%~70%的工作量。日誌服務的目標是使用內置的函數模板的前提下,將構建ETL的時間縮短到15分鍾內。
題目:ip歸屬查找
通過Nginx、apache等HTTP服務器構建的軟件,可以記錄每一個用戶訪問日誌。本次實踐的題目是:看看我們到底服務了哪些地區的用戶,這些用戶通過什麼鏈路訪問我們的服務。
第一步:日誌集中化存儲
我們使用日誌服務的Logtail客戶端快速接入機器上的日誌文件。本節請參考日誌服務實時采集數據,本文不作贅述。
客戶端采集nginx訪問日誌將會集中存儲到日誌服務的一個logstore中,如下圖,forawrd字段的ip記錄了用戶請求的來源:
第二步:雲端數據加工
1. 登錄函數服務控製台創建servcie
在高級配置中,建議為ETL function配置加工過程中的日誌記錄的存儲logstore,方便通過日誌來定位加工過程中的異常行為。為函數授予日誌服務AliyunLogFullAccess權限,函數在運行過程中會讀源logstore數據,數據處理後再寫到目標logstore。
2. 通過內置模板創建函數
默認的函數配置如下:
3. 在函數上新建日誌服務觸發器
日誌服務觸發器配置如下:
指定數據源為第一步中采集到中心化nginx日誌logstore,例如本例子的project:etl-test/logstore:nginx_access_log。
日誌服務將輪詢logstore的數據,當數據持續產生時,每60秒創建一次ETL任務,並調用函數執行。觸發函數執行以及函數執行結果將會記錄到觸發器日誌logstore:etl-trigger-log中。
函數配置因不同函數的實現和功能而已,ip-lookup的詳細配置項說明請參考README。
4. 保存配置,等待1分鍾後ETL任務開始執行
可以關注一下ETL過程日誌、調度日誌,按如上配置,分別在logstore:etl-function-log、etl-trigger-log。
可以通過查詢語句構建出如本文日誌部分所示的報表:
左上圖是每分鍾調度函數執行的觸發次數,構建自查詢語句:
project_name : etl-test and job_name : ceff019ca3d077f85acaad35bb6b9bba65da6717 | select from_unixtime(__time__ - __time__ % 60) as t, count(1) as invoke_count group by from_unixtime(__time__ - __time__ % 60) order by t asc limit 1000
右上圖是ETL任務成功、失敗的比例,構建自查詢語句:
project_name : etl-test and job_name : ceff019ca3d077f85acaad35bb6b9bba65da6717 | select task_status, count(1) group by task_status
左下圖是每5分鍾的攝入的日誌字節數,構建自查詢語句:
project_name : etl-test and job_name : ceff019ca3d077f85acaad35bb6b9bba65da6717 and task_status : Success | select from_unixtime(__time__ - __time__ % 300) as t, sum(ingest_bytes) as ingest_bytes group by from_unixtime(__time__ - __time__ % 300) order by t asc limit 1000
右下圖則是每5分鍾攝入處理的日誌行數,構建自查詢語句:
project_name : etl-test and job_name : ceff019ca3d077f85acaad35bb6b9bba65da6717 and task_status : Success | select from_unixtime(__time__ - __time__ % 300) as t, sum(ingest_lines) as ingest_lines group by from_unixtime(__time__ - __time__ % 300) order by t asc limit 1000
第三步:加工後數據建模
機器上的nginx日誌經由Logtail實時采集到源logstore,再由ETL準實時加工後寫出到目標logstore。經函數處理後帶ip信息數據如下:
對比加工前後,我們發現,新的數據增加了四個字段(country、省province、city、isp),可以知道:ip源117.136.90.160的請求來自中國山西太原,運營商是中國移動。
接下來,使用日誌服務的日誌分析功能查詢一個時間段內請求ip的城市和isp分布。通過如下兩個查詢語句構建報表:
* | select city, count(1) as c group by city order by c desc limit 15
* | select isp, count(1) as c group by isp order by c desc limit 15
至此,本節的實踐內容結束。歡迎大家試用自定義ETL。
最後更新:2017-10-18 10:03:20