閱讀462 返回首頁    go 微信


處理非結構化數據__快速開始_大數據計算服務-阿裏雲

重要提示:目前MaxCompute處理非結構化數據功能仍屬於公測範疇。我們不建議您將該功能用於生產實踐。有意願申請試用非結構化處理的用戶可通過掃描MaxCompute產品詳情頁的釘釘群二維碼提交試用申請。

MaxCompute作為阿裏雲大數據平台的核心計算組件,擁有強大的計算能力,能夠調度大量的節點做並行計算。而MaxCompute SQL能在簡明的語義上實現各種數據處理邏輯,在阿裏巴巴集團內外更是廣為應用,在其上實現與各種數據源的互通,對於打通整個阿裏雲的數據生態具有重要意義。基於這一點,最近MaxCompute團隊依托MaxCompute2.0係統架構,引入了非結構化數據處理框架:通過外部表,為各種數據在MaxCompute上的計算處理提供了入口。這裏以MaxCompute處理存儲在OSS上的數據為例,介紹這些新功能。

本文介紹了一種外部表的功能,支持旨在提供處理除了MaxCompute現有表格以外的其他數據的能力。在這個框架中,通過一條簡單的創建表語句,即可在MaxCompute上創建一張外部表,建立MaxCompute表與外部數據源的關聯,提供各種數據的接入和輸出能力。創建好的外部表可以像普通的MaxCompute表一樣使用(大部分場景),充分利用MaxCompute SQL的強大計算功能。

這裏的各種數據涵蓋兩個維度:

  1. 多樣的數據存儲介質
    • 插件式的框架可以對接多種數據存儲介質,比如OSSTable Store
  2. 多樣的數據格式:MaxCompute表是結構化的數據,而外部表可以不限於結構化數據
    • 完全無結構數據,比如圖像,音頻,視頻文件等;
    • 半結構化數據,比如CSV,TSV等隱含一定schema的文本文件;
    • 除MaxCompute表結構外的結構化數據; 比如Table Store數據;

下麵通過一個簡單例子,來演示如何在MaxCompute上輕鬆訪問OSS上的數據。

1. 係統內置方式讀取OSS數據

重要提示:未開通MaxCompute 2.0版本的用戶將無法使用此示例。有意願申請試用非結構化處理的用戶可通過掃描MaxCompute產品詳情頁的釘釘群二維碼提交試用申請。

訪問外部數據源時,需要用戶自定義不同的Extractor。同時您同樣可以使用MaxCompute內置的 Extractor,來讀取按照約定格式存儲的OSS數據。我們隻需要創建一個外部表,就能以這張表為源表做查詢。假設有一份CSV數據存在OSS上,endpoint為oss-cn-shanghai-internal.aliyuncs.com,bucket為oss-odps-test,數據文件放在/demo/vehicle.csv

1.1 授予權限

首先需要在RAM中授權MaxCompute訪問OSS的權限。登錄RAM控製台,通過控製台中的角色管理創建角色AliyunODPSDefaultRole:

ram_role

並將策略內容設置為:

  1. {
  2. "Statement": [
  3. {
  4. "Action": "sts:AssumeRole",
  5. "Effect": "Allow",
  6. "Principal": {
  7. "Service": [
  8. "odps.aliyuncs.com"
  9. ]
  10. }
  11. }
  12. ],
  13. "Version": "1"
  14. }

然後編輯該角色的授權策略,將權限AliyunODPSRolePolicy授權給該角色。如果覺得這些步驟太麻煩,可以點擊此處完成一鍵授權

1.2 創建外部表

執行一條DDL語句,創建外部表

  1. set odps.task.major.version=2dot0_demo_flighting;
  2. set odps.sql.planner.mode=lot;
  3. set odps.sql.ddl.odps2=true;
  4. set odps.sql.preparse.odps2=lot; -- (1)
  5. CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_csv_external
  6. (
  7. vehicleId int,
  8. recordId int,
  9. patientId int,
  10. calls int,
  11. locationLatitute double,
  12. locationLongtitue double,
  13. recordTime string,
  14. direction string
  15. )
  16. STORED BY 'com.aliyun.odps.CsvStorageHandler' -- (2)
  17. WITH SERDEPROPERTIES (
  18. 'odps.properties.rolearn'='acs:ram::1811270634786818:role/aliyunodpsdefaultrole'
  19. ) -- (3)
  20. LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/'; -- (4)(5)

