閱讀755 返回首頁    go 阿裏雲 go 技術社區[雲棲]


MaxCompute基礎與MaxCompute SQL優化

總論:

大數據計算服務 ( MaxCompute,原名 ODPS ) 是一種快速、完全托管的 TB/PB 級數據倉庫解決方案 。MaxCompute 向用戶提供了完善的數據導入方案以及多種經典的分布式計算模型,能夠更快速的解決用戶海量數據計算問題,有效降低企業成本,並保障數據安全 。同時,大數據開發套件和 MaxCompute關係緊密,大數據開發套件為 MaxCompute 提供了一站式的數據同步,任務開發,數據工作流開發,數據管理和數據運維等功能,您可以參見?大數據開發套件簡介?來對其進行深入了解 。

MaxCompute 主要服務於批量結構化數據的存儲和計算,可以提供海量數據倉庫的解決方案以及針對大數據的分析建模服務 。隨著社會數據收集手段的不斷豐富及完善,越來越多的行業數據被積累下來 。數據規模已經增長到了傳統軟件行業無法承載的海量數據(百 GB、TB、乃至 PB)級別 。

在分析海量數據場景下,由於單台服務器的處理能力限製,數據分析者通常采用分布式計算模式 。但分布式的計算模型對數據分析人員提出了較高的要求,且不易維護 。使用分布式模型,數據分析人員不僅需要了解業務需求,同時還需要熟悉底層計算模型 。MaxCompute 的目的是為用戶提供一種便捷的分析處理海量數據的手段 。用戶可以不必關心分布式計算細節,從而達到分析大數據的目的 。

? ?首先MaxCompute不同於普通的mysql,oracle這樣的關係型數據庫,它其實是一個綜合性的數據服務平台,它並不能在毫秒級甚至秒級返回查詢結果,一條odps命令的執行通常需要經過如下流程:

  1. 提交一個SQL語句,發送 RESTful 請求給HTTP服務器
  2. HTTP 服務器做用戶認證。認證通過後,請求就會以 Kuafu通信協議方式發送給 Worker。
  3. Worker判斷該請求作業是否需要啟動Fuxi Job。如果不需要,本地執行並返回結果。如果需要,則生成一個 instance, 發送給 Scheduler。
  4. Scheduler把instance信息注冊到 OTS,將其狀態置成 Running。Scheduler 把 instance 添加到 instance 隊列。
  5. Worker把 Instance ID返回給客戶端。

  1. Scheduler會把instance拆成多個Task,並生成任務流DAG圖。
  2. 把可運行的Task 放入到優先級隊列TaskPool中。
  3. Scheduler 有一個後台線程定時對TaskPool 中的任務進行排序。Scheduler 有一個後台線程定時查詢計算集群的資源狀況。Executor在資源未滿的情況下,輪詢TaskPool,請求Task。Scheduler判斷計算資源。若集群有資源,就將該Task發給Executor。
  4. Executor調用SQL Parse Planner,生成SQL Plan。Executor 將 SQL Plan 轉換成計算層的 FuXi Job 描述文件。Executor 將該描述文件提交給計算層運行,並查詢 Task 執行狀態。Task 執行完成後,Executor更新 OTS 中的 Task信息,並匯報給Scheudler。
  5. Schduler 判斷 instance 結束,更新 OTS 中 instance 信息,置為 Terminated。

客戶端接收到返回的 Instance ID 後,可以通過 Instance ID 來查詢作業狀態:

  1. 客戶端會發送另一個 REST 的請求,查詢作業狀態。
  2. HTTP 服務器根據配置信息做用戶認證。用戶認證通過後,把查詢的請求發送給 Worker。
  3. Worker 根據 InstanceID 去 OTS 中查詢該作業的執行狀態。Worker 將查詢到的執行狀態返回給客戶端。
? ? 其實MaxCompute是一個透明的數據服務平台,用戶不需要了解分布式數據處理的細節,就可以在client上比較方便的處理PB級別的數據了。所以,在了解了以上內容之後,對於MaxCompute隻能在分鍾級別返回結果就有一個比較清楚的理解了。
ps:以上這些內容在大數據開發套件中都是透明的。


? ??
? ??MaxCompute SQL與普通關係型數據庫的SQL大體類似,不同在於MaxCompute不支持如事務、主鍵約束、索引等,可以看成標準SQL的子集。

