閱讀59 返回首頁    go 魔獸


Hadoop工作機製

可以隻用一行代碼來運行MapReduce作業:JobClient.runJon(conf),Job作業運行時參與的四個實體:

     1.JobClient 寫代碼,配置作業,提交作業。
     2.JobTracker:初始化作業,分配作業,協調作業運行。這是一個java程序,主類是JobTracker。
     3.TaskTracker:運行作業劃分後的任務,即分配數據分配上執行Map或Reduce任務。
     4.HDFS:保存作業數據、配置信息等,保存作業結果。

Map/Reduce 作業總體執行流程:

     代碼編寫 ----> 作業配置  ---->  作業提交 ----> Map任務分配和執行 ----> 處理中間結果 ---->  Reduce任務分配與執行 ---->  輸出結果

而對於每個作業的執行,又包含:

     輸入準備 ----> 任務執行 ----> 輸出結果
作業提交JobClient:

     JobClient的runJob方法產生一個Jobclient實例並調用其submitJob方法,然後runJob開始循環嗎,並在循環中調用getTaskCompetionEvents方法,獲得TaskCompletionEvent實例,每秒輪詢作業進度(後麵有介紹進度和狀態更新),把進度寫到控製台,作業完成後顯示作業計數器,若失敗,則把錯誤記錄到控製台。
     submitJob方法作業提交的過程:
     1.向JobTracker請求一個新的JobId。
     2.檢查作業相關路徑,如果路徑不正確就會返回錯誤。
     3.計算作業輸入分片及其劃分信息。
     4.將作業運行需要的資源(jar文件、配置文件等)複製到Shared HDFS,並複製多個副本(參數控製,默認值為10)供tasktracker訪問,也會將計算的分片複製到HDFS。
     5.調用JobTracker對象的submitJob()方法來真正提交作業,告訴JobTracker作業準備執行。

作業的初始化JobTracker:

     JobTracker收到submitJob方法調用後,會把調用放入到一個內部隊列,由作業調度器(Job scheduler)進行調度並對其初始化。Job初始化即創建一個作業對象。
     當作業被調度後,JobTracker會創建一個代表這個作業的JobInProgress對象,並將任務和記錄信息封裝在這個對象中,以便跟蹤任務狀態和進程。
     初始化過程就是JobInProgress對象的initTasks方法進行初始化的。

     初始化步驟:
          1.從HDFS中讀取作業對應的job.split信息,為後麵的初始化做好準備。
          2.創建並初始化map和reduce任務。根據數據分片信息中的個數確定map task的個數,然後為每個map task生成一個TaskInProgress對象來處理數據分片,先將其放入nonRunningMapCache,以便JobTracker分配任務的時候使用。接下來根據JobConf中的mapred.reduce.tasks屬性利用setNumReduceTasks()方法設置reduce task的數量,然後同map task創建方式。
          3.最後就是創建兩個初始化task,進行map和reduce的初始化。

任務的分配JobTracker:

    消息傳遞HeartBeat: tasktracker運行一個簡單循環定期發送心跳(heartbeat)給JobTracker。由心跳告知JobTracker自己是否存活,同時作為消息通道傳遞其它信息(請求新task)。作為心跳的一部分,tasktracker會指明自己是否已準備好運行新的任務,如果是,jobtracker會分配它一個任務。
    分配任務所屬於的作業:在Jobtracker分配任務前需先確定任務所在的作業。後麵會介紹到各種作業調度算法,默認是一個FIFO的作業調度。
    分配Map和Reduce任務:tasktracker有固定數量的任務槽,一個tasktracker可以同時運行多個Map和Reduce任務,但其準確的數量由tasktracker的核的數量和內存大小決定。默認調度器會先填滿Map任務槽,再填Reduce任務槽。jobtracker會選擇距離離分片文件最近的tasktracker,最理想情況下,任務是數據本地化(data-local)的,當然也可以是機架本地化(rack-local),如果不是本地化的,那麼他們就需要從其他機架上檢索數據。Reduce任務分配很簡單,jobtracker會簡單的從待運行的reduce任務列表中選取下一個來執行,不用考慮數據本地化。

