使用 Instance Tunnel 獲取 Maxcompute Instance 的執行結果
本篇將介紹如何使用 Instance Tunnel 來獲取 Maxcompute Instance 執行結果。
源起
每天我們都會在 Maxcompute 平台上提交 select query,用於查詢特定的數據。然而,熟悉平台的同學都知道,從平台獲取 sql 查詢結果是一個 Restful 請求,可能碰到以下兩個問題:
- 1 獲取數據超時。 如果數據分布在多個存儲小文件上,平台需要花費大量時間來收集和歸並這些數據。然而在這個漫長的歸並過程中,獲取數據的 Restful 請求可能已經超時了。此時 Maxcompute https://yq.aliyun.com/articles/Console 會有如下警告:
Warning: ODPS request failed, requestID:xxxx, retryCount:1, will retry in xxx seconds.
- 2 獲取數據量受限。 由於一次 Restful 請求的返回數據有限,且一次性獲取全量數據到本地時可能將內存撐爆等問題,Maxcompute SQL 的查詢結果條數是受限的,具體的數值為 project 上的配置項
READ_TABLE_MAX_ROW
(默認為 10000)。這就會出現明明我們的查詢結果 2W 條,最後卻在 Maxcompute https://yq.aliyun.com/articles/Console 或者 Logview 上隻看到 1W條 的詭異情況了。
針對上述問題,提出下列解法,其中解法 3 可解決上述兩種問題,解法 1、2 僅適用於問題 1:
- 1 合並存儲小文件。 如果查詢語句的數據源隻有一張表,那麼可以在 Maxcompute https://yq.aliyun.com/articles/Console 執行
alter table <source_table_name> merge smallfiles ;
命令將小文件合並之後,再重新執行 select 查詢。 - 2 創建臨時表,合並臨時表的存儲小文件。 如果是一個多表計算的查詢結果,可以通過
create table <tmp_table_name> as select ....
命令創建臨時表,再參照解法 a 合並臨時表的存儲小文件。 - 3 創建臨時表,使用 Tunnel 下載。 Tunnel 是 Maxcompute 批量數據通道。因此我們可以通過
create table <tmp_table_name> as select ....
命令創建臨時表,然後通過 Tunnel 命令或者 Tunnel SDK 來將臨時表數據下載下來。
Instance Tunnel 概述
上麵的方法雖然能解決問題,但總是有那麼點"繞",有那麼點"費"。
有沒有更加直接的方法呢?
如果是之前,隻能慫了,但是現在我們要大聲的說有!
在最近的 Maxcompute 版本(>=S27) 中,我們開發了 **Instanc Tunnel **功能。
Instance Tunnel 提供使用 Tunnel 來下載 SQL 查詢結果的功能,不僅能擺脫上述兩類問題,可直接獲取查詢結果;還豐富了 Maxcompute Tunnel 下載通道,不再局限於表數據。換句話說,以前我們可以用 Tunnel 來下載 Maxcompute 表數據,如今,我們也可以用 Tunnel 來下載 Maxcompute Instance 的數據。
Instance Tunnel 使用
Instance Tunnel 的使用方式與 Table Tunnel 基本一致,下麵分別介紹使用客戶端和 SDK 來下載 Instance 執行結果的方法。
1. 用 Maxcompute https://yq.aliyun.com/articles/Console 來獲取 Instance 數據
1.1 使用 Tunnel download 命令將特定 Instance 的執行結果下載到本地文件
命令:tunnel download instance://<[project_name/]instance_id> <path>
參數:
project_name: instance 所在的項目名稱;
instance_id: 待下載數據的 instance id
舉例:
// 執行一條 select 查詢:
odps@ odps_test_project>select * from wc_in;
ID = 20170724071705393ge3csfb8
... ...
// 使用 Instance Tunnel Download 命令下載執行結果到本地文件
odps@ odps_test_project>tunnel download instance://20170724071705393ge3csfb8 result;
2017-07-24 15:18:47 - new session: 2017072415184785b6516400090ca8 total lines: 8
2017-07-24 15:18:47 - file [0]: [0, 8), result
downloading 8 records into 1 file
2017-07-24 15:18:47 - file [0] start
2017-07-24 15:18:48 - file [0] OK. total: 44 bytes
download OK
// 查看結果
cat result
slkdfj
hellp
apple
tea
peach
apple
tea
teaa
1.2 通過配置參數使 SQL 查詢默認采用 Instance Tunnel 方式輸出執行結果
在 Maxcompute https://yq.aliyun.com/articles/Console 中打開 use_instance_tunnel
選項之後,執行的 select query 就會默認使用 Instance tunnel 來下載結果了,再也不會出現文章開頭所描述的兩種問題了。打開該配置有兩種方法:
- 1) 如果已經下載最新的 https://yq.aliyun.com/articles/Console,odps_config.ini 裏麵已經默認打開此選項,並默認將 instance_tunnel_max_record 設置成了10000 。 如下所示:
# download sql results by instance tunnel
use_instance_tunnel=true
# the max records when download sql results by instance tunnel
instance_tunnel_max_record=10000
其中
instance_tunnel_max_record
表示使用 Instance tunnel 下載 sql 查詢結果的條數。若不設置,下載條數不受限。
- 2) 使用
set console.sql.result.instancetunnel=true;
開啟此功能。
// 打開 Instance tunnel 選項
odps@ odps_test_tunnel_project>set console.sql.result.instancetunnel=true;
OK
// 運行 select query
odps@ odps_test_tunnel_project>select * from wc_in;
ID = 20170724081946458g14csfb8
Log view:
https://logview/xxxxx.....
+------------+
| key |
+------------+
| slkdfj |
| hellp |
| apple |
| tea |
| peach |
| apple |
| tea |
| teaa |
+------------+
A total of 8 records fetched by instance tunnel.
可以看到,如果使用 Instance tunnel 的方式來輸出 select 查詢結果,會在最後打印一條提示。比如上麵例子中的提示告訴我們這個 instance 的執行結果一共有 8 條數據。同樣也可以
set console.sql.result.instancetunnel=false;
來關閉此功能。
2. 用 MaxCompute Instance Tunnel SDK 來獲取 Instance 執行結果
MaxCompute Java SDK 和 Python SDK 都對 Instance tunnel 進行了支持,下麵介紹用法。
2.1 使用 Java SDK 獲取 Instance 執行結果
對於 Java SDK,從 0.27.2-public
(JavaDoc )版本開始,我們提供兩種方式來獲取數據。
- 使用
SQLTask.getResultSet()
靜態方法獲取:
Odps odps = OdpsUtils.newDefaultOdps(); // 初始化 Odps 對象
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
// 根據 instance 對象,獲取結果迭代器
ResultSet rs = SQLTask.getResultSet(i);
for (Record r : rs) {
// 輸出結果條數
System.out.println(rs.getRecordCount());
for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) {
// wc_in 表字段均為 STRING, 這裏就直接打印輸出
System.out.println(r.get(col));
}
}
- 創建
InstanceTunnel.DownloadSession
來獲取:
Odps odps = OdpsUtils.newDefaultOdps(); // 初始化 Odps 對象
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
// 創建 InstanceTunnel
InstanceTunnel tunnel = new InstanceTunnel(odps);
// 根據 instance id,創建 DownloadSession
InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());
long count = session.getRecordCount();
// 輸出結果條數
System.out.println(count);
// 獲取數據的寫法與 TableTunnel 一樣
TunnelRecordReader reader = session.openRecordReader(0, count);
Record record;
while ((record = reader.read()) != null) {
for (int col = 0; col < session.getSchema().getColumns().size(); ++col) {
// wc_in 表字段均為 STRING, 這裏就直接打印輸出
System.out.println(record.get(col));
}
}
reader.close();
2.2 使用 PyODPS 獲取 Instance 執行結果
對於 PyODPS 來說,我們可以在 instance 上通過 open_reader
來獲取數據,而從 0.7.7.1 的版本開始,我們可以通過 open_reader
使用 instance tunnel 來獲取全量數據。
instance = o.execute_sql('select * from movielens_ratings limit 20000')
with instance.open_reader() as reader:
print(reader.count)
# for record in reader 就是遍曆這2萬條數據,這裏通過切片隻取10條
for record in reader[:10]:
print(record)
Instance Tunnel 約束
雖然 Instance Tunnel 為我們提供了非常方便的獲取 Instance 執行結果的方式,但是為了保護用戶數據安全,此功能也受到了諸多的權限約束:
- 若使用 Instance Tunnel 下載數據時,數據條數不超過 1W,則隻要對該 Instance 有 Read 權限即可使用。 此行為與使用 Restful API 獲取查詢數據的行為一致;
- 若使用 Instance Tunnel 下載數據時,數據條數超過 1W,則需要對 instance sql 查詢語句中涉及到的所有源表進行權限檢查,用戶需要具有這些表的 Read 權限才可。
相關資料
- MaxCompute Java SDK JavaDoc
- MaxCompute https://yq.aliyun.com/articles/Console 下載
- MaxCompute PyODPS 文檔
Instance Tunnel 剛剛上線,歡迎討論和吐槽 ^_^
- Java SDK 和客戶端使用,可聯係作者
- PyOdps 釘釘群:11701793
最後更新:2017-07-24 18:02:37