? ??MaxCompute?操作以表為基礎,ddl中涉及到對表的一係列操作,包括create,drop,alter
? ? 我們以大數據開發套件上的一張表為例:
CREATE TABLE IF NOT EXISTS xxxx
(
    aa     STRING COMMENT 'xxxx',
    bb     STRING COMMENT 'xxxx',
    cc     STRING COMMENT 'xxxx',
    dd     STRING COMMENT 'xxxx',
    ee     STRING COMMENT 'xxxx',
    ff     STRING COMMENT 'xxx',
    gg     BIGINT COMMENT 'xxx'
)
COMMENT 'xxxx'
PARTITIONED BY (dt     STRING COMMENT '')
LIFECYCLE 10;

在工作流中我們希望任務能夠順利的執行,所以不管是DDL和DML中我們都盡量希望語句返回成功(if not exist,overwrite)
comment包括對應字段的注釋和對應表的注釋,這些都可以alter
與傳統的SQL不同,MaxCompute麵向全域數據,所以即使是用create xxxx select ?xxxx from xxx的方式也需要as加上列的名稱。
分區字段注明,由於MaxCompute操作的數據量很大,通常來說分區字段需要特別關注
生命周期:非常方便的屬性,便於用戶釋放存儲空間,簡化回收數據的流程,不需要傳統的繁雜的空間維護。靈活運用LastDataModifiedTime與touch(修改為當前時間),關注分區表和非分區表的區別。
對於大表結構的複製,odps提供非常靈活的create like語句。
drop一張表,將會把表及表中的數據丟入回收站中,加上purge關鍵字,會被直接刪除,不可恢複。
alter幾乎可以對表的所有屬性進行更改,包括列,注釋,分區,分區屬性,生命周期等等。
Archive可以用來減少大表空間占用,壓縮空間。

?


? ? 常見的比如insert,select,join
   insert overwrite|into table tablename
[partition (partcol1=val1, partcol2=val2 ...)] select_statement from from_statement;

? ? ?靜態分區,分區字段常量;動態分區,可以不指定值,適用select字句中的分區列值
? ? ?multi-insert 單次讀入,多次寫入,減少數據讀取。?

? ? ?

   select [all | distinct] select_expr, select_expr, ...
        from table_reference
        [where where_condition]
        [group by col_list]
        [order by order_condition]
        [distribute by distribute_condition [sort by sort_condition] ]
        [limit number]

? ? ?與傳統的SQL不同的是,distinct作用所有select字段

? ? ?編譯過程group > select > order/sort/distribute,理解了編譯順序也就理解了各個字句間別名的使用規範。
? ? ?distribute by:對數據按照某幾列的值做hash分片。
? ? ?sort by:局部排序,語句前必須加distribute by。實際上sort by是對distribute by的結果進行局部排序。
? ? ?從功能的理解可知:order by不和distribute by/sort by共用,同時group by也不和distribute by/sort by共用。
? ? ?
? ? ?join和傳統sql的表現較為一致,odps支持left outer join,right outer join,full outer join,inner join。
? ? ?mapjoin hint:當大表和小表join的情況下利用mapjoin將用戶指定的小表全部加載到內存中,從而加快join的執行速度,同時支持非等值連接,full join 不可用,連接的主表需為大表


? ? 主要包括數學與統計函數,字符串操作函數,時間函數,窗口函數,聚合函數,轉置函數等
? ? 就不一一列舉了,功能強大。
? ??


? ?包括udf,udtf,udaf
? ?udf:用戶自定義標量函數
? ?udtf:用戶自定義表值函數(返回多個字段)
? ?udaf:用戶自定義聚合函數
? ?UDF:
  package org.alidata.odps.udf.examples;
     import com.aliyun.odps.udf.UDF;
     public final class Lower extends UDF {
       public String evaluate(String s) {
         if (s == null) { return null; }
         return s.toLowerCase();
       }
     }

?繼承UDF類,實現evaluate方法即可。evaluate方法可以有多個,滿足多態特性。

?UDAF:
繼承com.aliyun.odps.udf.Aggregator,主要實現iterate,merge和terminate三個接口,UDAF的主要邏輯依賴於這三個接口的實現。此外,還需要用戶實現自定義的Writable buffer,因為UDAF的主要邏輯是將數據進行分片後遍曆,處理完之後進行merge。

