MaxCompute Task Status and Multi-Task Execution
When using MaxCompute, we really want to know how many tasks are currently running, which ones take a long time, which ones have finished, and to be able to analyze why a task is slow through its Logview.
Task status monitoring
MaxCompute task statuses fall into Running and Terminated. Running covers two states: actually running and waiting to run; Terminated covers three: succeeded, failed, and cancelled. Alibaba Cloud's SDK provides a function to list instances in either state: odps.list_instances(status=Running|Terminated, start_time=<start time>, end_time=<end time>). To refresh task status with second-level latency, we can take the following approach:
1. For tasks already in Running, refresh their status quickly, since they may have finished by now;
2. Keep fetching newly submitted tasks.
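One wrinkle the refresh logic has to handle: the PyODPS SDK returns instance start_time/end_time in UTC, while we display Beijing time (UTC+8). A minimal sketch of that elapsed-time computation, matching what save_task does below (the helper name and the fixed +8h offset are our assumptions):

```python
import datetime

UTC_OFFSET = datetime.timedelta(hours=8)  # assumption: display times in UTC+8 (Beijing)

def elapsed(start_utc, status, end_utc=None, now=None):
    # Convert the SDK's UTC timestamps to local time and compute elapsed time.
    start_local = start_utc + UTC_OFFSET
    if status == "Running":
        # Still running: measure against the current local time.
        end_local = now or datetime.datetime.now()
    else:
        end_local = end_utc + UTC_OFFSET
    return end_local - start_local
```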
We record task status in MySQL; the table is designed as follows:
```sql
CREATE TABLE `maxcompute_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `instanceid` varchar(255) DEFAULT NULL COMMENT 'task instance ID',
  `logview` varchar(1024) DEFAULT NULL COMMENT 'Logview link, very useful for troubleshooting',
  `start_time` varchar(64) DEFAULT NULL COMMENT 'task start time',
  `end_time` varchar(64) DEFAULT NULL COMMENT 'task end time',
  `cast_time` varchar(32) DEFAULT NULL COMMENT 'elapsed time',
  `project_name` varchar(255) DEFAULT NULL COMMENT 'project name',
  `status` varchar(64) DEFAULT NULL COMMENT 'task status',
  PRIMARY KEY (`id`),
  UNIQUE KEY `instanceid` (`instanceid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
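The monitor leans on REPLACE INTO against the unique `instanceid` key, so repeated polls upsert rather than duplicate rows. A small sketch of that upsert behavior (using SQLite here purely for illustration; MySQL's REPLACE behaves the same way on a unique key, and the instance ID is made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE maxcompute_task (
    instanceid TEXT UNIQUE,
    status TEXT,
    cast_time TEXT)""")

def upsert(instanceid, status, cast_time):
    # REPLACE deletes any row with the same unique instanceid, then inserts the new one.
    conn.execute(
        "REPLACE INTO maxcompute_task(instanceid, status, cast_time) VALUES (?, ?, ?)",
        (instanceid, status, cast_time))

upsert("20170726abc", "Running", "0:00:10")
upsert("20170726abc", "Terminated", "0:05:42")  # second poll overwrites the first row
rows = conn.execute("SELECT status FROM maxcompute_task").fetchall()
```

Note the sketch uses parameterized queries; the string-formatted SQL in the original code works but is open to injection if instance IDs ever came from untrusted input.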
The page below shows each task's elapsed time and start time, highlights tasks running longer than 1 hour in red, links to the Logview, and lets you cancel a task, which is very convenient.
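The red highlighting is just a threshold check on elapsed time; a hypothetical helper for the rendering layer (the function name and color values are ours, the 1-hour threshold comes from the page described above) might look like:

```python
import datetime

LONG_RUNNING = datetime.timedelta(hours=1)  # tasks over 1 hour are flagged

def row_color(cast_time):
    # Return the display color for a task row given its elapsed time.
    return "red" if cast_time > LONG_RUNNING else "black"
```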
Let's look at the implementation:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: lemon
import time
import threading
import traceback
import datetime

from odps import ODPS

from dataflow import config
from libs.myconn import Cursor
from config import DBINFO_BI_MASTER
from libs import logger as _logger

g_table_name = "bi_maxcompute_task"


def save_task(instanceid, odps, mysqlconn):
    # Save the task status to MySQL; takes an ODPS connection and a MySQL connection
    instance = odps.get_instance(instanceid)
    project_name = odps.project
    status = instance.status.value
    start_time = instance.start_time
    end_time = instance.end_time
    sql = "select logview,status from {0} where instanceid='{1}'".format(g_table_name, instanceid)
    sqlret = mysqlconn.fetchone(sql)
    if sqlret and sqlret["status"] == "Terminated":
        return
    if sqlret and sqlret["logview"] is not None:
        logview = sqlret["logview"]
    else:
        logview = instance.get_logview_address()
    # The SDK returns UTC; shift to UTC+8 for display
    start_time = start_time + datetime.timedelta(hours=8)
    if status == "Running":
        end_time = datetime.datetime.now()
    else:
        end_time = end_time + datetime.timedelta(hours=8)
    cast_time = end_time - start_time
    colname = "instanceid,start_time,end_time,cast_time,project_name,status,logview"
    values = ",".join(["'{0}'".format(r) for r in [instanceid, str(start_time), str(end_time),
                                                   cast_time, project_name, status, logview]])
    sql = """replace into {0}({1}) values({2})""".format(g_table_name, colname, values)
    mysqlconn.execute(sql)


class MaxcomputeTask(threading.Thread):
    # Fetch all tasks
    def __init__(self, logger):
        threading.Thread.__init__(self)
        self.logger = logger
        self.hour = 1
        self.status_conf = [("demo", "Running"), ("demo", "Terminated"),
                            ("demo1", "Running"), ("demo1", "Terminated")]

    def run(self):
        # Create the MySQL connection; adapt this to your own setup
        self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
        while True:
            try:
                self.start_more()
                time.sleep(10)
            except Exception:
                self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
                self.logger.error(traceback.format_exc())

    def start_more(self):
        for params in self.status_conf:
            self.get_task(*params)

    def get_task(self, project_name, status):
        odps = ODPS(**config.ODPS_INFO)
        odps.project = project_name
        instances = odps.list_instances(status=status,
                                        start_time=time.time() - self.hour * 3600)
        self.logger.info("start {0} {1}".format(project_name, status))
        for row in instances:
            save_task(instanceid=str(row), odps=odps, mysqlconn=self.mysqlconn)
        self.logger.info("end {0} {1}".format(project_name, status))


class MaxcomputeTaskRunning(threading.Thread):
    # Refresh the status of Running tasks
    def __init__(self, logger):
        threading.Thread.__init__(self)
        self.logger = logger

    def run(self):
        self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
        while True:
            try:
                self.update_running()
                time.sleep(1)
            except Exception:
                self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
                self.logger.error(traceback.format_exc())

    def update_running(self):
        sql = "select instanceid, project_name from {0} where status='Running'".format(g_table_name)
        sqlret = self.mysqlconn.fetchall(sql)
        if not sqlret:
            return
        self.logger.info("{1} running update length:{0}".format(
            len(sqlret), time.strftime("%Y-%m-%d %H:%M:%S")))
        for row in sqlret:
            odps = ODPS(**config.ODPS_INFO)
            odps.project = row["project_name"]
            save_task(row["instanceid"], odps, self.mysqlconn)


if __name__ == "__main__":
    # logger is a home-grown logging helper class
    logger = _logger.Logger("maxcompute_task.log").getLogger()
    running = MaxcomputeTaskRunning(logger)
    running.setDaemon(True)
    running.start()
    task = MaxcomputeTask(logger)
    task.start()
```
Multi-task execution
MaxCompute jobs can be run from the command line, through the SDK, or from Alibaba Cloud's integrated environment. We often face a very large number of tasks, and building a multi-task executor is a common problem. Task execution is a classic producer-consumer relationship: the producer fetches tasks, the consumer runs them. This brings two benefits:
1) The number of concurrently running tasks stays under control; unbounded concurrency would inevitably hammer server resources;
2) The service can run on multiple machines, avoiding a single point of failure. MaxCompute tasks run in the cloud, and the result can be fetched by instanceid; results are retained for 7 days.
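Stripped of the MaxCompute specifics, the pattern above is a bounded queue feeding a pool of consumer threads. A self-contained sketch (queue sizes, worker counts, and task names here are made up for illustration):

```python
import queue
import threading

QUEUE_SIZE = 4      # bound the queue: put() blocks when full, capping in-flight work
NUM_CONSUMERS = 2   # number of concurrently executing tasks

tasks = queue.Queue(QUEUE_SIZE)
done = []
done_lock = threading.Lock()

def consumer():
    while True:
        task = tasks.get()
        if task is None:          # sentinel: shut this worker down
            break
        with done_lock:
            done.append(task)     # stand-in for actually submitting the SQL
        tasks.task_done()

workers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for w in workers:
    w.start()

for i in range(10):               # producer: enqueue tasks
    tasks.put("task-%d" % i)
for _ in workers:                 # one sentinel per worker
    tasks.put(None)
for w in workers:
    w.join()
```

The bounded queue is what enforces benefit 1): the producer blocks on put() once QUEUE_SIZE tasks are pending, so concurrency can never run away.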
Here, roughly, is the producer and consumer code we use in real scenarios:
```python
class Consumer(threading.Thread):
    def __init__(self, queue, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.timeout = 1800

    def run(self):
        self.execute = Execute()
        logger.info("consumer %s start" % threading.current_thread().name)
        while G_RUN_FLAG:
            try:
                task = self.queue.get()
                self.execute.start(task)
            except Exception:
                logger.error(traceback.format_exc())


class Producter(threading.Thread):
    def __init__(self, queue, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.sleep_time = 30
        self.step_sleep_time = 5

    def run(self):
        self.mysqlconn_bi_master = Cursor.new(**config.DBINFO_BI_MASTER)
        logger.info("producter %s start" % threading.current_thread().name)
        while G_RUN_FLAG:
            if self.queue.qsize() >= QUEUE_SIZE:
                time.sleep(self.sleep_time)
                continue
            # TODO
            self.queue.put(task)
            time.sleep(self.step_sleep_time)


def main():
    queue = Queue.LifoQueue(QUEUE_SIZE)
    lock = threading.RLock()
    for _ in xrange(MAX_PROCESS_NUM):
        consumer = Consumer(queue, lock)
        consumer.setDaemon(True)
        consumer.start()
    producter = Producter(queue, lock)
    producter.start()
    producter.join()


def signal_runflag(sig, frame):
    global G_RUN_FLAG
    if sig == signal.SIGHUP:
        logger.info("receive HUP signal")
        G_RUN_FLAG = False


if __name__ == "__main__":
    logger.info("execute run")
    if platform.system() == "Linux":
        signal.signal(signal.SIGHUP, signal_runflag)
    main()
    logger.info("execute exit.")
```
The code that actually submits the SQL to MaxCompute:
```python
def _max_compute_run(self, taskid, sql):
    # Submit the SQL asynchronously
    hints = {
        'odps.sql.planner.mode': 'lot',
        'odps.sql.ddl.odps2': 'true',
        'odps.sql.preparse.odps2': 'lot',
        'odps.service.mode': 'off',
        'odps.task.major.version': '2dot0_demo_flighting',
        'odps.sql.hive.compatible': 'true'
    }
    new_sql = "{0}".format(sql)
    instance = self.odps.run_sql(new_sql, hints=hints)
    self._save_task_instance_id(taskid, instance.id)
    # Block until the instance finishes
    instance.wait_for_success()
    return instance.id
```
The code that fetches results:
```python
def instance_result(odps, instance_id):
    # Fetch the result set of a finished instance by instance_id
    instance = odps.get_instance(instance_id)
    response = []
    with instance.open_reader() as reader:
        raw_response = [r.values for r in reader]
        column_names = reader._schema.names
        for line in raw_response:
            tmp = {}
            for i in range(len(line)):
                tmp[column_names[i]] = line[i]
            response.append(tmp)
    return response
```
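The inner loop above is just pairing column names with row values; the same transformation in isolation (a hypothetical helper, not part of the PyODPS API) can be written as:

```python
def rows_to_dicts(column_names, rows):
    # Pair each row's values with the schema's column names.
    return [dict(zip(column_names, row)) for row in rows]
```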
Summary:
Alibaba Cloud MaxCompute is an excellent cloud computing service, and it updates and iterates very quickly. Using Alibaba Cloud frees engineers from building basic infrastructure, letting us focus more on the business and work smart, standing on the shoulders of giants.
Last updated: 2017-07-26 09:32:45