說明:

  1. 上述四個set命令是必須與創建表語句一並執行的。否則創建表語句將會報錯;
  2. com.aliyun.odps.CsvStorageHandler是內置的處理CSV格式文件的StorageHandler,它定義了如何讀寫CSV文件。我們隻需要指明這個名字,相關邏輯已經由係統實現。
  3. odps.properties.rolearn中的信息是RAM中AliyunODPSDefaultRoleArn信息。您可以通過RAM控製台中的角色詳情獲取到。
  4. LOCATION必須指定一個OSS目錄,默認係統會讀取這個目錄下所有的文件。

    • 強烈建議您使用OSS提供的內網域名,否則將產生OSS流量費用;
    • 建議您將OSS數據上傳至華東2(上海)區域;由於MaxCompute隻有在上海部署,我們不承諾跨區域的數據連通性;
    • 此外,請注意,OSS的連接格式為oss://oss-cn-shanghai-internal.aliyuncs.com/Bucket名稱/目錄名稱/。目錄後不要加文件名稱。如下的集中用法都是錯誤的:
      1. https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持http連接
      2. https://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo/ -- 不支持https連接
      3. oss://oss-odps-test.oss-cn-shanghai-internal.aliyuncs.com/Demo -- 連接地址錯誤
      4. oss://oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/vehicle.csv -- 不必指定文件名
  5. 外部表隻是在係統中記錄了與OSS目錄的關聯,當DROP這張表時,對應的LOCATION數據不會被刪除。

備注:更多有關外部表的說明請點擊這裏

1.3 查詢外部表

外部表創建成功後,我們可以像對普通表一樣使用這個外部表。假設/demo/vehicle.csv數據為:

  1. 1,1,51,1,46.81006,-92.08174,9/14/2014 0:00,S
  2. 1,2,13,1,46.81006,-92.08174,9/14/2014 0:00,NE
  3. 1,3,48,1,46.81006,-92.08174,9/14/2014 0:00,NE
  4. 1,4,30,1,46.81006,-92.08174,9/14/2014 0:00,W
  5. 1,5,47,1,46.81006,-92.08174,9/14/2014 0:00,S
  6. 1,6,9,1,46.81006,-92.08174,9/14/2014 0:00,S
  7. 1,7,53,1,46.81006,-92.08174,9/14/2014 0:00,N
  8. 1,8,63,1,46.81006,-92.08174,9/14/2014 0:00,SW
  9. 1,9,4,1,46.81006,-92.08174,9/14/2014 0:00,NE
  10. 1,10,31,1,46.81006,-92.08174,9/14/2014 0:00,N

建議您將OSS數據上傳至華東2(上海)區域;由於MaxCompute隻有在上海部署,我們不承諾跨區域的數據連通性;

執行如下SQL:

  1. set odps.task.major.version=2dot0_demo_flighting;
  2. set odps.sql.planner.mode=lot;
  3. set odps.sql.ddl.odps2=true;
  4. set odps.sql.preparse.odps2=lot;
  5. select recordId, patientId, direction from ambulance_data_csv_external where patientId > 25;

這條語句會提交一個作業,調用內置csv extractor,從OSS讀取數據進行處理。輸出結果為:

  1. +------------+------------+-----------+
  2. | recordId | patientId | direction |
  3. +------------+------------+-----------+
  4. | 1 | 51 | S |
  5. | 3 | 48 | NE |
  6. | 4 | 30 | W |
  7. | 5 | 47 | S |
  8. | 7 | 53 | N |
  9. | 8 | 63 | SW |
  10. | 10 | 31 | N |
  11. +------------+------------+-----------+

2. 自定義Extractor訪問OSS

