閱讀624 返回首頁    go 阿裏雲 go 技術社區[雲棲]


MaxCompute與OSS非結構化數據讀寫互通(及圖像處理實例)

0. 前言

MaxCompute作為阿裏巴巴集團內部絕大多數大數據處理需求的核心計算組件,擁有強大的計算能力,隨著集團內外大數據業務的不斷擴展,新的數據使用場景也在不斷產生。在這樣的背景下,MaxCompute(ODPS)計算框架持續演化,而原來主要麵對內部特殊格式數據的強大計算能力,也正在一步步的通過新增的非結構化數據處理框架,開放給不同的外部數據。 我們相信阿裏巴巴集團的這種需求,也代表著業界大數據領域的最前沿實踐和走向,具有相當的普適性。在之前我們已經對MaxCompute 2.0新增的非結構化框架做過整體介紹,描述了在MaxCompute上如何處理存儲在OSS上麵的非結構化數據,側重點在怎樣從OSS**讀取**各種非結構化數據並在MaxCompute上進行**計算**。 而一個完整數據鏈路,讀取和計算處理之後,必然也會涉及到非結構化數據的 寫出。 在這裏我們著重介紹一下從MaxCompute往OSS**輸出**非結構化數據,並提供一個具體的在MaxCompute上進行圖像處理的實例, 來展示從【OSS->MaxCompute->OSS】的整個數據鏈路閉環的實現。 至於對於KV NoSQL類型數據的輸出,在對TableStore數據處理介紹 中已經有所介紹,這裏就不再重複。

1. 使用前提和假設

1.1 MaxCompute 2.0 功能申請與打開

首先要說明的是MaxCompute新一代的2.0計算框架還在灰度上線的過程中,默認設置下許多功能沒有打開,所以要使用新引進的非結構化數據處理框架,**需要申請MaxCompute 2.0試用**,具體開通使用方法請參見 如何申請試用MaxCompute 2.0, 簡單來說就是在開通2.0非結構化功能的前提下,在每個SQL query執行時必須帶上如下setting:

set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.type.system.odps2=true;

下麵的範例中就不再重複了,**但是本文介紹的所有功能均基於以上假設**,當然這些特殊設置在近期MaxCompute 2.0計算框架完全上線後就可以省略了。

在上麵這些settgins中,一個需要特別說明的是 set odps.sql.type.system.odps2=true, 這個設置其實和非結構化功能本身沒有直接關係,但是在後麵的圖像處理例子中,我們用到了MaxCompute 2.0新引進的一個BINARY類型,用於表示/存儲二進製bytes數據。 所以這裏指定使用了2.0的類型係統,這個新的類型係統大大擴展了原來MaxCompute(ODPS) 1.0的類型,比如BINARY類型就為存儲非文本的數據提供了一個自然的容器。 這裏作為BINARY類型的使用範例,就不對整個新類型係統做展開介紹了。

1.2 網絡連通性與訪問權限

另外因為MaxCompute與OSS是兩個分開的雲計算,與雲存儲服務,所以在不同的部署集群上的網絡連通性有可能影響MaxCompute訪問OSS的數據的可達性。 關於OSS的節點,實例,服務地址等概念,可以參見OSS相關介紹。 在MaxCompute公共雲服務訪問OSS存儲,推薦使用OSS**私網**地址(即以-internal.aliyuncs.com結尾的host地址)。

此外需要指出的是,MaxCompute計算服務要訪問TableStore數據需要有一個安全的授權通道。 在這個問題上,MaxCompute結合了阿裏雲的訪問控製服務(RAM)和令牌服務(STS)來實現對數據的安全反問:

首先需要在RAM中授權MaxCompute訪問OSS的權限。登錄RAM控製台,創建角色AliyunODPSDefaultRole,並將策略內容設置為:

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "odps.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

然後編輯該角色的授權策略,將權限AliyunODPSRolePolicy授權給該角色。

如果覺得這些步驟太麻煩,還可以登錄阿裏雲賬號點擊此處完成一鍵授權

2. MaxCompute內置的OSS數據輸出handler

2.1 創建External Table

MaxCompute非結構化數據框架希望從根本上提供MaxCompute與各種數據的聯通,這裏的“各種數據”是兩個維度上的:

  1. 各種存儲介質,比如OSS
  2. 各種數據格式, 比如文本文件,視頻,圖像,音頻,基因,氣象等格式的數據

而數據的這兩個維度的特征,都是通過EXTERNAL TABLE的概念來引入MaxCompute的計算體係的。 與讀取OSS數據的使用方法類似,對OSS數據進行寫操作,在如上**打開安全授權通道後**,也是先通過CREATE EXTERNAL TABLE語句創建出一個外部表,再通過標準MaxCompute SQL的INSERT INTO/OVERWRITE等語句來實現的,這裏先用MaxCompute內置的TsvStorageHandler為例來說明一下用法:

DROP TABLE IF EXISTS tpch_lineitem_tsv_external;

CREATE EXTERNAL TABLE IF NOT EXISTS tpch_lineitem_tsv_external
(
orderkey BIGINT,
suppkey BIGINT,
discount DOUBLE,
tax DOUBLE,
shipdate STRING,
linestatus STRING,
shipmode STRING,
comment STRING
)
STORED BY 'com.aliyun.odps.TsvStorageHandler'   ----------------------------------------- (1)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/tsv_output_folder/';  --(2)

這個DDL語句建立了一個外部表tpch_lineitem_tsv_external,並將前麵提到的**兩個維度**的外部數據信息關聯到這個外部表上。

  1. 數據存儲介質: LOCATION 將一個OSS上的地址關聯到外部表上,對這個外部表的進行讀寫操作都會反映到這個OSS地址上。
  2. 數據存儲格式: StorageHandler用來表明對這些數據的讀寫操作方式,這裏使用了MaxCompute內置的 com.aliyun.odps.TsvStorageHandler, 用戶可以使用這個由係統自帶的實現來讀取和寫出TSV文件。 同時用戶也可以通過MaxCompute的SDK來自定義StorageHandler, 這個將在後麵的章節介紹。

其中OSS數據存儲的具體地址的URI格式為:

LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'

最後還要提到的是,在上麵的DDL語句中定義了外部表的Schema, 對於數據輸出而言,這表示輸出的數據格式將由這個Schema描述。 就TSV格式而言,這個schema描述比較直觀容易理解; 而在用戶自定義的輸出數據格式上,這個schema與輸出數據的聯係則更鬆散一些,有著更大的自由度。 在後麵介紹通過自定義StorageHandler/Outputer的時候會詳細展開。

2.2 通過對External Table的 INSERT 操作實現TSV文本文件的寫出

在將OSS數據通過External Table關聯上後,對OSS文件的寫出可以對External Table做標準的SQL INSERT OVERWRITE/INSERT INTO來操作。 具體輸出數據的**來源**可以有兩種

  1. 數據源為MaxCompute的內部表: 也就是說可以通過對外表INSERT操作來實現MaxCompute**內部表數據到外部存儲介質的寫出**。

  2. 數據源為之前通過External Table引入MaxCompute計算體係的外部數據: 這可以用來將外部數據引入MaxCompute進行計算,然後再存儲到(不同的)外部存儲地址,或者甚至是不同的外部存儲介質(比如將TableStore數據經由MaxCompute導出到OSS)。

2.2.1 從MaxCompute內部表輸出數據到OSS

這裏先來看第一種場景:假設我們已經有一個名為tpch_lineitem的MaxCompute**內部表**,其schema可以通過

DESCRIBE tpch_lineitem; 

得到:

+------------------------------------------------------------------------------------+
| InternalTable: YES      | Size: 241483831680                                       |
+------------------------------------------------------------------------------------+
| Native Columns:                                                                    |
+------------------------------------------------------------------------------------+
| Field           | Type       | Label | Comment                                     |
+------------------------------------------------------------------------------------+
| l_orderkey      | bigint     |       |                                             |
| l_partkey       | bigint     |       |                                             |
| l_suppkey       | bigint     |       |                                             |
| l_linenumber    | bigint     |       |                                             |
| l_quantity      | double     |       |                                             |
| l_extendedprice | double     |       |                                             |
| l_discount      | double     |       |                                             |
| l_tax           | double     |       |                                             |
| l_returnflag    | string     |       |                                             |
| l_linestatus    | string     |       |                                             |
| l_shipdate      | string     |       |                                             |
| l_commitdate    | string     |       |                                             |
| l_receiptdate   | string     |       |                                             |
| l_shipinstruct  | string     |       |                                             |
| l_shipmode      | string     |       |                                             |
| l_comment       | string     |       |                                             |
+------------------------------------------------------------------------------------+

其中有**16**個columns。 現在我們希望將其中的一部分數據以TSV格式導出到OSS上麵。 那麼在用上述DDL創建出External Table之後,使用如下INSERT OVERWRITE操作就可以實現:

INSERT OVERWRITE TABLE tpch_lineitem_tsv_external
SELECT l_orderkey, l_suppkey, l_discount, l_tax, l_shipdate, l_linestatus, l_shipmode, l_comment
    FROM tpch_lineitem
    WHERE l_discount = 0.07 and l_tax = 0.01;