UDTF:
繼承com.aliyun.odps.udf.UDTF類,主要實現process和forward兩個接口,SQL中每一條記錄都會對應調用一次process,process的參數為UDTF的輸入參數。輸入參數以Object[]的形式傳入,輸出結果通過調用forward函數輸出。

UDF統一添加方法:
add jar xxx
create function xxx as packagename.classname using 'jarname'
? ??


與傳統sql類型,區別在於變量引用時前麵加$
DECLARE
 var_name var_type;
BEGIN
 可執行語句
END;

? ??其他命令:


? ? ?explain,show instance,merge smallfile,添加/移除/顯示統計信息(add/remove/show statistc 統計值或者符合某個表達式的值)

  • 選擇滿足需求的小表,比如匯總表。維表盡量選擇全量表,事實表盡量選擇增量表;
  • 選擇產出早的表;
  • 選擇可回滾的表,比如使用加購事件表代替加購全流程表;
  • 依賴的N個上遊表,盡量保證上遊產出時間要均勻,如果有差異,考慮換依賴表;

  • 行數小於100萬的表認為是小表,這個時候使用mapjoin性能會提高很多;
  • 讀取數據的時候要加上分區等過濾條件,大表變小表。常用過濾條件字段,做成動態分區,方便下遊過濾;
  • 不得不讀取N天大表的時候,使用unionall方式合並多天數據;

  • Join關聯要盡可能是主鍵關聯。關聯字段類型要一致;
  • 多天匯總,先生成1天輕度匯總表,多天使用1天數據再匯總;
  • multiinsert,實現一次讀取多次寫入;
  • 使用係統UDF代替自己的寫的UDF;

  • 依賴max_pt的,要排除當天依賴;
  • 上遊是小時任務,使用max_pt要慎重;
  • 執行超過1個小時任務要關注;
大數據開發套件:

大數據開發套件提供了直觀的數據操作入口,數據研發過程代碼的編寫,調試,優化,發布都可以在大數據開發套件中進行。
拿一個任務耗時過長作例子,看看在大數據開發套件上我們是怎麼處理碰到的問題的。

一個task執行時間過長,除掉本身代碼的性能問題,那麼有兩種比較大的可能:
一種是等待問題,一種是數據傾斜問題
等待問題可能是由於係統資源不足,係統繁忙,優先級不夠,數據量太大,碰到了壞盤等原因導致的
我們可以通過調整優先級,重跑,過濾初始數據等方法來處理。

傾斜問題則一般是數據本身的問題,常見的數據傾斜是怎麼造成的?

Shuffle的時候,將各個節點上相同的key拉取到某個節點的一個task進行處理,比如按照key進行聚合或join等操作,如果某個key對應的數據量特別大的話,就會發生數據傾斜現象。數據傾斜就成為了整個task運行時間的短板。

觸發shuffle的常見算子:distinct、groupBy、join等。

要解決數據傾斜的問題,首先要定位數據傾斜發生在什麼地方,首先是哪個stage,直接在D2?UI上看就可以,查看數據是否傾斜了
logview--odps task--detail--stage--longtail

根據stage日誌,判斷出數據傾斜發生在哪個算子上。

根據傾斜發生的階段,我們又可以把它們分為map傾斜,reduce傾斜,join傾斜

通常來說,對於傾斜現象,我們首先查看導致數據傾斜的key的數據分布情況,接下來大概有幾種處理方案:

1:過濾數據
過濾掉某些髒數據,比如說是否可以去掉null,去掉某些條件對應的值
2:加大並行度
給任務添加處理資源,加大instance的數量,暴力
3:對數據進行拆分,分而治之
如果大表join小表,我們可以用mapjoin,將小表cache進內存
二次分發,加上隨機前綴(數據膨脹),拆分數據集為熱點+非熱點再進一步處理
大表join超大表,還可以考慮bloomfilter
4:組合使用
上述方法,組合使用
5:修改業務
實在沒有進步空間,從業務上過濾數據

最後更新:2017-05-09 20:10:08

  上一篇:go 5月9日雲棲精選夜讀:不斷變化下的阿裏雲:2017阿裏雲產品和解決方案全向圖(5月版)
  下一篇:go 程序員與程序員之間碰撞的火花