當OSS中數據格式比較複雜,內置的Extractor無法滿足需求時,需要自定義Extractor來讀取OSS文件中的數據。例如有一個txt數據文件,並不是CSV格式,記錄之間的列通過|分割。/demo/SampleData/CustomTxt/AmbulanceData/vehicle.csv數據為:

  1. 1|1|51|1|46.81006|-92.08174|9/14/2014 0:00|S
  2. 1|2|13|1|46.81006|-92.08174|9/14/2014 0:00|NE
  3. 1|3|48|1|46.81006|-92.08174|9/14/2014 0:00|NE
  4. 1|4|30|1|46.81006|-92.08174|9/14/2014 0:00|W
  5. 1|5|47|1|46.81006|-92.08174|9/14/2014 0:00|S
  6. 1|6|9|1|46.81006|-92.08174|9/14/2014 0:00|S
  7. 1|7|53|1|46.81006|-92.08174|9/14/2014 0:00|N
  8. 1|8|63|1|46.81006|-92.08174|9/14/2014 0:00|SW
  9. 1|9|4|1|46.81006|-92.08174|9/14/2014 0:00|NE
  10. 1|10|31|1|46.81006|-92.08174|9/14/2014 0:00|N

2.1 定義Extractor

可以寫一個通用的Extractor,將分隔符作為參數傳進來,可以處理所有類似格式的text文件。

  1. /**
  2. * Text extractor that extract schematized records from formatted plain-text(csv, tsv etc.)
  3. **/
  4. public class TextExtractor extends Extractor {
  5. private InputStreamSet inputs;
  6. private String columnDelimiter;
  7. private DataAttributes attributes;
  8. private BufferedReader currentReader;
  9. private boolean firstRead = true;
  10. public TextExtractor() {
  11. // default to ",", this can be overwritten if a specific delimiter is provided (via DataAttributes)
  12. this.columnDelimiter = ",";
  13. }
  14. // no particular usage for execution context in this example
  15. @Override
  16. public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes) {
  17. this.inputs = inputs; // (1)
  18. this.attributes = attributes;
  19. // check if "delimiter" attribute is supplied via SQL query
  20. String columnDelimiter = this.attributes.getValueByKey("delimiter"); // (2)
  21. if ( columnDelimiter != null)
  22. {
  23. this.columnDelimiter = columnDelimiter;
  24. }
  25. // note: more properties can be inited from attributes if needed
  26. }
  27. @Override
  28. public Record extract() throws IOException {
  29. String line = readNextLine();
  30. if (line == null) {
  31. return null; // (5)
  32. }
  33. return textLineToRecord(line); // (3)(4)
  34. }
  35. @Override
  36. public void close(){
  37. // no-op
  38. }
  39. }

說明:

  • inputs是一個InputStreamSet,每次調用next()返回一個InputStream,這個InputStream可以讀取一個OSS文件的所有內容。
  • delimiter通過DDL語句傳參。
  • textLineToRecord將一行數據按照delimiter分割為多個列,完整實現可以參考: 此鏈接
  • extactor()調用返回一條Record,代表外部表中的一條記錄。
  • 返回NULL來表示這個表中已經沒有記錄可讀。

2.2 定義StorageHandler

StorageHandler作為External Table自定義邏輯的統一入口。

  1. package com.aliyun.odps.udf.example.text;
  2. public class TextStorageHandler extends OdpsStorageHandler {
  3. @Override
  4. public Class<? extends Extractor> getExtractorClass() {
  5. return TextExtractor.class;
  6. }
  7. @Override
  8. public Class<? extends Outputer> getOutputerClass() {
  9. return TextOutputer.class;
  10. }
  11. }

2.3 編譯打包

將自定義代碼編譯打包,並上傳到MaxCompute。

  1. add jar odps-udf-example.jar;

2.4 創建External表

與使用內置Extractor類似,我們同樣需要建立一個外部表,不同的是這次需要指定外部表訪問數據的時候,使用自定義的StorageHandler。

  1. set odps.task.major.version=2dot0_demo_flighting;
  2. set odps.sql.planner.mode=lot;
  3. set odps.sql.ddl.odps2=true;
  4. set odps.sql.preparse.odps2=lot;
  5. CREATE EXTERNAL TABLE IF NOT EXISTS ambulance_data_txt_external
  6. (
  7. vehicleId int,
  8. recordId int,
  9. patientId int,
  10. calls int,
  11. locationLatitute double,
  12. locationLongtitue double,
  13. recordTime string,
  14. direction string
  15. )
  16. STORED BY 'com.aliyun.odps.udf.example.text.TextStorageHandler' -- (1)
  17. with SERDEPROPERTIES (
  18. 'delimiter'='\|', --(2)
  19. 'odps.properties.rolearn'='acs:ram::xxxxxxxxxxxxx:role/aliyunodpsdefaultrole'
  20. )
  21. LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/Demo/SampleData/CustomTxt/AmbulanceData/'
  22. USING 'odps-udf-example.jar'; --(3)