任務的執行TaskTracker:


     TaskTracker收到新任務後,就要在本地運行任務了,運行任務的第一步就是通過localizedJob將任務本地化所需要的注入配置、數據、程序等信息進行本地化。

     1.本地化數據:從共享文件係統將job.split 、job.jar (在分布式緩存中)複製本地,將job配置信息寫入job.xml。
     2.新建本地工作目錄:tasktracker會加壓job.jar文件到本工作目錄。
     3.調用launchTaskForJob方法發布任務(其中會新建TaskRunner實例運行任務),如果是Map任務就啟用MapTaskRunner,對於Reduce就是ReduceTaskRunner。
     在這之後,TaskRunner會啟用一個新的JVM來運行每個Map/Reduce任務,防止程序原因而導致tasktracker崩潰,但不同任務間重用JVM還是可以的,後續會講到任務JVM重用
     對於單個Map,任務執行的簡單流程是:
     1.分配任務執行參數
     2.在Child臨時文件中添加map任務信息(Child是運行Map和Reduce任務的主進程)
     3.配置log文件夾,配置map任務的通信和輸出參數
     4.讀取input split,生成RecordReader讀取數據
     5.為Map生成MapRunnable,依次從RecordReader中接收數據,並調用Map函數進行處理。
     6.最後將map函數的輸出調用collect收集到MapOutputBuffer(參數控製其大小)中。

Streaming和Pipes:
     
     Streaming和Pipes都運行特殊的Map和Reduce任務,目的是運行用戶提供的可執行程序並與之通信。
     Streaming:使用標準輸入輸出Streaming與進程進行通信。
     Pipes:用來監聽套接字,會發送一個端口號給C++程序,兩者便可建立鏈接。
     
進度和狀態更新:

     一個作業和它的任務都有狀態(status),其中包括:運行成功失敗狀態、Map/Reduce進度、作業計數器值、狀態消息。
     狀態消息與客戶端的通信:
     1.對於Map任務Progress的追蹤:progress是已經處理完的輸入所占的比例。
     2.對於Reduce:稍複雜,reduce任務分三個階段(每個階段占1/3),複製、排序和Reduce處理,若reduce已執行一半的輸入的話,那麼任務進度便是1/3+1/3+1/6=5/6。
     3.任務計數器:任務有一組計數器,負責對任務運行各個事件進行計數。
     4.任務進度報告:如果任務報告了進度,便會設置一個標記以表明狀態將被發送到tasktracker。有一個獨立線程每隔三秒檢查一次此標記,如果已設置,則告知tasktracker當前狀態。
     5.tasktracker進度報告:tasktracker會每隔5秒(這個心跳是由集群大小決定,集群越大時間會越長)發送heartbeat到jobtracker,並且tasktracker運行的所有狀態都會在調用中被發送到jobtracker。
     6.jobtracker合並各任務報告:產生一個表明所有運行作業機器所含任務狀態的全局視圖。
     前麵提到的JobClient就是通過每秒查詢JobTracker來接收最新狀態,而且客戶端JobClient的getJob方法可以得到一個RunningJob的實例,其包含了作業的所以狀態信息。
     
作業的完成:

     當jobtracker收到作業最後一個任務已完成的通知後,便把作業狀態設置成成功。JobClient查詢狀態時,便知道任務已成功完成,於是JobClient打印一條消息告知用戶,然後從runJob方法返回。
     如果jobtracker有相應設置,也會發送一個Http作業通知給客戶端,希望收到回調指令的客戶端可以通過job.end.notification.url屬性來進行設置。
     jobtracker情況作業的工作狀態,指示tasktracker也清空作業的工作狀態,如刪除中間輸出。
     