這裏將從內部的tpch_lineitem表中,在符合l_discount = 0.07 並 l_tax = 0.01的行中選出8個列(對應tpch_lineitem_tsv_external這個外部表的schema)按照TSV的格式寫到OSS上。 在上麵這個INSERT OVERWRITE操作成功完成後,就可以看到OSS上的對應LOCATION產生了一係列文件:

osscmd ls oss://oss-odps-test/tsv_output_folder/

2017-01-14 06:48:27 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta
2017-01-14 06:48:12 4.80MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_0_0-0.tsv
2017-01-14 06:48:05 4.78MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_1_0-0.tsv
2017-01-14 06:47:48 4.79MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv
...

這裏可以看到,通過上麵LOCATION指定的oss-odps-test這個OSS bucket下的tsv_output_folder文件夾下產生了一個.odps文件夾,這其中將有一些.tsv文件,以及一個.meta文件。 這樣子的文件結構是MaxCompute(ODPS)往OSS上輸出所特有的:

  1. 通過MaxCompute對一個OSS地址,使用INSERT INTO/OVERWRITE 外部表來做寫出操作,所有的數據將在指定的LOCATION下的.odps文件夾產生;
  2. 其中.odps文件夾中的.meta文件為MaxCompute額外寫出的宏數據文件,其中用於記錄當前文件夾中有效的數據。 正常情況下,如果**INSERT操作成功完成的話**,可以認為當前文件夾的所有數據均是有效數據。 隻有在有作業失敗的情況下需要對這個宏數據進行解析。 即使是在作業中途失敗或被kill的情況下,對於INSERT OVERWRITE操作,再跑一次成功即可。 如果對於高級用戶,一定需要解析.meta文件的話,可以聯係MaxCompute技術團隊。

這裏迅速看一下這些tsv文件的內容:

osscmd cat oss://oss-odps-test/tsv_output_folder/.odps/20170113232648738gam6csz7/M1_0_0-0.tsv  

4236000067      9992377 0.07    0.01    1992-11-06      F       RAIL     across the ideas nag
4236000290      3272628 0.07    0.01    1998-04-28      O       RAIL    uriously. furiously unusual dinos int
4236000386      8081402 0.07    0.01    1994-02-19      F       RAIL    its. express, iron
4236000710      3879271 0.07    0.01    1995-03-10      F       AIR     es are carefully fluffily spe
...

可以看到確實在OSS上產生了對應的TSV數據文件。

最後,大家可能也注意到了,這個INSERT OVERWRITE操作產生了多個TSV文件,對於MaxCompute內置的TSV/CSV處理來說,產生的文件數目與對應SQL stage的並發度是相同的,在上麵這個例子中,INSER OVERWITE ... SELECT ... FROM ...; 的操作在源數據表(tpch_lineitem) 上分配了1000個mapper,所以最後產生了1000個TSV文件的。 如果需要控製TSV文件的數目,可以配合MaxCompute的各種靈活語義和配置來實現。 比如如果需要強製產生一個TSV文件,那在這個特定例子中,可以在INSER OVERWITE ... SELECT ... FROM ...最後加上一個DISTRIBUTE BY l_discount, 就可以在最後插入僅有一個Reducer的Reduce stage, 也就會隻輸出一個TSV文件了:

osscmd ls oss://oss-odps-test/tsv_output_folder/

2017-01-14 08:03:41 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta
2017-01-14 08:03:35 4.20GB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113234037735gcm6csz7/R2_1_33_0-0.tsv

可以看到在增加了DISTRIBUTE BY l_discount後,現在同樣的數據隻了一個輸出TSV文件,當然這個文件的size就大多了。 這方麵的調控技巧還有很多,都是可以依賴SQL語言的靈活性,數據本身的特性,以及MaxCompute計算相關設置來實現的,這裏就不深入展開了。

2.2.2 以MaxCompute為計算介質,實現不同存儲介質之間的數據轉移

External Table作為一個MaxCompute與外部存儲介質的一個切入點,之前已經介紹過對OSS數據的讀取以及TableStore數據的操作,結合對外部數據讀取和寫出的功能,就可以實現通過External Table實現各種各樣的數據計算/存儲鏈路,比如:

  1. 讀取External Table A關聯的**OSS**數據,在MaxCompute上做複雜計算處理,並輸出到External Table B關聯的**OSS**地址

  2. 讀取External Table A關聯的**TableStore**數據,在MaxCompute上做複雜計算處理,並輸出到External Table B關聯的**OSS**地址

而這些操作與上麵數據源為MaxCompute內部表的場景, 唯一的區別隻是SELECT的來源變成一個External table,而不是MaxCompute內置表。

3. 通過自定義StorageHandler來實現數據輸出

