462
微信
處理非結構化數據__快速開始_大數據計算服務-阿裏雲
重要提示:目前MaxCompute處理非結構化數據功能仍屬於公測範疇。我們不建議您將該功能用於生產實踐。有意願申請試用非結構化處理的用戶可通過掃描MaxCompute產品詳情頁的釘釘群二維碼提交試用申請。
MaxCompute作為阿裏雲大數據平台的核心計算組件,擁有強大的計算能力,能夠調度大量的節點做並行計算。而MaxCompute SQL能在簡明的語義上實現各種數據處理邏輯,在阿裏巴巴集團內外更是廣為應用,在其上實現與各種數據源的互通,對於打通整個阿裏雲的數據生態具有重要意義。基於這一點,最近MaxCompute團隊依托MaxCompute2.0係統架構,引入了非結構化數據處理框架:通過外部表,為各種數據在MaxCompute上的計算處理提供了入口。這裏以MaxCompute處理存儲在OSS上的數據為例,介紹這些新功能。
本文介紹了一種外部表的功能,支持旨在提供處理除了MaxCompute現有表格以外的其他數據的能力。在這個框架中,通過一條簡單的創建表語句,即可在MaxCompute上創建一張外部表,建立MaxCompute表與外部數據源的關聯,提供各種數據的接入和輸出能力。創建好的外部表可以像普通的MaxCompute表一樣使用(大部分場景),充分利用MaxCompute SQL的強大計算功能。
這裏的各種數據
涵蓋兩個維度:
- 多樣的數據存儲介質
- 插件式的框架可以對接多種數據存儲介質,比如OSS、Table Store。
- 多樣的數據格式:MaxCompute表是結構化的數據,而外部表可以不限於結構化數據
- 完全無結構數據,比如圖像,音頻,視頻文件等;
- 半結構化數據,比如CSV,TSV等隱含一定schema的文本文件;
- 除MaxCompute表結構外的結構化數據; 比如Table Store數據;
下麵通過一個簡單例子,來演示如何在MaxCompute上輕鬆訪問OSS上的數據。
1. 係統內置方式讀取OSS數據
重要提示:未開通MaxCompute 2.0版本的用戶將無法使用此示例。有意願申請試用非結構化處理的用戶可通過掃描MaxCompute產品詳情頁的釘釘群二維碼提交試用申請。
訪問外部數據源時,需要用戶自定義不同的Extractor。同時您同樣可以使用MaxCompute內置的 Extractor,來讀取按照約定格式存儲的OSS數據。我們隻需要創建一個外部表,就能以這張表為源表做查詢。假設有一份CSV數據存在OSS上,endpoint為oss-cn-shanghai-internal.aliyuncs.com
,bucket為oss-odps-test
,數據文件放在/demo/vehicle.csv
。
1.1 授予權限
首先需要在RAM中授權MaxCompute訪問OSS的權限。登錄RAM控製台,通過控製台中的角色管理創建角色AliyunODPSDefaultRole:
並將策略內容設置為:
{
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": [
"odps.aliyuncs.com"
]
}
}
],
"Version": "1"
}
然後編輯該角色的授權策略,將權限AliyunODPSRolePolicy
授權給該角色。如果覺得這些步驟太麻煩,可以點擊此處完成一鍵授權。
1.2 創建外部表
執行一條DDL語句,創建外部表
set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot; -- (1)
CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string
)
STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (2)
WITH SERDEPROPERTIES (
'odps.properties.rolearn'='acs:ram::1811270634786818:role/aliyunodpsdefaultrole'
) -- (3)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/'; -- (4)(5)
說明:
- 上述四個
set
命令是必須與創建表語句一並執行的。否則創建表語句將會報錯; com.aliyun.odps.CsvStorageHandler
是內置的處理CSV格式文件的StorageHandler
,它定義了如何讀寫CSV文件。我們隻需要指明這個名字,相關邏輯已經由係統實現。odps.properties.rolearn
中的信息是RAM中AliyunODPSDefaultRole
的Arn
信息。您可以通過RAM控製台中的角色詳情獲取到。LOCATION必須指定一個OSS目錄,默認係統會讀取這個目錄下所有的文件。
- 強烈建議您使用OSS提供的內網域名,否則將產生OSS流量費用;
- 建議您將OSS數據上傳至華東2(上海)區域;由於MaxCompute隻有在上海部署,我們不承諾跨區域的數據連通性;
- 此外,請注意,OSS的連接格式為
oss://oss-cn-shanghai-internal.aliyuncs.com/Bucket名稱/目錄名稱/
。目錄後不要加文件名稱。如下的集中用法都是錯誤的:https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持http連接
https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持https連接
oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- 連接地址錯誤
oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv -- 不必指定文件名
外部表隻是在係統中記錄了與OSS目錄的關聯,當DROP這張表時,對應的
LOCATION
數據不會被刪除。
備注:更多有關外部表的說明請點擊這裏。
1.3 查詢外部表
外部表創建成功後,我們可以像對普通表一樣使用這個外部表。假設/demo/vehicle.csv
數據為:
1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N
1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW
1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE
1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N
建議您將OSS數據上傳至華東2(上海)區域;由於MaxCompute隻有在上海部署,我們不承諾跨區域的數據連通性;
執行如下SQL:
set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot;
select recordId, patientId, direction from ambulance_data_csv_external where patientId > 25;
這條語句會提交一個作業,調用內置csv extractor,從OSS讀取數據進行處理。輸出結果為:
+------------+------------+-----------+
| recordId | patientId | direction |
+------------+------------+-----------+
| 1 | 51 | S |
| 3 | 48 | NE |
| 4 | 30 | W |
| 5 | 47 | S |
| 7 | 53 | N |
| 8 | 63 | SW |
| 10 | 31 | N |
+------------+------------+-----------+
2. 自定義Extractor訪問OSS
當OSS中數據格式比較複雜,內置的Extractor無法滿足需求時,需要自定義Extractor來讀取OSS文件中的數據。例如有一個txt數據文件,並不是CSV格式,記錄之間的列通過|
分割。/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv
數據為:
1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N
2.1 定義Extractor
可以寫一個通用的Extractor,將分隔符作為參數傳進來,可以處理所有類似格式的text文件。
/**
* Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
**/
public class TextExtractor extends Extractor {
private InputStreamSet inputs;
private String columnDelimiter;
private DataAttributes attributes;
private BufferedReader currentReader;
private boolean firstRead = true;
public TextExtractor() {
// default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
this.columnDelimiter = ",";
}
// no particular usage for execution context in this example
@Override
public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
this.inputs = inputs; // (1)
this.attributes = attributes;
// check if "delimiter" attribute is supplied via SQL query
String columnDelimiter = this.attributes.getValueByKey("delimiter"); // (2)
if ( columnDelimiter != null)
{
this.columnDelimiter = columnDelimiter;
}
// note: more properties can be inited from attributes if needed
}
@Override
public Record extract() throws IOException {
String line = readNextLine();
if (line == null) {
return null; // (5)
}
return textLineToRecord(line); // (3)(4)
}
@Override
public void close(){
// no-op
}
}
說明:
- inputs是一個InputStreamSet,每次調用next()返回一個InputStream,這個InputStream可以讀取一個OSS文件的所有內容。
- delimiter通過DDL語句傳參。
- textLineToRecord將一行數據按照delimiter分割為多個列,完整實現可以參考: 此鏈接。
- extactor()調用返回一條Record,代表外部表中的一條記錄。
- 返回NULL來表示這個表中已經沒有記錄可讀。
2.2 定義StorageHandler
StorageHandler作為External Table自定義邏輯的統一入口。
package com.aliyun.odps.udf.example.text;
public class TextStorageHandler extends OdpsStorageHandler {
@Override
public Class<? extends Extractor> getExtractorClass() {
return TextExtractor.class;
}
@Override
public Class<? extends Outputer> getOutputerClass() {
return TextOutputer.class;
}
}
2.3 編譯打包
將自定義代碼編譯打包,並上傳到MaxCompute。
add jar odps-udf-example.jar;
2.4 創建External表
與使用內置Extractor類似,我們同樣需要建立一個外部表,不同的是這次需要指定外部表訪問數據的時候,使用自定義的StorageHandler。
set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot;
CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external
(
vehicleId int,
recordId int,
patientId int,
calls int,
locationLatitute double,
locationLongtitue double,
recordTime string,
direction string
)
STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' -- (1)
with SERDEPROPERTIES (
'delimiter'='\|', --(2)
'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'
USING 'odps-udf-example.jar'; --(3)
說明:
- STORED BY指定自定義StorageHandler的類名;
- SERDEPROPERITES可以指定參數,這些參數會通過DataAttributes傳遞到Extractor代碼中;
- 同時需要指定類定義所在的jar包;
2.5 查詢外部表
執行:
set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot;
select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;
3. 通過自定義Extractor讀取非結構化數據
在前麵我們看到了通過內置與自定義的Extractor可以輕鬆處理存儲在OSS上的CSV等文本數據。接下來我們以語音數據(wav格式文件)為例,來看看怎樣通過自定義的Extractor來訪問處理OSS上的非文本文件。
這裏我們從最終執行的SQL開始,介紹以MaxCompute SQL為入口,處理存放在OSS上的語音文件的使用方法:
set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot;
CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
(
sentence_snr double,
id string
)
STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
WITH SERDEPROPERTIES (
'mlfFileName'='sm_random_5_utterance.text.label' ,
'speechSampleRateInKHz' = '16'
)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';
這裏我們依然建立的外部表,並且通過外部表的Schema定義了我們希望通過外部表從語音文件中抽取出來的信息:
- 一個語音文件中的語句信噪比(SNR):sentence_snr;
- 對應語音文件的名字:id;
創建了外部表後,通過標準的Select語句進行查詢,則會觸發Extractor運行計算。從這我們可以更直接的感受到,在讀取處理OSS數據時,除了之前介紹過的對文本文件做簡單的反序列化處理,還可以通過在自定義的Extractor中實現更複雜的數據處理抽取邏輯:在這個例子中,我們通過自定義的Ecom.aliyun.odps.udf.example.speech.SpeechStorageHandler
中封裝的Extractor, 實現了對語音文件計算平均有效語句信噪比的功能,並將抽取出來的結構化數據直接進行SQL運算(WHERE sentence_snr > 10), 最終返回所有信噪比大於10的語音文件以及對應的信噪比值。
在OSS地址oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/
上,存儲了原始的多個WAV格式的語音文件,MaxCompute 框架將讀取該地址上的所有文件,並在必要的時候進行文件級別的分片,自動將文件分配給多個計算節點處理。每個計算節點上的Extractor則負責處理通過InputStreamSet分配給該節點的文件集。具體的處理邏輯則與用戶單機程序相仿,用戶不用關心分布計算中的種種細節,按照類單機方式實現其用戶算法即可。
這裏簡單介紹一下定製化的SpeechSentenceSnrExtractor
主體邏輯。首先我們在setup
接口中讀取參數,進行初始化,並且導入語音處理模型(通過resource引入):
public SpeechSentenceSnrExtractor(){
this.utteranceLabels = new HashMap<String, UtteranceLabel>();
}
@Override
public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
this.inputs = inputs;
this.attributes = attributes;
this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
try {
// read the speech model file from resource and load the model into memory
BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
loadMlfLabelsFromResource(inputStream);
inputStream.close();
} catch (IOException e) {
throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
}
}
Extractor()接口中,實現了對語音文件的具體讀取和處理邏輯,對讀取的數據根據語音模型進行信噪比的計算,並且將結果填充成[snr, id]格式的Record。這個例子中對實現進行了簡化,同時也沒有包括涉及語音處理的算法邏輯,具體實現可參靠MaxCompute SDK在開源社區中提供的樣例代碼。
@Override
public Record extract() throws IOException {
SourceInputStream inputStream = inputs.next();
if (inputStream == null){
return null;
}
// process one wav file to extract one output record [snr, id]
String fileName = inputStream.getFileName();
fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
logger.info("Processing wav file " + fileName);
String id = fileName.substring(0, fileName.lastIndexOf('.'));
// read speech file into memory buffer
long fileSize = inputStream.getFileSize();
byte[] buffer = new byte[(int)fileSize];
int readSize = inputStream.readToEnd(buffer);
inputStream.close();
// compute the avg sentence snr
double snr = computeSnr(id, buffer, readSize);
// construct output record [snr, id]
Column[] outputColumns = this.attributes.getRecordColumns();
ArrayRecord record = new ArrayRecord(outputColumns);
record.setDouble(0, snr);
record.setString(1, id);
return record;
}
private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
throws IOException {
// skipped here
}
// compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
private double computeSnr(String id, byte[] buffer, int validBufferLen){
// computing the snr value for the wav file (supplied as byte buffer array), skipped here
}
執行查詢:
set odps.task.major.version=2dot0_demo_flighting;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot;
select sentence_snr, id
from speech_sentence_snr_external
where sentence_snr > 10.0;
可獲得計算結果:
--------------------------------------------------------------
| sentence_snr | id |
--------------------------------------------------------------
| 34.4703 | J310209090013_H02_K03_042 |
--------------------------------------------------------------
| 31.3905 | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
--------------------------------------------------------------
| 35.4774 | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0 |
--------------------------------------------------------------
| 16.0462 | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0 |
--------------------------------------------------------------
| 14.5568 | tsh_148_3013_5_13_47_3d5008d792408f81_0 |
--------------------------------------------------------------
可以看到,通過自定義Extractor,我們在SQL語句上即可分布式地處理多個OSS上語音數據文件。同樣的,用類似的方法,我們可以方便的利用MaxCompute的大規模計算能力,完成對圖像,視頻等各種類型非結構化數據的處理。
最後更新:2016-12-15 13:25:27
上一篇:
編寫Graph__快速開始_大數據計算服務-阿裏雲
下一篇:
項目空間__基本概念_基本介紹_大數據計算服務-阿裏雲
UpdateRole__角色管理接口_RAM API文檔_訪問控製-阿裏雲
授權策略示例__子賬號訪問IoT_控製台使用手冊_阿裏雲物聯網套件-阿裏雲
新建自定義路由__路由表相關接口_API 參考_雲服務器 ECS-阿裏雲
創建訂閱__訂閱操作_快速入門_消息服務-阿裏雲
單IP多HTTPS域名場景下的解決方案__最佳實踐_HTTPDNS-阿裏雲
DescribeRuleAttribute__轉發規則相關API_API 參考_負載均衡-阿裏雲
安全組默認規則__安全組_用戶指南_雲服務器 ECS-阿裏雲
查詢共享帶寬包監控信息__監控相關接口_API參考_專有網絡 VPC-阿裏雲
創建存儲過程__數據庫開發_用戶指南(RDBMS)_數據管理-阿裏雲
SetPasswordPolicy__安全設置接口_RAM API文檔_訪問控製-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