說明:

  1. STORED BY指定自定義StorageHandler的類名;
  2. SERDEPROPERITES可以指定參數,這些參數會通過DataAttributes傳遞到Extractor代碼中;
  3. 同時需要指定類定義所在的jar包;

2.5 查詢外部表

執行:

  1. set odps.task.major.version=2dot0_demo_flighting;
  2. set odps.sql.planner.mode=lot;
  3. set odps.sql.ddl.odps2=true;
  4. set odps.sql.preparse.odps2=lot;
  5. select recordId, patientId, direction from ambulance_data_txt_external where patientId > 25;

3. 通過自定義Extractor讀取非結構化數據

在前麵我們看到了通過內置與自定義的Extractor可以輕鬆處理存儲在OSS上的CSV等文本數據。接下來我們以語音數據(wav格式文件)為例,來看看怎樣通過自定義的Extractor來訪問處理OSS上的非文本文件。

這裏我們從最終執行的SQL開始,介紹以MaxCompute SQL為入口,處理存放在OSS上的語音文件的使用方法:

  1. set odps.task.major.version=2dot0_demo_flighting;
  2. set odps.sql.planner.mode=lot;
  3. set odps.sql.ddl.odps2=true;
  4. set odps.sql.preparse.odps2=lot;
  5. CREATE EXTERNAL TABLE IF NOT EXISTS speech_sentence_snr_external
  6. (
  7. sentence_snr double,
  8. id string
  9. )
  10. STORED BY 'com.aliyun.odps.udf.example.speech.SpeechStorageHandler'
  11. WITH SERDEPROPERTIES (
  12. 'mlfFileName'='sm_random_5_utterance.text.label' ,
  13. 'speechSampleRateInKHz' = '16'
  14. )
  15. LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/'
  16. USING 'odps-udf-example.jar,sm_random_5_utterance.text.label';

這裏我們依然建立的外部表,並且通過外部表的Schema定義了我們希望通過外部表從語音文件中抽取出來的信息:

  1. 一個語音文件中的語句信噪比(SNR):sentence_snr;
  2. 對應語音文件的名字:id;

創建了外部表後,通過標準的Select語句進行查詢,則會觸發Extractor運行計算。從這我們可以更直接的感受到,在讀取處理OSS數據時,除了之前介紹過的對文本文件做簡單的反序列化處理,還可以通過在自定義的Extractor中實現更複雜的數據處理抽取邏輯:在這個例子中,我們通過自定義的Ecom.aliyun.odps.udf.example.speech.SpeechStorageHandler 中封裝的Extractor, 實現了對語音文件計算平均有效語句信噪比的功能,並將抽取出來的結構化數據直接進行SQL運算(WHERE sentence_snr > 10), 最終返回所有信噪比大於10的語音文件以及對應的信噪比值。

在OSS地址oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/SpeechSentenceTest/上,存儲了原始的多個WAV格式的語音文件,MaxCompute 框架將讀取該地址上的所有文件,並在必要的時候進行文件級別的分片,自動將文件分配給多個計算節點處理。每個計算節點上的Extractor則負責處理通過InputStreamSet分配給該節點的文件集。具體的處理邏輯則與用戶單機程序相仿,用戶不用關心分布計算中的種種細節,按照類單機方式實現其用戶算法即可。