除了使用內置的StorageHandler來實現在OSS上輸出TSV/CSV等常見文本格式,MaxCompute非結構化框架提供了通用的SDK,允許用戶對外輸出自定義數據格式文件,包括圖像,音頻,視頻等等。 這種對於用戶自定義的完全非結構化數據格式支持,也是MaxCompute從結構化/文本類數據的一個向外擴展,在這裏我們會以一個圖像處理的例子,來走通整個【OSS->MaxCompute->OSS】數據鏈路,尤其著重介紹對OSS輸出文件的功能。

為了方便大家理解,這裏先提供一個在**使用用戶自定義代碼的場景**下,數據在MaxCompute計算平台上的流程:

Fig.1.數據計算鏈路

從上圖可以看出,從數據的流動和處理邏輯上理解,用戶可以簡單地把非結構化處理框架理解成在MaxCompute計算平台兩端有機耦合的的數據導入(Ingres)以及導出(Egress):

  1. 外部的(OSS)數據經過非結構化框架轉換,會使用Java用戶容易理解的InputStream類提供給自定義代碼接口。 用戶自實現Extract邏輯隻需要負責對輸入的InputStream做讀取/解析/轉化/計算,最終返回MaxCompute計算平台通用的Record格式;
  2. 這些Record可以自由的參與MaxCompute的SQL邏輯運算,這一部分計算是基於MaxCompute內置的強大結構化SQL運算引擎,並可能產生新的Record
  3. 運算過後的Record中再傳遞給用戶自定義的Output邏輯,用戶在這裏可以進行進一步的計算轉換,並最終將Record裏麵需要輸出的信息通過係統提供的OutputStream輸出,由係統負責寫到OSS。

值得指出的是,這裏麵所有的步驟都是可以由用戶根據需要來進行**自由的選擇與拚接**的。 比如如果用戶的輸入就是MaxCompute的內部表,那步驟1.就沒有必要了,事實上在前麵的章節2中的例子,我們就實現了將內部表直接寫成OSS上的TSV文件的流程。 同理, 如果用戶沒有輸出的需求,步驟3. 就沒有必要,比如我們之前介紹的OSS數據的讀取。 最後,步驟2.也是可以省略的,比如如果用戶的所有計算邏輯都是在自定義的Extract/Output中完成,沒有進行SQL邏輯運算的需要,那步驟1.是可以直接連接到步驟3.的。

理解了上麵這個數據變換的流程,我們就可以來通過一個圖像處理例子來看看怎麼具體的通過非結構化框架在MaxCompute SQL上完整的實現非結構化數據的讀取,計算以及輸出了:

3.1 範例:OSS圖像文件 -> MaxCompute計算處理 -> OSS圖像輸出

這裏我們先提供實現這整個【OSS->MaxCompute->OSS】數據鏈路需要用到的MaxCompute SQL query,並做簡單的注解,詳細的用戶代碼實現邏輯將在後麵的3.2子章節中介紹SDK接口的時候做展開解釋。

3.1.1 關聯OSS上的原始輸入圖像到External Table: images_input_external

DROP TABLE IF EXISTS images_input_external;
CREATE EXTERNAL TABLE IF NOT EXISTS images_input_external
(
name STRING,
width BIGINT,
height BIGINT,
image BINARY
)
STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler'   --- (1)
WITH SERDEPROPERTIES ('inputImageFormat'='BMP' , 'transformedImageFormat' = 'JPG')  --- (2)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SampleData/test_images/mixed_bmp/'  --- (3)
USING 'odps-udf-example.jar';   --- (4)

說明:

  1. 用戶指明使用的用戶代碼wrapper class名字是com.aliyun.odps.udf.example.image.ImageStorageHandler,這個class及其依賴的三方庫用戶通過jar提供,具體jar名字會通過下麵的USING語句(見第4點)指定。
  2. 通過SERDEPROPERTIES來實現參數傳遞,格式為'key'='value', 具體用法可以參見基本功能介紹 以及下麵的用戶代碼說明
  3. 指定輸入圖像地址,這個地址上存放了一係列**不同分辨率的bmp圖像文件**。
  4. 指定包含用戶JAR包,內含自定義的StorageHandler/Extractor/Outputer,以及需要的三方庫(這裏用到了Java ImageIO庫,具體見下麵用戶代碼範例)。JAR包通過ADD JAR命令上傳,可以參見基本功能介紹

另外要說明的是這裏指定的External Table的schema就是用戶在進行Extract操作後構造的Record格式,具體怎麼構造這個Schema用戶可以根據需要自己根據能從輸入數據中抽取到的信息定義。 在這裏我們定義了對於輸入圖片數據,會將圖片名稱,圖片的長和寬,以及圖片的二進製bytes抽取出來放進Record(見後麵的Extractor代碼說明),所以就有了上麵的【STRING,BIGINT,BIGINT,BINARY】的schema。

