174
windows
Python快速開始__快速入門_批量計算-阿裏雲
本文檔將介紹如何使用 Python 版 SDK 來提交一個作業,目的是統計一個日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。
如果您還沒開通批量計算服務,請先開通。
步驟預覽
- 作業準備
- 上傳數據文件到OSS
- 上傳任務程序到OSS
- 使用SDK創建(提交)作業
- 查看結果
1. 作業準備
本作業是統計一個日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。
該作業包含3個任務: split, count 和 merge:
- split 任務會把日誌文件分成 3 份。
- count 任務會統計每份日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數 (count 任務需要配置InstanceCount為3,表示同時啟動3台機器運行個 count 程序)。
- merge 任務會把 count 任務的結果統一合並起來。
DAG圖例:
(1) 上傳數據文件到OSS
下載本例子所需的數據: log-count-data.txt
將 log-count-data.txt 上傳到:
oss://your-bucket/log-count/log-count-data.txt
- your-bucket如表示您自己創建的bucket,本例子假設region為: cn-shenzhen.
- 如何上傳到OSS,請參考OSS上傳文檔。
(2) 上傳任務程序到OSS
本例子的作業程序是使用python編寫的, 下載本例子所需的程序: log-count.tar.gz
本例子不需要改動示例代碼。直接將 log-count.tar.gz 上傳到 oss,如上傳到:
oss://your-bucket/log-count/log-count.tar.gz。
如何上傳前麵已經講過。
- BatchCompute 隻支持以 tar.gz 為後綴的壓縮包, 請注意務必用以上方式(gzip)打包, 否則將會無法解析。
如果你要修改代碼,可以解壓後修改,然後要用下麵的方法打包:
命令如下:
> cd log-count #進入目錄
> tar -czf log-count.tar.gz * #打包,將所有這個目錄下的文件打包到 log-count.tar.gz
可以運行這條命令查看壓縮包內容:
$ tar -tvf log-count.tar.gz
可以看到以下列表:
conf.py
count.py
merge.py
split.py
2. 使用SDK創建(提交)作業
python SDK 的相關下載與安裝請參閱這裏。
v20151111版本,提交作業需要指定集群ID或者使用匿名集群參數。本例子使用匿名集群方式進行。匿名集群需要配置2個參數, 其中:
在 OSS 中創建存儲StdoutRedirectPath(程序輸出結果)和StderrRedirectPath(錯誤日誌)的文件路徑,本例中創建的路徑為
oss://your-bucket/log-count/logs/
- 如需運行本例子,請按照上文所述的變量獲取以及與上文對應的您的OSS路徑對程序中注釋中的變量進行修改。
Python SDK 提交程序模板如下,程序中具體參數含義請參照這裏。
#encoding=utf-8
import sys
from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
from batchcompute.resources import (
JobDescription, TaskDescription, DAG, AutoCluster
)
ACCESS_KEY_ID='' # 填寫你的AK
ACCESS_KEY_SECRET='' # 填寫你的AK
IMAGE_ID = 'img-ubuntu' #這裏填寫您的鏡像ID
INSTANCE_TYPE = 'ecs.s3.large' # 根據實際region支持的InstanceType 填寫
WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz' 這裏填寫您上傳的log-count.tar.gz的OSS存儲路徑
LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' 這裏填寫您創建的錯誤反饋和task輸出的OSS存儲路徑
OSS_MOUNT= '' # 'oss://your-bucket/log-count/' 同時掛載到/home/inputs 和 /home/outputs
client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
def main():
try:
job_desc = JobDescription()
# Create auto cluster.
cluster = AutoCluster()
cluster.InstanceType = INSTANCE_TYPE
cluster.ResourceType = "OnDemand"
cluster.ImageId = IMAGE_ID
# Create split task.
split_task = TaskDescription()
split_task.Parameters.Command.CommandLine = "python split.py"
split_task.Parameters.Command.PackagePath = WORKER_PATH
split_task.Parameters.StdoutRedirectPath = LOG_PATH
split_task.Parameters.StderrRedirectPath = LOG_PATH
split_task.InstanceCount = 1
split_task.AutoCluster = cluster
split_task.InputMapping[OSS_MOUNT]='/home/input'
split_task.OutputMapping['/home/output'] = OSS_MOUNT
# Create map task.
count_task = TaskDescription(split_task)
count_task.Parameters.Command.CommandLine = "python count.py"
count_task.InstanceCount = 3
count_task.InputMapping[OSS_MOUNT] = '/home/input'
count_task.OutputMapping['/home/output'] = OSS_MOUNT
# Create merge task
merge_task = TaskDescription(split_task)
merge_task.Parameters.Command.CommandLine = "python merge.py"
merge_task.InstanceCount = 1
merge_task.InputMapping[OSS_MOUNT] = '/home/input'
merge_task.OutputMapping['/home/output'] = OSS_MOUNT
# Create task dag.
task_dag = DAG()
task_dag.add_task(task_name="split", task=split_task)
task_dag.add_task(task_name="count", task=count_task)
task_dag.add_task(task_name="merge", task=merge_task)
task_dag.Dependencies = {
'split': ['count'],
'count': ['merge']
}
# Create job description.
job_desc.DAG = task_dag
job_desc.Priority = 99 # 0-1000
job_desc.Name = "log-count"
job_desc.Description = "PythonSDKDemo"
job_desc.JobFailOnInstanceFail = True
job_id = client.create_job(job_desc).Id
print('job created: %s' % job_id)
print('waiting for job finished ...')
# Wait for job finished.
errs = client.poll(job_id)
if errs:
print ("Some errors occur: %s" % 'n'.join(errs))
return 1
else:
print ("Success!")
return 0
except ClientError, e:
print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
if __name__ == '__main__':
sys.exit(main())
3. 查看結果
您可以登錄OSS控製台 查看your-bucket 這個bucket下麵的這個文件:/log-count/merge_result.json。
內容應該如下:
{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}
最後更新:2016-11-23 17:16:09
上一篇:
Java快速開始__快速入門_批量計算-阿裏雲
下一篇:
如何提交作業__操作指南_批量計算-阿裏雲
安裝步驟__快速開始_Eclipse 插件-阿裏雲
名詞解釋及說明__產品簡介_移動推送-阿裏雲
Java SDK實例程序__最佳實踐_歸檔存儲-阿裏雲
GetAccountAlias__安全設置接口_RAM API文檔_訪問控製-阿裏雲
使用 psql 命令遷移 PostgreSQL 數據__快速入門(PostgreSQL)_雲數據庫 RDS 版-阿裏雲
設備新增與留存__查詢相關_API 列表_OpenAPI 2.0_移動推送-阿裏雲
通過鏡像創建 Nginx__快速入門_容器服務-阿裏雲
問題解答__常見問題_Eclipse 插件-阿裏雲
RDS for MySQL 如何定位本地 IP___常見問題_雲數據庫 RDS 版-阿裏雲
產品與技術__產品簡介_數據集成-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