失敗
     
     實際情況下,用戶的代碼存在軟件錯誤進程會崩潰,機器也會產生故障,但Hadoop能很好的應對這些故障並完成作業。
     1.任務失敗    
     子任務異常:如Map/Reduce任務中的用戶代碼拋出異常,子任務JVM進程會在退出前向父進程tasktracker發送錯誤報告,錯誤被記錄用戶日誌。tasktracker會將此次task attempt標記為tailed,並釋放這個任務槽運行另外一個任務。
     子進程JVM突然退出:可能由於JVM bug導致用戶代碼造成的某些特殊原因導致JVM退出,這種情況下,tasktracker會注意到進程已經退出,並將此次嚐試標記為failed。
     任務掛起:一旦tasktracker注意一段時間沒有收到進度更新,便會將任務標記為failed,JVM子進程將被自動殺死。任務失敗間隔時間通常為10分鍾,可以以作業或者集群為基礎設置過期時間,參數為mapred.task.timeout。注意:如果參數值設置為0,則掛起的任務永遠不會釋放掉它的任務槽,隨著時間的推移會降低整個集群的效率。
     任務失敗嚐試次數:jobtracker得知一個tasktracker失敗後,它會重新調度該任務執行,當然,jobtracker會嚐試避免重新調度失敗過的tasktracker任務。如果一個任務嚐試次數超過4次,它將不再被重試。這個值是可以設置的,對於Map任務,參數是mapred.map.max.attempts,對於reduce任務,則由mapred.reduce.max.attempts屬性控製。如果次數超過限製,整個作業都會失敗。當然,有時我們不希望少數幾個任務失敗就終止運行的整個作業,因為即使有些任務失敗,作業的一些結果可能還是有用的,這種情況下,可以為作業設置在不觸發作業失敗情況下的允許任務失敗的最大百分比,Map任務和Reduce任務可以獨立控製,參數為mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。
     任務嚐試中止(kill):任務終止和任務失敗不同,task attempt可以中止是因為他是一個推測副本或因為它所處的tasktracker失敗,導致jobtracker將它上麵的所有task attempt標記為killed。被終止的task attempt不會被計入任務運行嚐試次數,因為嚐試中止並不是任務的錯。
     2.tasktracker失敗
     tasktracker由於崩潰或者運行過慢而失敗,他將停止向jobtracker發送心跳(或很少發送心跳)。jobtracker注意已停止發送心跳的tasktracker(過期時間由參數mapred.tasktracker.expiry.interval設置,單位毫秒),並將它從等待調度的tasktracker池中移除。如果是未完成的作業,jobtracker會安排次tasktracker上已經運行成功的Map任務重新運行,因為此時reduce任務已無法訪問(中間輸出存放在失敗的tasktracker的本地文件係統上)。
     即使tasktracker沒有失敗,也有可能被jobtracker列入黑名單。如果tasktracker上麵的失敗任務數量遠遠高於集群的平均失敗任務次數,他就會被列入黑名單,被列入黑名單的tasktracker可以通過重啟從jobtracker黑名單中移除。
     3.jobtracker失敗
     老版本的JobTracker失敗屬於單點故障,這種情況下作業注定失敗。