3.1.2 關聯OSS輸出地址到External Table: images_output_external

CREATE EXTERNAL TABLE IF NOT EXISTS images_output_external
(
image_name STRING,
image_width BIGINT,
image_height BIGINT,
outimage BINARY
)
STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler'
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/output/images_output/' ---(1)
USING 'odps-udf-example.jar';

說明: 可以看到這裏創建關聯**輸出圖像文件**的External Table,使用的DDL語句,與前麵關聯**輸入圖像**時使用的DDL語句是非常類似的:隻是LOCATION不一樣,表明圖像數據處理後將輸出到另外一個地址。 另外還有一點就是這裏我們沒有使用SERDEPROPERTIES來進行傳參,這個隻是在這個場景上沒有需求,在有需求的時候可以用同樣的方法把參數傳遞給outputer。 當然這裏兩個DDL語句如此相似,有一個原因是因為我們這個例子中用戶代碼中對於Extract出的Record以及輸入給Outputer的Record使用了一樣的schema, 同時這一對Extractor和Outputer都被封裝在了同一個ImageStorageHandler裏放在同一個JAR包裏。 在實際應用中,這些都是可以根據實際需求自己調整的,由用戶自己選擇組合和打包方式

3.1.3 從OSS讀取原始圖片數據到MaxCompute, 計算處理,並輸出圖像到OSS

在上麵的3.1.1以及3.1.2子章節中的兩個DDL語句,分別實現了把輸入OSS數據,以及計劃輸出OSS數據,分別綁定到兩個LOCATION以及指定對應的用戶處理代碼,參數等設置。 然而這兩個DDL語句對係統而言,隻是進行了一些宏數據的記錄操作,並**不會涉及具體的數據計算操作**。 在這兩個DDL語句運行成功後,運行如下SQL語句才會引發真正的運算。 換句話說,在Fig.1中描述的整個【OSS->MaxCompute->OSS】數據讀取/計算/輸出鏈路,實際上都是通過下麵一個簡單的SQL 語句完成的:

INSERT OVERWRITE TABLE images_output_external
SELECT * FROM images_input_external
WHERE width = 1024;

這看起來就是一個標準的MaxCompute SQL語句,隻不過因為涉及了images_output_externalimages_input_external這兩個外部表,所以真正進行的物理操作與傳統的SQL操作會有一些區別:在這個過程中,涉及了讀寫OSS,以及通過ImageStorageHandler這個wrapper,調用自定義的Extractor,Outputer代碼來對數據進行操作。 下麵就來具體看看在這個例子中的用戶自定義代碼實現了怎樣的功能,以及具體是如何實現的。

3.2 ImageStorageHandler實現

如同之前介紹過的,MaxCompute非結構化框架通過StorageHandler這個接口來描述對各種數據存儲格式的處理。 具體來說,StorageHandler作為一個wrapper class, 讓用戶指定自己定義的Exatractor(用於數據的讀入,解析,處理等) 以及Outputer(用於數據的處理和輸出等)。 用戶自定義的StorageHandler 應該繼承 OdpsStorageHandler,實現getExtractorClass以及getOutputerClass 兩個接口。

通常作為wrapper class, StorageHandler的實現都很簡單,比如這裏的ImageStorageHandler 就隻是通過這兩個接口指定了我們將使用ImageExtractor以及ImageOutputer:

package com.aliyun.odps.udf.example.image;

public class ImageStorageHandler extends OdpsStorageHandler {
  @Override
  public Class<? extends Extractor> getExtractorClass() {
    return ImageExtractor.class;
  }

  @Override
  public Class<? extends Outputer> getOutputerClass() {
    return ImageOutputer.class;
  }
}

另外要說明的是如果確定在使用某個StorageHandler的時候,隻需要用到Extractor,或者隻需要用到Outputer功能,那**不需要的接口則不用實現**。 比如如果我們隻需要讀取OSS數據而不需要做INSERT操作,那getOutputerClass()的實現隻需要扔個NotImplemented exception就可以了,不會被調用到。

3.3 ImageExtractor實現

因為對於SDK中Extractor接口的介紹以及對用戶如何寫一個自定義的Extractor,在之前介紹的OSS數據的讀取中已經有所涉及,所以這裏就不再對這方麵做深入的介紹。

Extractor的工作在於讀取輸入數據並進行用戶自定義處理,那麼我們首先來看看這裏由images_input_external這個外表綁定的OSS輸入LOCATION上存放的具體數據內容:

osscmd ls oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/                                                                                                                       

2017-01-09 14:02:01 1875.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/barbara.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/cameraman.bmp
2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/fishingboat.bmp
2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/goldhill.bmp
2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/house.bmp
2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/jetplane.bmp
2017-01-09 14:02:01 2.32MB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lake.bmp
2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lena.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/livingroom.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/pirate.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/walkbridge.bmp
2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_blonde.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_darkhair.bmp

可以看到**這個LOCATION存放了一係列bmp圖像數據,分辨率從 400 x 400 到 1200 x 1200不等**。 具體在這個例子中用到的ImageExtractor的詳細代碼在github上可以找到, 這裏隻做一些簡單介紹說明該Extractor做了些什麼工作:

  1. 從輸入的OSS地址上使用非結構化框架提供的InputStream接口讀取圖像數據,並在本地進行如下操作

    • 對於圖像寬度小於1024的圖片,統一放大到1024 x 1024; 對於圖像寬度大於1024的圖片,跳過不進行處理
    • 處理過的圖片,在內存中轉存成由輸入參數指定的格式(JPG)
  2. 把處理後在內存中的JPG數據的原始字節存入輸出的Record中的BINARY field, 同一個Record中還將存放處理後圖像的長和寬(都是1024), 以及原始的圖像名字(這個可以從輸入的InputStream上獲取);

  3. 填充後的Record從Extract接口返回進入MaxCompute係統;

  4. 在這個過程中,用戶可以靈活的進行各種操作,比如額外的參數驗證等。

另外要說明的是,目前Record作為MaxCompute結構化數據處理的基本單元,有一些額外的限製,比如BINARY/STRING類型都有**8MB**大小的限製,但是在大部分場景下這個大小應該是能滿足存儲需求的。

3.4 ImageOutputer的實現

接下來我們著重講一下ImageOutputer的實現。 首先所有的用戶輸出邏輯都必須實現Outputer接口,具體來說有如下三個:setup, output和close, 這和Extractor的setup, extract和close三個接口基本上是對稱的。

// Base outputer class, custom outputer shall extend from this class
public abstract class Outputer{

  public abstract void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes);

  public abstract void output(Record record) throws IOException;

  public abstract void close() throws IOException;
}

這其中setup()和close()在一個outputer中隻會調用一次。 用戶可以在setup裏麵做初始化準備工作,另外通常需要把setup()傳遞進來的這三個參數保存成ouputerd的class variable, 方便之後output()或者close()接口中使用。 而close()這個接口用於方便用戶代碼的掃尾工作。

通常情況下大部分的數據處理發生在output(Record)這個接口內。 MaxCompute係統會根據當前outputer分配處理的Record數目不斷調用,也就是**對每個輸入Record係統會調用一次** output(Record)。 係統假設在一個output(Record) 調用返回的時候,用戶代碼已經消費完這個Record, 因此在當前output(Record)返回後,係統可能將這個Record所使用的內存用作它用: 所以**不推薦一個Record中的信息在跨多個output()函數調用被使用**,如果一定有這個需求的話,用戶必須把相關信息通過class variable等方式自行另外保存。

3.4.1 ImageOutputer.setup()

setup用於初始化整個outputer, 在這個接口上提供了整個outputer操作過程中可能需要的參數:

  • ExecutionContext: 用於提供一些係統信息和接口,比如讀取resource等,在ImageOutputer這個例子中我們沒有用到這個參數;
  • OutputStreamSet: 用戶可以從這個類的next()接口獲取對外輸出所需要的OutputStream,具體用法我們在下麵詳細介紹;
  • DataAttributes: 用戶通過SERDEPROPERTIES設置的key-value參數可以通過這個類獲取,參數獲取這裏ImageOutputer例子中沒有用到,但是Extractor上的setup參數中也有這個類,在上麵的ImageExtractor用到了改功能,可以參考一下。 同時這個類上麵還提供了一些helper接口,比如方便用戶驗證schema等。

在我們這個ImageOutputer裏,setup()的實現比較簡單:

@Override
  public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) {
    this.outputStreamSet = outputStreamSet;
    this.attributes = attributes;
    this.attributes.verifySchema(new OdpsType[]{ OdpsType.STRING, OdpsType.BIGINT, OdpsType.BIGINT, OdpsType.BINARY });
  }

隻是做了簡單的初始化以及對schema的驗證。

3.4.2 ImageOutputer.output(Record) 以及 OutputStreamSet的使用

在介紹具體output()接口之前,首先我們要來看看 OutputStreamSet, 這個類有兩個接口:

public interface OutputStreamSet{

SinkOutputStream next();
SinkOutputStream next(String fileNamePostfix);

}

