閱讀822 返回首頁    go 微信


Java快速開始__快速入門_批量計算-阿裏雲

Java快速開始例子

本文檔將介紹如何使用 Java 版 SDK 來提交一個作業,目的是統計一個日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

如果您還沒開通批量計算服務,請先開通

步驟預覽

  • 作業準備
    • 上傳數據文件到OSS
    • 使用示例代碼
    • 編譯打包
    • 上傳到OSS
  • 使用SDK創建(提交)作業
  • 查看結果

1. 作業準備

本作業是統計一個日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

該作業包含3個任務: split, count 和 merge:

  • split 任務會把日誌文件分成 3 份。
  • count 任務會統計每份日誌文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數 (count 任務需要配置InstanceCount為3,表示同時啟動3台機器運行個 count 程序)。
  • merge 任務會把 count 任務的結果統一合並起來。

DAG圖例:

DAG圖例

(1) 上傳數據文件到OSS

下載本例子所需的數據: log-count-data.txt

將 log-count-data.txt 上傳到:

oss://your-bucket/log-count/log-count-data.txt

  • your-bucket如表示您自己創建的bucket,本例子假設region為: cn-shenzhen.
  • 如何上傳到OSS,請參考OSS上傳文檔

(2) 使用示例代碼

這裏我們將采用Java來編寫作業任務,使用maven來編譯,推薦使用IDEA:https://www.jetbrains.com/idea/download/ 選擇Community版本(免費).

示例程序下載:java-log-count.zip

這是一個maven工程。

  • 注意:無需修改代碼。

(3) 編譯打包

運行命令編譯打包:

  1. mvn package

即可在target得到下麵3個jar包:

  1. batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
  2. batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
  3. batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar

再將3個jar包,打成一個tar.gz壓縮包,命令如下:

  1. > cd target #進入target目錄
  2. > tar -czf worker.tar.gz *SNAPSHOT-*.jar #打包

運行以下命令,查看包的內容是否正確:

  1. > tar -tvf worker.tar.gz
  2. batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
  3. batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
  4. batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
  • 注意:BatchCompute 隻支持以 tar.gz 為後綴的壓縮包, 請注意務必用以上方式(gzip)打包, 否則將會無法解析。

(4) 上傳到OSS

本例將 worke.tar.gz 上傳到 OSS 的 your-bucket 中:

oss://your-bucket/log-count/worker.tar.gz

  • 如要運行本例子,您需要創建自己的bucket,並且把worker.tar.gz文件上傳至您自己創建的bucket的路徑下。

2. 使用SDK創建(提交)作業

(1) 新建一個maven工程

在pom.xml中增加以下dependencies:

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.aliyun</groupId>
  4. <artifactId>aliyun-java-sdk-batchcompute</artifactId>
  5. <version>3.1.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>com.aliyun</groupId>
  9. <artifactId>aliyun-java-sdk-core</artifactId>
  10. <version>3.0.3</version>
  11. </dependency>
  12. </dependencies>

(2) 新建一個java類: Demo.java

提交作業需要指定集群ID或者使用匿名集群參數。本例子使用匿名集群方式進行。匿名集群需要配置2個參數, 其中:

  • 可用的鏡像ID, 可以使用係統提供的Image,也可以自行製作鏡像, 請看使用鏡像
  • 實例規格(InstanceType,實例類型),請看 目前支持類型

在 OSS 中創建存儲StdoutRedirectPath(程序輸出結果)和StderrRedirectPath(錯誤日誌)的文件路徑,本例中創建的路徑為

oss://your-bucket/log-count/logs/

  • 如需運行本例子,請按照上文所述的變量獲取以及與上文對應的您的OSS路徑對程序中注釋中的變量進行修改。

Java SDK 提交程序模板如下,程序中具體參數含義請參照 SDK接口說明

