阅读444 返回首页    go 阿里云 go 技术社区[云栖]


数加MaxCompute计算资源分布以及LogView分析优化

大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的PB/EB级数据仓库解决方案,具备万台服务器扩展能力和跨地域容灾能力,是阿里巴巴内部核心大数据平台,支撑每日百万级作业规模。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。(官方文档有这里就不多做介绍了)
官方文档链接

用户不必关心分布式计算细节,从而达到分析大数据的目的。

大型互联网企业的数据仓库和BI分析、网站的日志分析、电子商务网站的交易分析、用户特征和兴趣挖掘等。

odps.structure.pngMaxCompute由四部分组成,分别是客户端 (ODPS Client)、接入层 (ODPS Front End)、逻辑层 (ODPS Server) 及存储与计算层 (Apsara Core)。

  • ODPS的客户端有以下几种形式:
    • Web:ODPS以 RESTful API的方式提供离线数据处理服务;
    • ODPS SDK:对ODPS RESTful API的封装,目前有Java等版本的实现;
    • ODPS CLT (Command Line Tool):运行在Window/Linux下的客户端工具,通过CLT可以提交命令完成Project管理、DDL、DML等操作;
    • ODPS IDE:ODPS提供了上层可视化ETL/BI工具,即“采云间”,用户可以基于采云间完成数据同步、任务调度、报表生成等常见操作。
  • ODPS接入层提供HTTP服务、Cache、Load Balance,用户认证和服务层面的访问控制。
  • 逻辑层又称作控制层,是ODPS的核心部分。实现用户空间和对象的管理、命令的解析与执行逻辑、数据对象的访问控制与授权等功能。在逻辑层有Worker、Scheduler和Executor三个角色:
    • Worker处理所有RESTful请求,包括用户空间(project)管理操作、资源(resource)管理操作、作业管理等,对于SQL DML、MR、DT等启动Fuxi任务的作业,会提交Scheduler进一步处理;
    • Scheduler负责instance的调度,包括将instance分解为task、对等待提交的task进行排序、以及向计算集群的Fuxi master询问资源占用情况以进行流控(Fuxi slot满的时候,停止响应Executor的task申请);
    • Executor负责启动SQL/ MR task,向计算集群的Fuxi master提交Fuxi任务,并监控这些任务的运行。
  • 计算层就是飞天内核(Apsara Core),运行在和控制层相互独立的计算集群上。包括Pangu(分布式文件系统)、Fuxi(资源调度系统)、Nuwa/ZK(Naming服务)、Shennong(监控模块)等。ODPS中的元数据存储在阿里云计算的另一个开放服务OTS(Open Table Service,开放结构化数据服务)中,元数据内容主要包括用户空间元数据、Table/Partition Schema、ACL、Job元数据、安全体系等。

下面将以一个完整的SQL语句为例,介绍提交后经过MaxCompute处理的全流程:


309503dcf7f98da684f81285c05961b83c61f956

  1. 通过console提交一个SQL语句。
  2. 调用SDK计算配置信息中的签名。
  3. 发送 RESTful 请求给HTTP服务器。
  4. HTTP 服务器发送请求到云账号服务器做用户认证。
  5. 认证通过后,请求就会以 Kuafu通信协议方式发送给 Worker。
  6. Worker判断该请求作业是否需要启动Fuxi Job。如果不需要,本地执行并返回结果。
  7. 如果需要,则生成一个 instance, 发送给 Scheduler。
  8. Scheduler把instance信息注册到 OTS,将其状态置成 Running。
  9. Scheduler 把 instance 添加到 instance 队列。
  10. Worker把 Instance ID返回给客户端。

  1. Scheduler会把instance拆成多个Task,并生成任务流DAG图。
  2. 把可运行的Task 放入到优先级队列TaskPool中。
  3. Scheduler 有一个后台线程定时对TaskPool 中的任务进行排序。
  4. Scheduler 有一个后台线程定时查询计算集群的资源状况。
  5. Executor在资源未满的情况下,轮询TaskPool,请求Task。
  6. Scheduler判断计算资源。若集群有资源,就将该Task发给Executor。
  7. Executor调用SQL Parse Planner,生成SQL Plan。
  8. Executor 将 SQL Plan 转换成计算层的 FuXi Job 描述文件。
  9. Executor 将该描述文件提交给计算层运行,并查询 Task 执行状态。
  10. Task 执行完成后,Executor更新 OTS 中的 Task信息,并汇报给 Scheudler。
  11. Schduler 判断 instance 结束,更新 OTS 中 instance 信息,置为 Terminated。