兩個接口都是用來獲取一個新的SinkOutputStream(一個Java OutputStream的實現,可以按照OutputStream使用),兩個接口唯一的區別是next()獲取的OutputStream寫出的文件名完全由MaxCompute係統決定,而next(String fileNamePostfix)則允許用戶提供文件名的postfix。 提供這個postfix的意義是,在輸出文件具體地址和名字格式總體由MaxCompute係統決定的前提下,用戶依然可以定製一個方便理解的postfix。 比如使用next("_boat.jpg") 得到的OutputStream可能對應如下一個輸出文文件:

oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0_boat.jpg

這其中尾端的"_boat.jpg"可以幫助用戶理解輸出文件的涵義。 如果這個 OutputStream是由next()獲得的話,那對應的輸出文件可能就是這樣的:

oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0

用戶可能就需要具體讀取這個文件才能知道這個文件中具體存放了什麼內容。

前麵提到output(Record)這個接口會由係統不斷調用,但是應該強調的是,並不一定在每一個Record都需要調用一次OutputStreamSet.next()接口來獲得一個新的OutputStream事實上在大多數情況下,我們建議在一個Outputer裏麵盡可能減少調用next()的次數(最好隻調用一次)。 也就是說理想情況下,一個outpuer隻應該產生一個輸出文件。 比如處理TSV這種文本格式文件,假設有5000個Record對應5000行TSV數據,那麼最理想的情況是應該把這5000行數據全部寫到一個TSV文件中。 當然用戶可能會有各種各樣不同的切分輸出文件的需求:比如希望每個文件大小控製在一定範圍,或比如文件的邊界有顯著的意義等等。

具體到當前這個圖像例子,從下麵的ImageOutputer代碼實現中可以看出,這個例子中確實是處理每個Record就調用一次next()的,因為在當前場景中,每一個輸入的Record都表示一張圖片的信息(binary bytes, 圖像名字,圖像長寬),所以這裏通過多次調用next()來輸出多個圖片文件。 但是我們還是需要再次強調,調用next()的次數過多可能有一些其他弊端,比如造成碎片化小數據在OSS上的存儲等等。 尤其在MaxCompute這種分布式計算係統上,因為係統本身就會調度起多個outputer進行並行計算處理,如果每個outpuer都輸出過多文件的話,最後產生的文件數目會有一個乘性效應。 回頭來看我們這個例子中,即使在這裏,多個圖像其實也可以通過一個OutputStream,按照tar/tar.gz的方式寫到單個文件中,**這些都是在實現具體係統中用戶需要根據自己的場景, 以及處理邏輯,輸出數據類型等信息來進行優化和tradeoff的。**

在理解了這些之後,現在來具體看看ImageOutputer的實現output接口實現:

  @Override
  public void output(Record record) throws IOException {
    String name = record.getString(0);
    Long width = record.getBigint(1);
    Long height = record.getBigint(2);
    ByteArrayInputStream input =  new ByteArrayInputStream(record.getBytes(3));
    BufferedImage sobelEdgeImage = getEdgeImage(input);
    OutputStream outputStream = this.outputStreamSet.next(name + "_" + width + "x" + height + "." + outputFormat);
    ImageIO.write(sobelEdgeImage, this.outputFormat, outputStream);
  }

可以看到這裏主要就做了三件事情:

  1. 根據之前保存的圖像名字,長寬信息,和編碼方式(".jpg")拚出一個帶擴展名的輸出文件名postfix。
  2. 讀取圖像binary bytes,並用getEdgeImage()來利用sobel算子對圖像做邊緣檢測。 具體getEdgeImage()的實現這裏就不進行深入解釋了: 使用了標準的sobel模板卷積算法, 有興趣看ImageOutputer源碼即可。
  3. 對每一個圖像產生一個新的OutputStream並將數據寫出,至此當前Record處理完畢,寫出一張圖片到OSS,output()函數返回。

3.4.3 ImageOutputer.close()

在這個例子中,outputer.close()接口沒有包含具體的實現邏輯,是個no-op。

至此我們就介紹完了一個output的實現,現在可以看看在運行完這個SQL query,對應OSS地址的數據:

osscmd ls oss://oss-odps-test/dev/output/images_output/ 

2017-01-15 14:36:50 215.19KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0-barbara_1024x1024.jpg
2017-01-15 14:36:50 108.90KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-1-cameraman_1024x1024.jpg
2017-01-15 14:36:50 169.54KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-2-fishingboat_1024x1024.jpg
2017-01-15 14:36:50 214.94KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-3-goldhill_1024x1024.jpg
2017-01-15 14:36:50 71.00KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-4-house_1024x1024.jpg
2017-01-15 14:36:50 126.50KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-5-jetplane_1024x1024.jpg
2017-01-15 14:36:50 169.63KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-6-lake_1024x1024.jpg
2017-01-15 14:36:50 194.18KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-7-lena_1024x1024.jpg
...