Demo.java:

  1. /*
  2. * IMAGE_ID:ECS鏡像,由上文所述獲取
  3. * INSTANCE_TYPE: 實例類型,由上文所述獲取
  4. * REGION_ID:區域為青島/杭州,目前隻有青島開通,此項需與上文OSS存儲worker的bucket地域一致
  5. * ACCESS_KEY_ID: AccessKeyId可以由上文所述獲取
  6. * ACCESS_KEY_SECRET: AccessKeySecret可以由上文所述獲取
  7. * WORKER_PATH:由上文所述打包上傳的worker的OSS存儲路徑
  8. * LOG_PATH:錯誤反饋和task輸出的存儲路徑,logs文件需事先自行創建
  9. */
  10. import com.aliyuncs.batchcompute.main.v20151111.*;
  11. import com.aliyuncs.batchcompute.model.v20151111.*;
  12. import com.aliyuncs.batchcompute.pojo.v20151111.*;
  13. import com.aliyuncs.exceptions.ClientException;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. public class Demo {
  17. static String IMAGE_ID = "img-ubuntu";; //這裏填寫您的 ECS 鏡像ID
  18. static String INSTANCE_TYPE = "bcs.a2.large"; //根據region填寫合適的InstanceType
  19. static String REGION_ID = "cn-shenzhen"; //這裏填寫region
  20. static String ACCESS_KEY_ID = ""; //"your-AccessKeyId"; 這裏填寫您的AccessKeyId
  21. static String ACCESS_KEY_SECRET = ""; //"your-AccessKeySecret"; 這裏填寫您的AccessKeySecret
  22. static String WORKER_PATH = ""; //"oss://your-bucket/log-count/worker.tar.gz"; // 這裏填寫您上傳的worker.tar.gz的OSS存儲路徑
  23. static String LOG_PATH = ""; // "oss://your-bucket/log-count/logs/"; // 這裏填寫您創建的錯誤反饋和task輸出的OSS存儲路徑
  24. static String MOUNT_PATH = ""; // "oss://your-bucket/log-count/";
  25. public static void main(String[] args){
  26. /** 構造 BatchCompute 客戶端 */
  27. BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
  28. try{
  29. /** 構造 Job 對象 */
  30. JobDescription jobDescription = genJobDescription();
  31. //創建Job
  32. CreateJobResponse response = client.createJob(jobDescription);
  33. //創建成功後,返回jobId
  34. String jobId = response.getJobId();
  35. System.out.println("Job created success, got jobId: "+jobId);
  36. //查詢job狀態
  37. GetJobResponse getJobResponse = client.getJob(jobId);
  38. Job job = getJobResponse.getJob();
  39. System.out.println("Job state:"+job.getState());
  40. } catch (ClientException e) {
  41. e.printStackTrace();
  42. System.out.println("Job created failed, errorCode:"+ e.getErrCode()+", errorMessage:"+e.getErrMsg());
  43. }
  44. }
  45. private static JobDescription genJobDescription(){
  46. JobDescription jobDescription = new JobDescription();
  47. jobDescription.setName("java-log-count");
  48. jobDescription.setPriority(0);
  49. jobDescription.setDescription("log-count demo");
  50. jobDescription.setJobFailOnInstanceFail(true);
  51. jobDescription.setType("DAG");
  52. DAG taskDag = new DAG();
  53. /** 添加 split task */
  54. TaskDescription splitTask = genTaskDescription();
  55. splitTask.setTaskName("split");
  56. splitTask.setInstanceCount(1);
  57. splitTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar");
  58. taskDag.addTask(splitTask);
  59. /** 添加 count task */
  60. TaskDescription countTask = genTaskDescription();
  61. countTask.setTaskName("count");
  62. countTask.setInstanceCount(3);
  63. countTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar");
  64. taskDag.addTask(countTask);
  65. /** 添加 merge task */
  66. TaskDescription mergeTask = genTaskDescription();
  67. mergeTask.setTaskName("merge");
  68. mergeTask.setInstanceCount(1);
  69. mergeTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar");
  70. taskDag.addTask(mergeTask);
  71. /** 添加Task依賴: split-->count-->merge */
  72. List<String> taskNameTargets = new ArrayList();
  73. taskNameTargets.add("merge");
  74. taskDag.addDependencies("count", taskNameTargets);
  75. List<String> taskNameTargets2 = new ArrayList();
  76. taskNameTargets2.add("count");
  77. taskDag.addDependencies("split", taskNameTargets2);
  78. //dag
  79. jobDescription.setDag(taskDag);
  80. return jobDescription;
  81. }
  82. private static TaskDescription genTaskDescription(){
  83. AutoCluster autoCluster = new AutoCluster();
  84. autoCluster.setInstanceType(INSTANCE_TYPE);
  85. autoCluster.setImageId(IMAGE_ID);
  86. //autoCluster.setResourceType("OnDemand");
  87. TaskDescription task = new TaskDescription();
  88. //task.setTaskName("Find");
  89. //打包上傳的作業的OSS全路徑
  90. Parameters p = new Parameters();
  91. Command cmd = new Command();
  92. //cmd.setCommandLine("");
  93. //打包上傳的作業的OSS全路徑
  94. cmd.setPackagePath(WORKER_PATH);
  95. p.setCommand(cmd);
  96. //錯誤反饋存儲路徑
  97. p.setStderrRedirectPath(LOG_PATH);
  98. //最終結果輸出存儲路
  99. p.setStdoutRedirectPath(LOG_PATH);
  100. task.setParameters(p);
  101. task.addInputMapping(MOUNT_PATH, "/home/input");
  102. task.addOutputMapping("/home/output",MOUNT_PATH);
  103. task.setAutoCluster(autoCluster);
  104. //task.setClusterId(clusterId);
  105. task.setTimeout(30000); /* 30000 秒*/
  106. task.setInstanceCount(1); /** 使用1個實例來運行 */
  107. return task;
  108. }
  109. }

正常輸出樣例:

  1. Job created success, got jobId: job-01010100010192397211
  2. Job state:Waiting

3. 查看結果

您可以登錄batchcompute控製台查看job狀態。

Job運行結束,您可以登錄OSS控製台 查看your-bucket 這個bucket下麵的這個文件:/log-count/merge_result.json。

內容應該如下:

  1. {"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}

最後更新:2016-12-05 17:29:54

  上一篇:go 命令行快速開始2__快速入門_批量計算-阿裏雲
  下一篇:go Python快速開始__快速入門_批量計算-阿裏雲