

MaxCompute Task Status and Multi-Task Execution

When using MaxCompute, we really want to know how many tasks are currently running, which tasks are taking a long time, and which have already finished, and to be able to analyse why a slow task is slow through its Logview.

Task Status Monitoring

MaxCompute task instances come in two states, Running and Terminated. Running covers tasks that are currently executing as well as tasks waiting to execute; Terminated covers tasks that have completed, failed, or been cancelled. Alibaba Cloud's SDK provides a function to list instances in either state: odps.list_instances(status=Running|Terminated, start_time=..., end_time=...). To keep task status fresh at roughly second-level granularity, we use the following approach:

1. For tasks already recorded as Running, refresh their status quickly, since they may have finished by now;
2. Continuously fetch newly started tasks (see the sketch below).
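
A minimal sketch of the two SDK calls this relies on, assuming connection parameters like the config.ODPS_INFO used in the full code later; the placeholder ids and endpoint here are illustrative:

import time
from odps import ODPS

odps = ODPS(access_id='<access_id>', secret_access_key='<secret>',
            project='demo', endpoint='<endpoint>')

# Step 2: list instances started within the last hour, in either state
for status in ("Running", "Terminated"):
    for instance in odps.list_instances(status=status,
                                        start_time=time.time() - 3600):
        print("{0} {1}".format(instance.id, status))

# Step 1: re-check a single instance that our table still marks as Running
instance = odps.get_instance('<instanceid>')
print("{0} {1}".format(instance.status.value, instance.get_logview_address()))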

We use MySQL to record task status; the table is designed as follows:

CREATE TABLE `maxcompute_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `instanceid` varchar(255) DEFAULT NULL comment 'task instance ID',
  `logview` varchar(1024) DEFAULT NULL comment 'Logview URL, very useful for troubleshooting',
  `start_time` varchar(64) DEFAULT NULL comment 'task start time',
  `end_time` varchar(64) DEFAULT NULL comment 'task end time',
  `cast_time` varchar(32) DEFAULT NULL comment 'elapsed time',
  `project_name` varchar(255) DEFAULT NULL comment 'project name',
  `status` varchar(64) DEFAULT NULL comment 'task status',
  PRIMARY KEY (`id`),
  UNIQUE KEY `instanceid` (`instanceid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

The page below shows each task's elapsed time and start time, highlights tasks that have been running for more than one hour in red, links to the Logview, and lets us cancel a task, which is very convenient.
[Screenshot: maxcompute_running — the task monitoring page]
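
Cancelling from such a page can be done by stopping the corresponding instance. A minimal sketch using the PyODPS SDK (the credentials and instance id are illustrative):

from odps import ODPS

odps = ODPS(access_id='<access_id>', secret_access_key='<secret>',
            project='demo', endpoint='<endpoint>')
instance = odps.get_instance('<instanceid>')
if instance.status.value == "Running":
    instance.stop()  # asks MaxCompute to cancel the running instance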

Let's look at the code implementation:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: lemon

import time
import threading
import traceback
import datetime
from odps import ODPS
from dataflow import config
from libs.myconn import Cursor
from config import DBINFO_BI_MASTER
from libs import logger as _logger

g_table_name = "maxcompute_task"

def save_task(instanceid, odps, mysqlconn):
    # Save the task's status to MySQL; takes an ODPS connection and a MySQL connection
    instance = odps.get_instance(instanceid)
    project_name = odps.project
    status = instance.status.value
    start_time = instance.start_time
    end_time = instance.end_time

    sql = "select logview,status from {0} where instanceid='{1}'".format(g_table_name, instanceid)

    sqlret = mysqlconn.fetchone(sql)
    if sqlret and sqlret["status"] == "Terminated":
        return
    if sqlret and sqlret["logview"] is not None:
        logview = sqlret["logview"]
    else:
        logview = instance.get_logview_address()
    # Instance times are returned in UTC here, so convert to UTC+8 (Beijing time)
    start_time = start_time + datetime.timedelta(hours=8)
    if status == "Running":
        # Still running: measure elapsed time against the current local time
        end_time = datetime.datetime.now()
    else:
        end_time = end_time + datetime.timedelta(hours=8)
    cast_time = end_time - start_time
    colname = "instanceid,start_time,end_time,cast_time,project_name,status,logview"
    values = ",".join(["'{0}'".format(r) for r in [instanceid, str(start_time),str(end_time), cast_time, project_name, status,logview]])
    sql = """replace into {0}({1}) values({2}) """.format(g_table_name, colname, values)
    mysqlconn.execute(sql)


class MaxcomputeTask(threading.Thread):
    # Fetch all task instances for each configured project and status
    
    def __init__(self, logger):
        threading.Thread.__init__(self)
        self.logger = logger
        self.hour = 1
        self.status_conf = [("demo", "Running"), ("demo", "Terminated"),
                            ("demo1", "Running"), ("demo1","Terminated")]

    def run(self):
        # Create the MySQL connection; adapt this to your own setup
        self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
        while True:
            try:
                self.start_more()
                time.sleep(10)
            except:
                self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
                self.logger.error(traceback.format_exc())

    def start_more(self,):
        for params in self.status_conf:
            self.get_task(*params)

    def get_task(self, project_name, status):
        odps = ODPS(**config.ODPS_INFO)
        odps.project = project_name
        instances = odps.list_instances(status=status, start_time=time.time() - self.hour * 3600)
        self.logger.info("start {0} {1}".format(project_name, status))
        for row in instances:
            save_task(instanceid=str(row), odps=odps, mysqlconn=self.mysqlconn)
        self.logger.info("end {0} {1}".format(project_name, status))

        
class MaxcomputeTaskRunning(threading.Thread):
    # Refresh the status of tasks currently recorded as Running
    
    def __init__(self, logger):
        threading.Thread.__init__(self)
        self.logger = logger

    def run(self):
        self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
        while True:
            try:
                self.update_running()
                time.sleep(1)
            except:
                self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
                self.logger.error(traceback.format_exc())

    def update_running(self):
        sql = "select instanceid, project_name from {0} where status='Running'".format(g_table_name)
        sqlret = self.mysqlconn.fetchall(sql)
        if not sqlret:
            return

        self.logger.info("{1} running update length:{0}".format(len(sqlret), time.strftime("%Y-%m-%d %H:%M:%S") ))
        for row in sqlret:
            odps = ODPS(**config.ODPS_INFO)
            odps.project = row["project_name"]
            save_task(row["instanceid"], odps, self.mysqlconn)


if __name__ == "__main__":
    # logger is our own logging helper class
    logger = _logger.Logger("maxcompute_task.log").getLogger()
    running = MaxcomputeTaskRunning(logger)
    running.setDaemon(True)
    running.start()

    task = MaxcomputeTask(logger)
    task.start()

Multi-Task Execution

MaxCompute tasks can be run from the command line, via the SDK, or in Alibaba Cloud's integrated environment. In practice we often have a very large number of tasks, so building a multi-task executor is a common problem. Task execution is a classic producer/consumer relationship: the producer fetches tasks and the consumers execute them. This approach has two benefits.

1) The number of tasks running at the same time needs to be controlled; an uncontrolled number of concurrent tasks will put pressure on server resources.
2) The service can run on multiple machines to avoid a single point of failure. MaxCompute tasks run in the cloud, and their results can be fetched later by instanceid; results are retained for 7 days.

Here is roughly some of the code from our real-world setup — the producer and the consumer:

# Module-level imports and tunables referenced by this snippet. The constant
# values are illustrative; Cursor, config, logger and Execute come from the
# project's own modules, as in the code above.
import time
import signal
import platform
import threading
import traceback
import Queue

G_RUN_FLAG = True        # cleared by the SIGHUP handler below to stop the loops
QUEUE_SIZE = 100         # illustrative: upper bound on queued tasks
MAX_PROCESS_NUM = 5      # illustrative: number of consumer threads


class Consumer(threading.Thread):

    def __init__(self, queue, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.timeout = 1800


    def run(self):
        self.execute = Execute()
        logger.info("consumer %s start" % threading.current_thread().name)
        while G_RUN_FLAG:
            try:
                task = self.queue.get()
                self.execute.start(task)
            except:
                logger.error(traceback.format_exc())


class Producer(threading.Thread):

    def __init__(self, queue, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.sleep_time = 30
        self.step_sleep_time = 5

    def run(self):
        self.mysqlconn_bi_master = Cursor.new(**config.DBINFO_BI_MASTER)
        logger.info("producter %s start" % threading.current_thread().name)
        while G_RUN_FLAG:
            
            if self.queue.qsize() >= QUEUE_SIZE:
                time.sleep(self.sleep_time)
                continue

            # TODO: fetch the next pending task from your task source (elided here)
            self.queue.put(task)
            time.sleep(self.step_sleep_time)


def main():
    queue = Queue.LifoQueue(QUEUE_SIZE)
    lock = threading.RLock()

    for _ in xrange(MAX_PROCESS_NUM):
        consumer = Consumer(queue, lock)
        consumer.setDaemon(True)
        consumer.start()

    producer = Producer(queue, lock)
    producer.start()
    producer.join()

def signal_runflag(sig, frame):
    global G_RUN_FLAG
    if sig == signal.SIGHUP:
        logger.info("receive HUP signal ")
        G_RUN_FLAG = False

if __name__ == "__main__":
    logger.info("execute run")
    if platform.system() == "Linux":
        signal.signal(signal.SIGHUP, signal_runflag)
    main()
    logger.info("execute exit.")

The code used for the actual MaxCompute execution:

    def _max_compute_run(self, taskid, sql):
        # Run the SQL asynchronously: run_sql returns immediately, so the
        # instance id can be recorded before blocking on completion.
        # The hints below are specific to our environment; adjust or drop them as needed.
        hints = {
            'odps.sql.planner.mode': 'lot',
            'odps.sql.ddl.odps2': 'true',
            'odps.sql.preparse.odps2': 'lot',
            'odps.service.mode': 'off',
            'odps.task.major.version': '2dot0_demo_flighting',
            'odps.sql.hive.compatible': 'true'
        }
        instance = self.odps.run_sql(sql, hints=hints)
        self._save_task_instance_id(taskid, instance.id)
        # Block until the instance finishes
        instance.wait_for_success()
        return instance.id
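
For comparison, PyODPS also offers a blocking call; a minimal sketch, assuming an odps client plus the sql and hints from the method above. execute_sql only returns after the instance has finished, which is why run_sql is used here — it lets us save the instance id before waiting:

# Blocking variant: returns only once the instance has terminated
instance = odps.execute_sql(sql, hints=hints)
print(instance.id)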

The code for fetching results:

def instance_result(odps, instance_id):
    # Fetch the result set of a finished instance by its instance_id
    instance = odps.get_instance(instance_id)
    response = []
    with instance.open_reader() as reader:
        # _schema is a private attribute of the reader; it describes the result columns
        column_names = reader._schema.names
        for record in reader:
            response.append(dict(zip(column_names, record.values)))
    return response
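
The Execute class that the consumer calls is not shown above. Below is a hypothetical skeleton of how the pieces could fit together, assuming a task is a dict carrying a task id and a SQL statement; apart from run_sql/wait_for_success and instance_result, which mirror the snippets above, the names here are illustrative:

from odps import ODPS
from dataflow import config

class Execute(object):
    # Hypothetical skeleton only, not the original implementation.

    def __init__(self):
        self.odps = ODPS(**config.ODPS_INFO)

    def _save_task_instance_id(self, taskid, instance_id):
        # Persist the taskid -> instanceid mapping so results can still be
        # fetched within the 7-day retention window (storage elided).
        pass

    def _max_compute_run(self, taskid, sql):
        # Minimal form of the method shown earlier, without the hints.
        instance = self.odps.run_sql(sql)
        self._save_task_instance_id(taskid, instance.id)
        instance.wait_for_success()
        return instance.id

    def start(self, task):
        # task is assumed to look like {"taskid": ..., "sql": ...}
        instance_id = self._max_compute_run(task["taskid"], task["sql"])
        return instance_result(self.odps, instance_id)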

Summary:

Alibaba Cloud's MaxCompute is a very good cloud computing service, and it is updated and iterated quickly. Using Alibaba Cloud frees engineers from the time spent building basic infrastructure, letting us focus more on the business and work smartly while standing on the shoulders of giants.

Last updated: 2017-07-26 09:32:45