可以看到圖像數據按照期待格式寫到了指定地址,這裏我們就選一個輸入圖像(lena.bmp)以及對應的輸出圖像(M1_0_-1--1-7-lena_1024x1024.jpg)看一下對比:

Fig.2 輸入輸出圖像對比

這個例子中整個圖像處理流程已經通過如上的SQL query完成。 而從上麵展示的ImageExtractor以及ImageOutputer 源代碼,我們可以看出整個過程中用戶的邏輯基本與寫單機圖像處理程序無異,用戶的代碼隻需要在Extractor上做InputStreamRecord的準換,而在Outputer上做反向的RecordOutputSteam的寫出處理,其他核心的處理邏輯實現基本和單機算法實現相同,**在用戶的層麵,並不用去操心底層分布式係統的細節以及MaxCompute和OSS的交互**。

3.5 數據處理步驟的靈活性

從上麵這個例子中我們也可以看出,在一個完整的【OSS->MaxCompute->OSS】數據流程中,Extractor和Outputer中涉及的具體計算邏輯其實也並不一定會有一個非常明確的邊界。 Extractor和Outputer隻要各自完成所需的轉換Record/Stream的轉換,**具體的額外算法邏輯在兩個地方都有機會完成**。 比如上麵這個例子的整個流程涉及了如下圖像處理相關的運算:

  1. 圖像的縮放 (統一到 1024 x 1024)
  2. 圖像格式的轉換 (BMP -> JPG)
  3. 圖像的Sobel邊緣檢測

上麵的例子實現中,把1. 和 2. 放在ImageExtractor中完成,而3.則放在ImageOutputer中完成,**但並不是唯一的選擇**。 我們完全可以把所有3個步驟都放在ImageExtractor中完成,讓ImageOutputer隻做Record到寫出最後圖像的操作;也可以在ImageExtractor中隻做讀取原始binary到Recrod, 而把所有3個圖像處理步驟都放在ImageOutputer中進行,等等。 具體進行怎樣的選擇,用戶可以完全根據需要自己實現。

另外一個係統設計的點是如果對於一個數據需要做重複的運算,那可以考慮將數據從OSS中通過Extractor讀出進MaxCompute,然後存儲成MaxCompute的內置表格再進行(多次)的計算。 這個對於MaxCompute和OSS沒有進行混布,不在一個物理網絡上的場景尤其有意義: MaxCompute從內置表中讀取數據無疑要比從外部OSS存儲服務中讀出數據要有效得多。 在上麵3.1.3子章節中的圖像處理例子,這個INSER OVERWITE操作:

INSERT OVERWRITE TABLE images_output_external
SELECT * FROM images_input_external
WHERE width = 1024;

就可以改寫成兩個分開的語句:

INSERT OVERWRITE TABLE images_internal
SELECT * FROM images_input_external
WHERE width = 1024;

INSERT OVERWRITE TABLE images_output_external
SELECT * FROM image_internal;

通過把數據寫到一個內部images_internal表中,後麵如果有**多次**讀取數據的需求的話,就可以不再去訪問外部OSS了。 這裏也可以看到MaxCompute非結構化框架以及SQL語法本身提供了非常高的靈活性和可擴展性,用戶可以根據實際計算的不同模式/場景/需求,來在上麵完成各種各樣的數據計算工作流。

5. 結語

非結構化數據處理框架隨著MaxCompute 2.0一起推出,意在豐富MaxCompute平台的數據處理生態,來打通阿裏雲核心計算平台與阿裏雲各個重要存儲服務之間的數據鏈路。 在之前介紹過的讀取OSS以及處理TableStore數據的整體方案後,本文側重介紹數據往OSS的輸出方案,並依托一個圖像處理的處理實例,展示了【OSS->MaxCompute->OSS】整個數據鏈路的實現。 在這些新功能的基礎上,我們希望實現整個阿裏雲計算與數據的生態融合: 在不同的項目上,我們已經看到了在MaxCompute上處理OSS上的海量視頻,圖像等非結構化數據的巨大潛力。 今後隨著這個生態的豐富,我們期望OSS數據,TableStore數據以及MaxCompute內部存儲的數據,都能在MaxCompute的核心計算引擎上進行融合,從而產生更大的價值。

最後更新:2017-07-18 20:34:26

  上一篇:go  Oracle,MySQL相關問題
  下一篇:go  網站關鍵詞排名不好的原因主要有哪些