閱讀174 返回首頁    go windows


Python快速開始__快速入門_批量計算-阿裏雲

本文檔將介紹如何使用 Python 版 SDK 來提交一個作業,目的是統計一個日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

如果您還沒開通批量計算服務,請先開通

步驟預覽

  • 作業準備
    • 上傳數據文件到OSS
    • 上傳任務程序到OSS
  • 使用SDK創建(提交)作業
  • 查看結果

1. 作業準備

本作業是統計一個日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

該作業包含3個任務: split, count 和 merge:

  • split 任務會把日誌文件分成 3 份。
  • count 任務會統計每份日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數 (count 任務需要配置InstanceCount為3,表示同時啟動3台機器運行個 count 程序)。
  • merge 任務會把 count 任務的結果統一合並起來。

DAG圖例:

DAG圖例

(1) 上傳數據文件到OSS

下載本例子所需的數據: log-count-data.txt

將 log-count-data.txt 上傳到:

oss://your-bucket/log-count/log-count-data.txt

  • your-bucket如表示您自己創建的bucket,本例子假設region為: cn-shenzhen.
  • 如何上傳到OSS,請參考OSS上傳文檔

(2) 上傳任務程序到OSS

本例子的作業程序是使用python編寫的, 下載本例子所需的程序: log-count.tar.gz

本例子不需要改動示例代碼。直接將 log-count.tar.gz 上傳到 oss,如上傳到:

oss://your-bucket/log-count/log-count.tar.gz。

如何上傳前麵已經講過。

  • BatchCompute 隻支持以 tar.gz 為後綴的壓縮包, 請注意務必用以上方式(gzip)打包, 否則將會無法解析。
  • 如果你要修改代碼,可以解壓後修改,然後要用下麵的方法打包:

    命令如下:

    1. > cd log-count #進入目錄
    2. > tar -czf log-count.tar.gz * #打包,將所有這個目錄下的文件打包到 log-count.tar.gz

    可以運行這條命令查看壓縮包內容:

    1. $ tar -tvf log-count.tar.gz

    可以看到以下列表:

    1. conf.py
    2. count.py
    3. merge.py
    4. split.py

2. 使用SDK創建(提交)作業

python SDK 的相關下載與安裝請參閱這裏

v20151111版本,提交作業需要指定集群ID或者使用匿名集群參數。本例子使用匿名集群方式進行。匿名集群需要配置2個參數, 其中:

  • 可用的鏡像ID, 可以使用係統提供的Image,也可以自行製作鏡像, 請看使用鏡像
  • 實例規格(InstanceType,實例類型),請看 目前支持類型

在 OSS 中創建存儲StdoutRedirectPath(程序輸出結果)和StderrRedirectPath(錯誤日誌)的文件路徑,本例中創建的路徑為

oss://your-bucket/log-count/logs/

  • 如需運行本例子,請按照上文所述的變量獲取以及與上文對應的您的OSS路徑對程序中注釋中的變量進行修改。

Python SDK 提交程序模板如下,程序中具體參數含義請參照這裏

  1. #encoding=utf-8
  2. import sys
  3. from batchcompute import Client, ClientError
  4. from batchcompute import CN_SHENZHEN as REGION
  5. from batchcompute.resources import (
  6. JobDescription, TaskDescription, DAG, AutoCluster
  7. )
  8. ACCESS_KEY_ID='' # 填寫你的AK
  9. ACCESS_KEY_SECRET='' # 填寫你的AK
  10. IMAGE_ID = 'img-ubuntu' #這裏填寫您的鏡像ID
  11. INSTANCE_TYPE = 'ecs.s3.large' # 根據實際region支持的InstanceType 填寫
  12. WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz' 這裏填寫您上傳的log-count.tar.gz的OSS存儲路徑
  13. LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' 這裏填寫您創建的錯誤反饋和task輸出的OSS存儲路徑
  14. OSS_MOUNT= '' # 'oss://your-bucket/log-count/' 同時掛載到/home/inputs 和 /home/outputs
  15. client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
  16. def main():
  17. try:
  18. job_desc = JobDescription()
  19. # Create auto cluster.
  20. cluster = AutoCluster()
  21. cluster.InstanceType = INSTANCE_TYPE
  22. cluster.ResourceType = "OnDemand"
  23. cluster.ImageId = IMAGE_ID
  24. # Create split task.
  25. split_task = TaskDescription()
  26. split_task.Parameters.Command.CommandLine = "python split.py"
  27. split_task.Parameters.Command.PackagePath = WORKER_PATH
  28. split_task.Parameters.StdoutRedirectPath = LOG_PATH
  29. split_task.Parameters.StderrRedirectPath = LOG_PATH
  30. split_task.InstanceCount = 1
  31. split_task.AutoCluster = cluster
  32. split_task.InputMapping[OSS_MOUNT]='/home/input'
  33. split_task.OutputMapping['/home/output'] = OSS_MOUNT
  34. # Create map task.
  35. count_task = TaskDescription(split_task)
  36. count_task.Parameters.Command.CommandLine = "python count.py"
  37. count_task.InstanceCount = 3
  38. count_task.InputMapping[OSS_MOUNT] = '/home/input'
  39. count_task.OutputMapping['/home/output'] = OSS_MOUNT
  40. # Create merge task
  41. merge_task = TaskDescription(split_task)
  42. merge_task.Parameters.Command.CommandLine = "python merge.py"
  43. merge_task.InstanceCount = 1
  44. merge_task.InputMapping[OSS_MOUNT] = '/home/input'
  45. merge_task.OutputMapping['/home/output'] = OSS_MOUNT
  46. # Create task dag.
  47. task_dag = DAG()
  48. task_dag.add_task(task_name="split", task=split_task)
  49. task_dag.add_task(task_name="count", task=count_task)
  50. task_dag.add_task(task_name="merge", task=merge_task)
  51. task_dag.Dependencies = {
  52. 'split': ['count'],
  53. 'count': ['merge']
  54. }
  55. # Create job description.
  56. job_desc.DAG = task_dag
  57. job_desc.Priority = 99 # 0-1000
  58. job_desc.Name = "log-count"
  59. job_desc.Description = "PythonSDKDemo"
  60. job_desc.JobFailOnInstanceFail = True
  61. job_id = client.create_job(job_desc).Id
  62. print('job created: %s' % job_id)
  63. print('waiting for job finished ...')
  64. # Wait for job finished.
  65. errs = client.poll(job_id)
  66. if errs:
  67. print ("Some errors occur: %s" % 'n'.join(errs))
  68. return 1
  69. else:
  70. print ("Success!")
  71. return 0
  72. except ClientError, e:
  73. print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
  74. if __name__ == '__main__':
  75. sys.exit(main())

3. 查看結果

您可以登錄OSS控製台 查看your-bucket 這個bucket下麵的這個文件:/log-count/merge_result.json。

內容應該如下:

  1. {"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}

最後更新:2016-11-23 17:16:09

  上一篇:go Java快速開始__快速入門_批量計算-阿裏雲
  下一篇:go 如何提交作業__操作指南_批量計算-阿裏雲