這裏簡單介紹一下定製化的SpeechSentenceSnrExtractor主體邏輯。首先我們在setup接口中讀取參數,進行初始化,並且導入語音處理模型(通過resource引入):

  1. public SpeechSentenceSnrExtractor(){
  2. this.utteranceLabels = new HashMap<String, UtteranceLabel>();
  3. }
  4. @Override
  5. public void setup(ExecutionContext ctx, InputStreamSet inputs, DataAttributes attributes){
  6. this.inputs = inputs;
  7. this.attributes = attributes;
  8. this.mlfFileName = this.attributes.getValueByKey(MLF_FILE_ATTRIBUTE_KEY);
  9. String sampleRateInKHzStr = this.attributes.getValueByKey(SPEECH_SAMPLE_RATE_KEY);
  10. this.sampleRateInKHz = Double.parseDouble(sampleRateInKHzStr);
  11. try {
  12. // read the speech model file from resource and load the model into memory
  13. BufferedInputStream inputStream = ctx.readResourceFileAsStream(mlfFileName);
  14. loadMlfLabelsFromResource(inputStream);
  15. inputStream.close();
  16. } catch (IOException e) {
  17. throw new RuntimeException("reading model from mlf failed with exception " + e.getMessage());
  18. }
  19. }

Extractor()接口中,實現了對語音文件的具體讀取和處理邏輯,對讀取的數據根據語音模型進行信噪比的計算,並且將結果填充成[snr, id]格式的Record。這個例子中對實現進行了簡化,同時也沒有包括涉及語音處理的算法邏輯,具體實現可參靠MaxCompute SDK在開源社區中提供的樣例代碼

  1. @Override
  2. public Record extract() throws IOException {
  3. SourceInputStream inputStream = inputs.next();
  4. if (inputStream == null){
  5. return null;
  6. }
  7. // process one wav file to extract one output record [snr, id]
  8. String fileName = inputStream.getFileName();
  9. fileName = fileName.substring(fileName.lastIndexOf('/') + 1);
  10. logger.info("Processing wav file " + fileName);
  11. String id = fileName.substring(0, fileName.lastIndexOf('.'));
  12. // read speech file into memory buffer
  13. long fileSize = inputStream.getFileSize();
  14. byte[] buffer = new byte[(int)fileSize];
  15. int readSize = inputStream.readToEnd(buffer);
  16. inputStream.close();
  17. // compute the avg sentence snr
  18. double snr = computeSnr(id, buffer, readSize);
  19. // construct output record [snr, id]
  20. Column[] outputColumns = this.attributes.getRecordColumns();
  21. ArrayRecord record = new ArrayRecord(outputColumns);
  22. record.setDouble(0, snr);
  23. record.setString(1, id);
  24. return record;
  25. }
  26. private void loadMlfLabelsFromResource(BufferedInputStream fileInputStream)
  27. throws IOException {
  28. // skipped here
  29. }
  30. // compute the snr of the speech sentence, assuming the input buffer contains the entire content of a wav file
  31. private double computeSnr(String id, byte[] buffer, int validBufferLen){
  32. // computing the snr value for the wav file (supplied as byte buffer array), skipped here
  33. }

執行查詢:

  1. set odps.task.major.version=2dot0_demo_flighting;
  2. set odps.sql.planner.mode=lot;
  3. set odps.sql.ddl.odps2=true;
  4. set odps.sql.preparse.odps2=lot;
  5. select sentence_snr, id
  6. from speech_sentence_snr_external
  7. where sentence_snr > 10.0;

可獲得計算結果:

  1. --------------------------------------------------------------
  2. | sentence_snr | id |
  3. --------------------------------------------------------------
  4. | 34.4703 | J310209090013_H02_K03_042 |
  5. --------------------------------------------------------------
  6. | 31.3905 | tsh148_seg_2_3013_3_6_48_80bd359827e24dd7_0 |
  7. --------------------------------------------------------------
  8. | 35.4774 | tsh148_seg_3013_1_31_11_9d7c87aef9f3e559_0 |
  9. --------------------------------------------------------------
  10. | 16.0462 | tsh148_seg_3013_2_29_49_f4cb0990a6b4060c_0 |
  11. --------------------------------------------------------------
  12. | 14.5568 | tsh_148_3013_5_13_47_3d5008d792408f81_0 |
  13. --------------------------------------------------------------

可以看到,通過自定義Extractor,我們在SQL語句上即可分布式地處理多個OSS上語音數據文件。同樣的,用類似的方法,我們可以方便的利用MaxCompute的大規模計算能力,完成對圖像,視頻等各種類型非結構化數據的處理。

最後更新:2016-12-15 13:25:27

  上一篇:go 編寫Graph__快速開始_大數據計算服務-阿裏雲
  下一篇:go 項目空間__基本概念_基本介紹_大數據計算服務-阿裏雲