作業調度:

     早期作業調度FIFO:按作業提交順序先進先出。可以設置優先級,通過設置mapred.job.priority屬性或者JobClient的setJobPriority()方法製定優先級(優先級別:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO調度算法不支持搶占(preemption),所以高優先級作業仍然會被那些已經開始的長時間運行的低優先級作業所阻塞。
     Fair Scheduler:目標是讓每個用戶公平地共享集群能力。當集群存在很多作業時,空閑的任務槽會以”讓每個用戶共享集群“的方式進行分配。默認每個用戶都有自己的作業池。FairScheduler支持搶占,所以,如果一個池在特定的一段時間未得到公平地資源共享,它會終止池中得到過多的資源任務,以便把任務槽讓給資源不足的池。FairScheduler是一個後續模塊,使用它需要將其jar文件放在Hadoop的類路徑下。可以通過參數map.red.jobtracker.taskScheduler屬性配置(值為org.apache.hadoop.mapred.FairScheduler)
     Capacity Scheduler:
     集群由很多隊列組成,每個隊列都有一個分配能力,這一點與FairScheduler類似,隻不過在每個隊列內部,作業根據FIFO方式進行調度。本質上說,Capacity Scheduler允許用戶或組織為每個用戶模擬一個獨立使用FIFO的集群。


shuffle和排序:

     MapReduce確保每個Reducer的輸入都是按鍵排序的。係統執行排序的過程-將map輸出作為輸入傳給reducer的過程稱為shuffle。shuffle屬於不斷被優化和改進的代碼庫的一部分,從許多方麵來看,shuffle是MapReduce的心髒。
     整個shuffle的流程應該是這樣:
     map結果劃分partition  排序sort 分割spill   合並同一劃分   合並同一劃分  合並結果排序 reduce處理 輸出
     Map端:
     寫入緩衝區:Map函數的輸出,是由collector處理的,它並不是簡單的將結果寫到磁盤。它利用緩衝的方式寫到內存,並處於效率的考慮進行預排序。每個map都有一個環形的內存緩衝區,用於任務輸出,默認緩衝區大小為100MB(由參數io.sort.mb調整),一旦緩衝區內容達到閾值(默認0.8),後台進程邊開始把內容寫到磁盤(spill),在寫磁盤過程中,map輸出繼續被寫到緩衝區,但如果緩衝區被填滿,map會阻塞知道寫磁盤過程完成。寫磁盤將按照輪詢方式寫到mapred.local.dir屬性製定的作業特定子目錄中。
     寫出緩衝區:collect將緩衝區的內容寫出時,會調用sortAndSpill函數,這個函數作用主要是創建spill文件,按照key值對數據進行排序,按照劃分將數據寫入文件,如果配置了combiner類,會先調用combineAndSpill函數再寫文件。sortAndSpill每被調用一次,就會寫一個spill文件。
     合並所有Map的spill文件:TaskTracker會在每個map任務結束後對所有map產生的spill文件進行merge,merge規則是根據分區將各個spill文件中數據同一分區中的數據合並在一起,並寫入到一個已分區且排序的map輸出文件中。待唯一的已分區且已排序的map輸出文件寫入最後一條記錄後,map端的shuffle階段就結束了。

     在寫磁盤前,線程首先根據數據最終要傳遞到的reducer把數據劃分成響應的分區(partition),在每個分區中,後台線程按鍵進行內排序,如果有一個combiner,它會在排序後的輸出上運行。
     內存達到溢出寫的閾值時,就會新建一個溢出寫文件,因為map任務完成其最後一個輸出記錄之後,會有幾個溢出寫文件。在任務完成前,溢出寫文件會被合並成一個已分區且已排序的輸出文件。配置屬性io.sort.facor控製一次最多能合並多少流,默認值是10。
     如果已經指定combiner,並且寫次數至少為3(通過min.mum.spills.for.combine設置)時,則combiner就會在輸出文件寫到磁盤之前運行。運行combiner的意義在於使map輸出更緊湊,舍得寫到本地磁盤和傳給reducer的數據更少。
     寫磁盤時壓縮:寫磁盤時壓縮會讓寫的速度更快,節約磁盤空間,並且減少傳給reducer的數據量。默認情況下,輸出是不壓縮的,但可以通過設置mapred.compress.map.output值為true,就可以啟用壓縮。使用的壓縮庫是由mapred.map.output.compression.codec製定。
     reducer獲得文件分區的工作線程:reducer通過http方式得到輸出文件的分區,用於文件分區的工作線程數量由tracker.http.threads屬性指定,此設置針對的是每個tasktracker,而不是每個map任務槽。默認值為40,在大型集群上此值可以根據需要而增加。

     Reduce端:
     
     複製階段:reduce會定期向JobTracker獲取map的輸出位置,一旦拿到輸出位置,reduce就會從對應的TaskTracker上複製map輸出到本地(如果map輸出很小,則會被複製到TaskTracker節點的內存中,否則會被讓如磁盤),而不會等到所有map任務結束(當然這個也有參數控製)。
     合並階段:從各個TaskTracker上複製的map輸出文件(無論在磁盤還是內存)進行整合,並維持數據原來的順序。
     Reduce階段:從合並的文件中順序拿出一條數據進行reduce函數處理,然後將結果輸出到本地HDFS。

     Map的輸出文件位於運行map任務的tasktracker的本地磁盤,現在,tasktracker要為分區文件運行reduce任務。每個任務完成時間可能不同,但是隻要有一個任務完成,reduce任務就開始複製其輸出,這就是reduce任務的複製階段(copy phase)。reduce任務有少量複製線程,因此能夠並行取得map輸出。默認值是5個線程,可以通過mapred.reduce.parallel.copies屬性設置。
     Reducer如何得知從哪個tasktracker獲得map輸出:map任務完成後會通知其父tasktracker狀態已更新,tasktracker進而通知(通過heart beat)jobtracker。因此,JobTracker就知道map輸出和tasktracker之間的映射關係,reducer中的一個線程定期詢問jobtracker以便獲知map輸出位置。由於reducer有可能失敗,因此tasktracker並沒有在第一個reducer檢索到map輸出時就立即從磁盤上刪除它們,相反他會等待jobtracker告示它可以刪除map輸出時才刪除,這是作業完成後最後執行的。
     如果map輸出很小,則會被直接複製到reduce tasktracker的內存緩衝區(大小由mapred.job.shuffle.input.buffer.percent控製,占堆空間的百分比),否則,map輸出被複製到磁盤。一旦內存緩衝區達到閾值大小(由mapred.iob.shuffle.merge.percent)
或達到map輸出閾值大小(mapred.inmem.threadhold),則合並後溢出寫到磁盤中。
     隨著磁盤上副本增多,後台線程會將他們合並為更大的、排好序的文件。注意:為了合並,壓縮的map輸出必須在內存中被解壓縮。
     排序階段:複製階段完成後,reduce任務會進入排序階段,更確切的說是合並階段,這個階段將合並map輸出,維持其順序排列。合並是循環進行的,由合並因子決定每次合並的輸出文件數量。但讓有可能會產生中間文件。
     reduce階段:在最後reduce階段,會直接把排序好的文件輸入reduce函數,不會對中間文件進行再合並,最後的合並即可來自內存,也可來自磁盤。此階段的輸出會直接寫到文件係統,一般為hdfs。
     細節:這裏合並是並非平均合並,比如有40個文件,合並因子為10,我們並不是每趟合並10個,合並四趟。而是第一趟合並4個,後三趟合並10,在最後一趟中4個已合並的文件和餘下6個未合並會直接並入reduce。

配置調優:
     
     調優總原則:在保證Map函數和Reduce函數能夠得到足夠內存的前提下,給shuffle過程提供更多的內存空間。
     1.編寫map和reduce函數時盡量少占用內存空間。
     2.設置JVM內存大小(mapred.child.java.opts),任務節點內存大小應該盡量大(關於內存請見集群構建中的環境配置筆記)。
     3.Map端:避免多次溢出寫磁盤。估算map輸出大小,調整io.sort.mb(map輸出內存緩衝區大小),如果可以,可增加其值。注意mapreduce計數器會記錄作業在整個運行過程溢出寫磁盤的記錄數,這對調優很有幫助。
     4.Reduce端:中間數據全部駐留內存可獲得最佳性能。如果reduce函數內存需求不大,那麼可以把mapred.inmem.threadhold輸出閾值調為0(即不寫溢出),把mapred.job.shuffle.input.buffer.percent reduce 值設為1(即reduce內存緩衝區最大)會帶來性能提升。
     5.提高Hadoop緩衝區:默認為4KB,應該在集群中增加這個值。

任務的執行:
     
     推測執行:
     為了避免由於一個任務執行慢而是整個作業執行過慢的情況,hadoop 提供了一種推測執行的機製:即hadoop不會嚐試診斷或修複執行慢的任務(其實不可能辦到),而是在一個任務比預期慢的時候啟動另一個相同的任務作為備份。
     一個任務和其推測任務任何一個成功完成,另一個就會中止。
     推測執行是一種優化措施,默認情況下推測執行是啟用的。可以基於集群或基於每個作業,單獨為map或reduce任務啟用或禁用該項功能。mapred.map.tasks.speculative.execution  默認值為 true;mapred.reduce.tasks.speculative.execution 默認值為 true。推測執行目的是減少作業執行時間,但這是以集群效率為代價的,一般而言,集群管理員傾向於在集群上關閉該功能,而讓用戶根據個別需要而開啟該功能。
     任務JVM重用:
     Hadoop在自己的java虛擬機上運行任務,而且會為每個任務啟動一個新的JVM,啟動時間大約為1秒。參數mapred.job.reuse.jvm.num.tasks製定給定作業每個JVM運行任務的最大數,默認值為1,若設置為-1,則不限任務數量。JobConf中的setNumTasksToExecutePerJvm方法也可設置這個屬性。計算超短任務或密集型任務也可以受益於JVM重用機製。共享JVM的另一個非常有用的地方是:作業個任務之間共享狀態,任務可以較快的訪問共享數據。
     跳過壞數據:
     通過開啟skipnode來控製。
     




本節相關參數:
1.mapred.submit.replication 運行作業資源的副本數。
2.mapred.reduce.task 作業的Reduce任務數量,可通過setNumReduceTasks()方法設置。
3.tasktracker任務槽數量。
4.心跳發送周期、任務進度報告周期、tasktracker進度報告周期、JobClient輪詢周期。
5.job.end.notification.url 作業完成時客戶端接收作業完成回調指令的參數。
6.本節會產生各種狀態信息。
7.mapred.task.timeout 任務掛起的最大等待時間。
8.mapred.map.max.attempts 、mapred.reduce.max.attempts 任務失敗最大嚐試次數
9.mapred.max.map.failures.percent 、mapred.max.reduce.failures.percent 允許錯誤但不觸發作業失敗的任務數的百分比。
10.mapred.tasktracker.expiry.interval tasktracker向jobtracker發送心跳的過期時間,默認10分鍾,單位毫秒。
11.mapred.job.priority 作業調度優先級。
12.map.red.jobtracker.taskScheduler 配置作業調度算法參數(值org.apache.hadoop.mapred.FairScheduler)。
13.io.sort.mb 默認map輸出緩衝區大小參數 默認值100MB。
14.io.sort.spill.percent 寫緩衝區內容閾值參數 默認值0.8。
15.mapred.local.dir map函數輸出寫磁盤目錄。
16.io.sort.facor map溢出寫文件一次被合並的數目 默認值是10(設置成100是很常見的)
17.mapred.compress.map.output Map輸出寫磁盤時是否啟用壓縮參數 默認值為false。
18.mapred.map.output.compression.codec Map輸出磁盤啟用壓縮的壓縮庫
19.tracker.http.threads reducer 每個tasktracker獲得文件分區的工作線程數量,針對一個tasktracker 默認值是40。
20.mapred.reduce.parallel.copies reduce任務複製map輸出文件線程數量 默認值是5。
21.mapred.job.shuffle.input.buffer.percent shuffle複製階段,分配給map 輸出存緩衝區占堆棧空間的百分比,默認值0.7。
22.mapred.inmem.threadhold map輸出閾值。
23.io.sort.record.percent 存儲map輸出記錄邊界的io.sort.mb的比例(內存緩衝區所占棧空間比例),剩餘空間空間存儲記錄本身。
24.min.num.spills.for.combine 運行combiner所需要的最少溢出寫文件數 默認值為3。
25.mapred.reduce.copy.backoff reducer獲取一個map輸出所花最大時間,單位是秒,默認值300
26.mapred.ijob.shufffle.merge.percent map輸出緩衝區使用閾值的比例,啟動合並輸出 默認值為0.66.
27.mapred.inmem.merge.threadhold 啟動map輸出和磁盤溢出寫過程的map輸出閾值,默認值1000.
28.mapred.iob.reduce.input.buffer.percent 在reduce過程中,內從中保存map輸出的空間占整個堆內存空間的比例。默認值為0。默認情況下,map輸出都合並到磁盤上,以便為reducer提供盡可能多的內存,如果reducer需要的內存較少,可以增加此值來最小化磁盤訪問次數。



最後更新:2017-04-03 12:55:57

  上一篇:go linux下串口編程設置函數---------set_opt(fd1,115200,8,'N'1)--------------------
  下一篇:go 墨跡天氣3.0引導動畫