客户端接收到返回的 Instance ID 后,可以通过 Instance ID 来查询作业状态:

  1. 客户端会发送另一个 REST 的请求,查询作业状态。
  2. HTTP 服务器根据配置信息,去云账号服务器做用户认证。
  3. 用户认证通过后,把查询的请求发送给 Worker。
  4. Worker 根据 InstanceID 去 OTS 中查询该作业的执行状态。
  5. Worker 将查询到的执行状态返回给客户端。


这里主要说下计算层的MR Job和SQL Job,因为ODPS有对外提供MapReduce编程接口,来访问ODPS上的数据,其中MR Job就是用来跑那些任务的。而SQL Job主要用来跑通过客户端接受的SQL查询请求的任务。

逻辑层里主要有二个队列,一个是instance队列,一个是Task队列,Scheduler负责instance的调度,负责将instance分解成Task放入到Task队列,重点是:Task队列是按照优先级排序的,负责排序的就是Scheduler发起的一个后台线程。Executor在资源未满的情况下,轮询TaskPool,请求Task,Executor调用SQL Parse Planner,生成SQL Plan,然后将SQL Plan转换成计算层的 FuXi Job 描述文件,最终将该描述文件提交给计算层运行,并查询 Task 执行状态。

ad8af45fb581339a3444388c13887aa1af9e563b

ODPS提供了数据上传下载通道,SQL及MapReduce等多种计算分析服务,并且提供了完善的安全解决方案,其功能组件(绿色虚线部分)以及周边组件(蓝色标识)。
具体功能组件的作用,请参考官方文档

odps_resource.png
  • 首先整个ODPS计算资源被分成多个集群,每个project可以配置多个集群,但是只能默认跑在其配置的默认集群(默认集群只有一个)上面,除非手动切换。
  • 每个集群会被分成多个quota,一般某个project会跑在某个集群上的quota上的,每个quota有固定的计算资源配额,你的project也会有固定的至少获取到的资源,最大获取到的资源就是所在quota的配额,不一定能获取到最大的配额,因为某个quota是多个project共享的。

Logview分析

当某个任务跑的比较慢,我们可以根据其logview来发现问题,进行优化,下面给大家分享如何对logview进行分析,下面我们来看根据某个logview的分析步骤:

logview1.png

  • 点击圆形的sql,就可以看到实际执行的sql,点击diagnosis就可以看到对sql执行的诊断,是否资源充足,是否有长尾情况,是否有数据倾斜情况。
  • 还可以看到任务运行的开始时间,结束时间,运行时间,点击detail就可以看到这个任务执行详情,包括有向无环图,Mapper和Reducer或Join节点具体的运行记录。 下面是点击detail之后,出现的画面,也是我们重点要分析的地方,如下图所示:
    logview2.png
  • 我们可以看到左边是整个实例所包含的任务运行的有向无环图,一共有三个Task,右边包括具体的三个Task的详细信息,还有summary,你可以看到每个Task的input和output的记录数,还可以看到每个Task开启了几个instance进行运行。
  • 点击每个Fuxi Job就可以在下面看到每个Job详情:具体如下图所示:
    logview3.png
  • 从上面可以看到,M1_STG1这个job一共起了46个instance来跑任务,这个job的开始时间在上面个红色的框框里,每个instance的开始和起始时间在下面的框框里,每个instance实际运行时间就是下面Latency时间,单位是s,最右边的框框里显示的是这个job下面的所有instance里面的最小最大和平均运行时间,如果说差异比较大,可能会有长尾或者数据不均匀所致,我们要根据这些信息进行分析,该如何去优化这个Job。

具体的优化过程以后会给大家具体讲解,下面先给大家展示一个例子,由于小表和大表进行join所造成的长尾问题的解决方案以及效果:

-优化方案:
我们将join的二个小表,使用mapjoin的方式进行优化,将每个小表的内容load到每个mapper节点的内存中,这个速度可以大大优化,但是对小表的大小是有限制的,如果太小,可以设置每个mapper的memery的大小,但是这些都不是万能的,当资源不足时,可能会造成资源等待。所以优化方案要根据自己sql以及涉及到的数据量进行优化,任何优化方法都不是万能的。

-优化前:

screenshot.png-优化后:
screenshot.png

后续

希望大家在跑sql任务的时候,多看看自己的logview,不要太蛮力的去跑sql,这样不仅占用资源太多,而且还会影响别人的任务运行。优化固然很难,但是也要慢慢走下去。
以后会分享更多的优化方案。

最后更新:2017-05-10 21:32:24

  上一篇:go Spring Cloud构建微服务架构书目录
  下一篇:go 微服务架构设计模式书目录