MaxCompute Task Status and Multi-Task Execution
When using MaxCompute, we really want to know how many tasks are currently running, which tasks are slow, and which have already finished, and we want to use each task's Logview to analyze why a slow task takes so long.
Task Status Monitoring
MaxCompute tasks have two top-level states, Running and Terminated. Running covers both tasks that are executing and tasks waiting to execute; Terminated covers tasks that finished, failed, or were cancelled. Alibaba Cloud's SDK provides a function to fetch instances in either state: odps.list_instances(status='Running'|'Terminated', start_time=..., end_time=...). To refresh task status at second-level granularity, we can use the following approach (a minimal polling sketch follows the list):
1. For tasks already recorded as Running, refresh their status quickly, since they may have finished;
2. Continuously fetch newly started tasks.
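As a minimal sketch of the polling idea with PyODPS, assuming placeholder credentials and a hypothetical project name:

# Minimal polling sketch; the credentials and project name are
# placeholders, not the original configuration.
import time
from odps import ODPS

odps = ODPS("<access_id>", "<secret_access_key>", project="demo")

one_hour_ago = time.time() - 3600
for status in ("Running", "Terminated"):
    # list_instances yields the instances started within the time window
    for instance in odps.list_instances(status=status, start_time=one_hour_ago):
        print("{0} {1}".format(instance.id, instance.status.value))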
We use MySQL to record task status; the table is designed as follows:
CREATE TABLE `maxcompute_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `instanceid` varchar(255) DEFAULT NULL COMMENT 'task instance ID',
  `logview` varchar(1024) DEFAULT NULL COMMENT 'Logview URL, very useful for troubleshooting',
  `start_time` varchar(64) DEFAULT NULL COMMENT 'task start time',
  `end_time` varchar(64) DEFAULT NULL COMMENT 'task end time',
  `cast_time` varchar(32) DEFAULT NULL COMMENT 'elapsed time',
  `project_name` varchar(255) DEFAULT NULL COMMENT 'project name',
  `status` varchar(64) DEFAULT NULL COMMENT 'task status',
  PRIMARY KEY (`id`),
  UNIQUE KEY `instanceid` (`instanceid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
The page below shows each task's elapsed time and start time, highlights tasks that have been running for more than an hour in red, links to the Logview, and even lets you cancel a task, which is very convenient.
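The cancel action can be built on PyODPS's instance.stop(); a minimal sketch, assuming a helper called from the page's backend (the helper name is hypothetical):

def cancel_task(odps, instanceid):
    # Look the instance up by id and stop it if it is still running
    instance = odps.get_instance(instanceid)
    if instance.status.value == "Running":
        instance.stop()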
Let's look at the implementation:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: lemon
import time
import threading
import traceback
import datetime

from odps import ODPS
from dataflow import config
from libs.myconn import Cursor
from config import DBINFO_BI_MASTER
from libs import logger as _logger

g_table_name = "bi_maxcompute_task"


def save_task(instanceid, odps, mysqlconn):
    # Save the task status to MySQL; takes an ODPS connection and a MySQL connection
    instance = odps.get_instance(instanceid)
    project_name = odps.project
    status = instance.status.value
    start_time = instance.start_time
    end_time = instance.end_time
    sql = "select logview,status from {0} where instanceid='{1}'".format(g_table_name, instanceid)
    sqlret = mysqlconn.fetchone(sql)
    if sqlret and sqlret["status"] == "Terminated":
        return
    if sqlret and sqlret["logview"] is not None:
        logview = sqlret["logview"]
    else:
        logview = instance.get_logview_address()
    # Instance times are UTC; shift to UTC+8
    start_time = start_time + datetime.timedelta(hours=8)
    if status == "Running":
        end_time = datetime.datetime.now()
    else:
        end_time = end_time + datetime.timedelta(hours=8)
    cast_time = end_time - start_time
    colname = "instanceid,start_time,end_time,cast_time,project_name,status,logview"
    values = ",".join(["'{0}'".format(r) for r in [instanceid, str(start_time), str(end_time),
                                                   cast_time, project_name, status, logview]])
    sql = """replace into {0}({1}) values({2}) """.format(g_table_name, colname, values)
    mysqlconn.execute(sql)


class MaxcomputeTask(threading.Thread):
    # Fetch all tasks
    def __init__(self, logger):
        threading.Thread.__init__(self)
        self.logger = logger
        self.hour = 1
        self.status_conf = [("demo", "Running"), ("demo", "Terminated"),
                            ("demo1", "Running"), ("demo1", "Terminated")]

    def run(self):
        # Open the MySQL connection; adapt this to your own setup
        self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
        while True:
            try:
                self.start_more()
                time.sleep(10)
            except:
                self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
                self.logger.error(traceback.format_exc())

    def start_more(self):
        for params in self.status_conf:
            self.get_task(*params)

    def get_task(self, project_name, status):
        odps = ODPS(**config.ODPS_INFO)
        odps.project = project_name
        instances = odps.list_instances(status=status, start_time=time.time() - self.hour * 3600)
        self.logger.info("start {0} {1}".format(project_name, status))
        for row in instances:
            save_task(instanceid=str(row), odps=odps, mysqlconn=self.mysqlconn)
        self.logger.info("end {0} {1}".format(project_name, status))


class MaxcomputeTaskRunning(threading.Thread):
    # Refresh the status of tasks recorded as Running
    def __init__(self, logger):
        threading.Thread.__init__(self)
        self.logger = logger

    def run(self):
        self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
        while True:
            try:
                self.update_running()
                time.sleep(1)
            except:
                self.mysqlconn = Cursor.new(**DBINFO_BI_MASTER)
                self.logger.error(traceback.format_exc())

    def update_running(self):
        sql = "select instanceid, project_name from {0} where status='Running'".format(g_table_name)
        sqlret = self.mysqlconn.fetchall(sql)
        if not sqlret:
            return
        self.logger.info("{1} running update length:{0}".format(
            len(sqlret), time.strftime("%Y-%m-%d %H:%M:%S")))
        for row in sqlret:
            odps = ODPS(**config.ODPS_INFO)
            odps.project = row["project_name"]
            save_task(row["instanceid"], odps, self.mysqlconn)


if __name__ == "__main__":
    # logger is our own logging helper class
    logger = _logger.Logger("maxcompute_task.log").getLogger()
    running = MaxcomputeTaskRunning(logger)
    running.setDaemon(True)
    running.start()
    task = MaxcomputeTask(logger)
    task.start()
Multi-Task Execution
MaxCompute jobs can be run from the command line, through the SDK, or in Alibaba Cloud's integrated environment. We often face a very large number of tasks, so building a multi-task executor is a common problem. Task execution is a classic producer-consumer relationship: the producer fetches tasks and the consumers execute them. This design has two benefits.
1. The number of concurrently executing tasks must be kept under control; unbounded concurrency would hammer server resources.
2. The service can run on multiple machines, avoiding a single point of failure. MaxCompute tasks run in the cloud, and their results can be fetched by instanceid; results are retained for 7 days.
Here is a rough excerpt of the producer and consumer code we use in real scenarios:
class Consumer(threading.Thread):
    def __init__(self, queue, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.timeout = 1800

    def run(self):
        self.execute = Execute()
        logger.info("consumer %s start" % threading.current_thread().name)
        while G_RUN_FLAG:
            try:
                task = self.queue.get()
                self.execute.start(task)
            except:
                logger.error(traceback.format_exc())


class Producer(threading.Thread):
    def __init__(self, queue, lock):
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.sleep_time = 30
        self.step_sleep_time = 5

    def run(self):
        self.mysqlconn_bi_master = Cursor.new(**config.DBINFO_BI_MASTER)
        logger.info("producer %s start" % threading.current_thread().name)
        while G_RUN_FLAG:
            # Throttle when the queue is full
            if self.queue.qsize() >= QUEUE_SIZE:
                time.sleep(self.sleep_time)
                continue
            # TODO
            self.queue.put(task)
            time.sleep(self.step_sleep_time)


def main():
    queue = Queue.LifoQueue(QUEUE_SIZE)
    lock = threading.RLock()
    for _ in xrange(MAX_PROCESS_NUM):
        consumer = Consumer(queue, lock)
        consumer.setDaemon(True)
        consumer.start()
    producer = Producer(queue, lock)
    producer.start()
    producer.join()


def signal_runflag(sig, frame):
    global G_RUN_FLAG
    if sig == signal.SIGHUP:
        logger.info("receive HUP signal")
        G_RUN_FLAG = False


if __name__ == "__main__":
    logger.info("execute run")
    if platform.system() == "Linux":
        signal.signal(signal.SIGHUP, signal_runflag)
    main()
    logger.info("execute exit.")
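The producer body is left as a TODO above. As a purely hypothetical sketch of what fetching tasks might look like, assuming a bi_task table with a pending status (the table and its columns are made up for illustration):

# Hypothetical sketch only: bi_task and its columns are assumptions,
# not the original schema.
def fetch_pending_tasks(mysqlconn):
    sql = "select taskid, sql_text from bi_task where status='pending' limit 10"
    return mysqlconn.fetchall(sql) or []

# Inside Producer.run, the TODO could then become:
#     for task in fetch_pending_tasks(self.mysqlconn_bi_master):
#         self.queue.put(task)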
The code that actually executes a MaxCompute job:
def _max_compute_run(self, taskid, sql):
    # Execute asynchronously
    hints = {
        'odps.sql.planner.mode': 'lot',
        'odps.sql.ddl.odps2': 'true',
        'odps.sql.preparse.odps2': 'lot',
        'odps.service.mode': 'off',
        'odps.task.major.version': '2dot0_demo_flighting',
        'odps.sql.hive.compatible': 'true'
    }
    new_sql = "{0}".format(sql)
    # run_sql submits the job and returns immediately
    instance = self.odps.run_sql(new_sql, hints=hints)
    self._save_task_instance_id(taskid, instance.id)
    # Block until the job succeeds
    instance.wait_for_success()
    return instance.id
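Because the instanceid is persisted before the blocking wait, another process can check the same job without blocking; a minimal sketch, mirroring the status check used in save_task above (the function name is hypothetical):

def is_finished(odps, instance_id):
    # Look the instance up by id and test its top-level status
    instance = odps.get_instance(instance_id)
    return instance.status.value == "Terminated"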
The code for fetching results:
def instance_result(odps, instance_id):
    # Fetch the result set by instance_id
    instance = odps.get_instance(instance_id)
    response = []
    with instance.open_reader() as reader:
        raw_response = [r.values for r in reader]
        column_names = reader._schema.names
    # Turn each row into a dict keyed by column name
    for line in raw_response:
        tmp = {}
        for i in range(len(line)):
            tmp[column_names[i]] = line[i]
        response.append(tmp)
    return response
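Example usage, assuming config.ODPS_INFO holds the connection settings used earlier (the instance id here is a placeholder):

odps = ODPS(**config.ODPS_INFO)
rows = instance_result(odps, "<instance_id>")  # placeholder id
for row in rows:
    print(row)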
Summary:
Alibaba Cloud's MaxCompute is an excellent cloud computing service that updates and iterates very quickly. Using Alibaba Cloud frees engineers from building basic infrastructure, letting us focus more on the business and work smart while standing on the shoulders of giants.
Last updated: 2017-07-26 09:32:45