閱讀195 返回首頁    go 阿裏雲 go 技術社區[雲棲]


HDFS啟動

標簽(空格分隔): 大數據 HDFS


[toc]


所有的分析以單機安裝的Hadoop版本2.6.4為例分析。步驟依賴於安裝文檔中的步驟,見Hadoop的單機安裝

預製幾個重要的腳本文件:

  • 假設hadoop的安裝目錄在HADOOP_HOME。
  • 重要的腳本文件hadoop-functions.sh。

步驟詳解

格式化係統

第一步要:$ bin/hdfs namenode -format

主要執行HADOOP_HOME/bin/hdfs命令。其中設置了3個重要的變量名

 namenode)
      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
      HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.NameNode'
      hadoop_add_param HADOOP_OPTS hdfs.audit.logger "-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER}"
    ;;

然後最後執行

hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"

其中的hadoop_java_exec是hadoop-functions.sh中聲明的一個函數,其作用就是啟動java進程執行command。

function hadoop_java_exec
{
  # run a java command.  this is used for
  # non-daemons

  local command=$1
  local class=$2
  shift 2

  hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
  hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
  hadoop_debug "Final JAVA_HOME: ${JAVA_HOME}"
  hadoop_debug "java: ${JAVA}"
  hadoop_debug "Class name: ${class}"
  hadoop_debug "Command line options: $*"

  export CLASSPATH
  #shellcheck disable=SC2086
  exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
}

所以,整個命令的鏈路核心目標就是執行org.apache.hadoop.hdfs.server.namenode.NameNode類的main函數,傳遞的參數為format。

public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }

其中startupShutdownMessage方法會打印一些啟動信息到控製台,同時如果是unix係統,會注冊logger到signal,在接受 { "TERM", "HUP", "INT" }信號時打印錯誤日誌。這樣做的意義在於當有係統信號觸發進程結束時,可以根據日誌來判斷是什麼原因退出進程的。

if (SystemUtils.IS_OS_UNIX) {
      try {
        SignalLogger.INSTANCE.register(LOG);
      } catch (Throwable t) {
        LOG.warn("failed to register any UNIX signal loggers: ", t);
      }

接下來就是createNameNode了,首先解析出-format參數為StartOption.FORMAT,然後執行format方法,由於沒有指定cluster,所以係統new一個clusterId,比如形如CID-d2425dab-c066-4a67-954f-32228c22abe6。

private static boolean format(Configuration conf, boolean force,
      boolean isInteractive) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);
    checkAllowFormat(conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      InetSocketAddress socAddr = DFSUtilClient.getNNAddress(conf);
      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
          DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
    }

    Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
    List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
    List<URI> dirsToPrompt = new ArrayList<URI>();
    dirsToPrompt.addAll(nameDirsToFormat);
    dirsToPrompt.addAll(sharedDirs);
    List<URI> editDirsToFormat = 
                 FSNamesystem.getNamespaceEditsDirs(conf);

    // if clusterID is not provided - see if you can find the current one
    String clusterId = StartupOption.FORMAT.getClusterId();
    if(clusterId == null || clusterId.equals("")) {
      //Generate a new cluster id
      clusterId = NNStorage.newClusterID();
    }
    System.out.println("Formatting using clusterid: " + clusterId);

    FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
    try {
      FSNamesystem fsn = new FSNamesystem(conf, fsImage);
      fsImage.getEditLog().initJournalsForWrite();

      if (!fsImage.confirmFormat(force, isInteractive)) {
        return true; // aborted
      }

      fsImage.format(fsn, clusterId);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception during format: ", ioe);
      fsImage.close();
      throw ioe;
    }
    return false;
  }

接下來構造一個FSImage,設置默認的checkpoint目錄,設置存儲以及初始化edit log。其中NNStorage負責管理存儲目錄,FSEditLog是edit log對象。

protected FSImage(Configuration conf,
                    Collection<URI> imageDirs,
                    List<URI> editsDirs)
      throws IOException {
    this.conf = conf;

    storage = new NNStorage(conf, imageDirs, editsDirs);
    if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
                       DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
      storage.setRestoreFailedStorage(true);
    }

    this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
    archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
  }

有了文件係統鏡像,就可以構造FSNamesystem了,這是一個namespace狀態存儲的容器,負責承載NameNode的一切記錄性質的工作。具體的構造函數代碼較長,這裏就不貼明細了。具體分析一下步驟:
1. 先創建KeyProvider,我們這個例子沒有安全模式,因此no KeyProvider found。
2. 讀取dfs.namenode.fslock.fair,構造FSNamesystemLock,默認true,即公平讀寫鎖。
3. 設置用戶和權限
4. check 是否HA
5. 初始化BlockManager及其代理的一堆manager,包括:DatanodeManager(管理DataNode的下線[DecommissionManager]和其他活動),HeartbeatManager(管理從datanode接收到的心跳),BlockIdManager(分配和管理GenerationStamp和block id)等。
6. 構造FSDirectory,這是個純內存的結構,用來和FSNamesystem一起管理NameNode,構造INode。
7. 初始化CacheManager來管理DataNode的cache。
8. 初始化RetryCache。cache了一些非冪等的被RPCserver成功處理的請求,用以處理重試。

至此FSNamesystem初始化完成,最後執行FSImage的format方法,進行格式化。然後shutdown NameNode。

啟動NameNode和DataNode的進程

第二步就是啟動NameNode和DataNode了,具體腳本如下:

$ sbin/start-dfs.sh

NameNode啟動

腳本核心代碼:

#---------------------------------------------------------
# namenodes

NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)

if [[ -z "${NAMENODES}" ]]; then
  NAMENODES=$(hostname)
fi

echo "Starting namenodes on [${NAMENODES}]"
hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
    --workers \
    --config "${HADOOP_CONF_DIR}" \
    --hostnames "${NAMENODES}" \
    --daemon start \
    namenode ${nameStartOpt}

HADOOP_JUMBO_RETCOUNTER=$?

也就是先hdfs getconf -namenodes來查詢配置列出所有NameNode。然後執行hdfs namenode來啟動NameNode。根據上麵的分析,我們知道hdfs腳本就是啟動對應命令的java進程,namenode子命令還是對應NameNode類的main方法,具體執行的其他步驟一樣,隻是在createNameNode時,因為參數不同而導致邏輯不同。因為啟動腳本裏namenode沒有其他參數,因此啟動默認邏輯

default: {
        DefaultMetricsSystem.initialize("NameNode");
        return new NameNode(conf);
      }

核心就是NameNode的構造方法。其首先通過setClientNamenodeAddress方法設置NameNode的地址,默認的就是fs.defaultFS配置對應的值hdfs://localhost:9000。

接著初始化NameNode

protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    pauseMonitor = new JvmPauseMonitor();
    pauseMonitor.init(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }

    loadNamesystem(conf);

    rpcServer = createRpcServer(conf);

    initReconfigurableBackoffKey();

    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using 
      // the RPC server's bind address.
      clientNamenodeAddress = 
          NetUtils.getHostPortString(getNameNodeAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    startCommonServices(conf);
    startMetricsLogger(conf);
  }

幾個比較重要的步驟,其中startHttpServer會啟動一個httpServer,默認地址是https://0.0.0.0:50070。HDFS的默認httpserver是一個Jetty服務器,啟動httpserver後,打開頁麵可以看到整個hdfs的監控情況。然後加載Namesystem,先check參數,由於本地啟動,會收到這樣兩個警告:

2017-02-11 21:59:28,765 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Only one image storage directory (dfs.namenode.name.dir) configured. Beware of data loss due to lack of redundant storage 
directories!
2017-02-11 21:59:28,765 WARN org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Only one namespace edits storage directory (dfs.namenode.edits.dir) configured. Beware of data loss due to lack of redunda
nt storage directories!

無視存儲和editlog的存儲單目錄問題,接下來和format邏輯一樣,要構造FSNamesystem。接著就是loadFSImage,FSImage加載後需要判斷是否保存,其邏輯上是

final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); 

由於單機模式,這幾個值都是false,因此needToSave也是false,所以不會進行fsImage的saveNamespace方法。

結束後會看到一行日誌:

2017-02-11 21:59:29,472 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Finished loading FSImage in 349 msecs

表示FSImage加載完畢。

後麵跟著初始化RPC server。具體對應的類是RPC.Server,基於Protobuf的一個客戶端rpc服務器。

方法的最後兩行,startCommonServices會啟動所有的*manager和httpServer以及rpcServer,還有如果有配置ServicePlugin,每個plugin也會啟動。而startMetricsLogger開啟日誌記錄

DataNode啟動

啟動腳本

#---------------------------------------------------------
# datanodes (using default workers file)

echo "Starting datanodes"
hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \
    --workers \
    --config "${HADOOP_CONF_DIR}" \
    --daemon start \
    datanode ${dataStartOpt}
(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))

執行無參數的hdfs datanode。DataNode存儲了一係列的block來存放實際的文件數據。DataNode會和NameNode通信,且也會和其他DataNode甚至客戶端來通信。DataNode隻維護了一個關係block到bytes流的映射關係。

具體DataNode的初始化,首先先初始MetricSystem。接著進入核心的代碼段——DataNode的構造函數:

DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final StorageLocationChecker storageLocationChecker,
           final SecureResources resources) throws IOException {
    super(conf);
    this.tracer = createTracer(conf);
    this.tracerConfigurationManager =
        new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
    this.fileIoProvider = new FileIoProvider(conf, this);
    this.blockScanner = new BlockScanner(this);
    this.lastDiskErrorCheck = 0;
    this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
        DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);

    this.usersWithLocalPathAccess = Arrays.asList(
        conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
    this.connectToDnViaHostname = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
    this.isPermissionEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
    this.pipelineSupportECN = conf.getBoolean(
        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);

    confVersion = "core-" +
        conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
        ",hdfs-" +
        conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");

    this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());

    // Determine whether we should try to pass file descriptors to clients.
    if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
              HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {
      String reason = DomainSocket.getLoadingFailureReason();
      if (reason != null) {
        LOG.warn("File descriptor passing is disabled because " + reason);
        this.fileDescriptorPassingDisabledReason = reason;
      } else {
        LOG.info("File descriptor passing is enabled.");
        this.fileDescriptorPassingDisabledReason = null;
      }
    } else {
      this.fileDescriptorPassingDisabledReason =
          "File descriptor passing was not configured.";
      LOG.debug(this.fileDescriptorPassingDisabledReason);
    }

    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);

    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is " + hostName);
      startDataNode(dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
    final int dncCacheMaxSize =
        conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,
            DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ;
    datanodeNetworkCounts =
        CacheBuilder.newBuilder()
            .maximumSize(dncCacheMaxSize)
            .build(new CacheLoader<String, Map<String, Long>>() {
              @Override
              public Map<String, Long> load(String key) throws Exception {
                final Map<String, Long> ret = new HashMap<String, Long>();
                ret.put("networkErrors", 0L);
                return ret;
              }
            });

    initOOBTimeout();
    this.storageLocationChecker = storageLocationChecker;
  }

而其中最重要的就是startDataNode方法。其核心步驟摘要如下:
1. 注冊MBean
2. 創建一個TcpPeerServer,監聽50010端口。該server負責和Client和其他DataNode通信。此server不使用Hadoop的IPC機製
3. 啟動JvmPauseManager,用於記錄Jvm的暫停,發現則log一條
4. 初始化IpcServer,監聽50020端口。
5. 構造一個BPOfferService線程,然後啟動線程。BPServiceActor是這樣一個線程,它會先和NameNode進行握手做預注冊,接下來注冊DataNode到NameNode,然後周期性的發送心跳給NameNode,並處理接收到的response命令。
具體描述步驟5,就是如下代碼:

public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      runningState = RunningState.RUNNING;
      if (initialRegistrationComplete != null) {
        initialRegistrationComplete.countDown();
      }

      while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }

下麵具體分析一下BPServiceActor線程做的幾件事:
1. 發送versionRequest請求給NameNode,來獲取NameNode的namespace和版本信息。響應得到一個NamespaceInfo。
2. 利用NamespaceInfo初始化Storage,初始化之前先做格式化format。初始化後生成一個uuid,具體可以看到如下的日誌:

2017-02-11 21:59:33,901 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Setting up storage: nsid=537369943;bpid=BP-503975772-192.168.0.109-1486821555429;lv=-56;nsInfo=lv=-60;cid=CID-c79cc043-b282-435c-a0f6-d5a55b23e87e;nsid=537369943;c=0;bpid=BP-503975772-192.168.0.109-1486821555429;dnuuid=null
2017-02-11 21:59:33,902 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Generated and persisted new Datanode UUID 43ed99d1-20c6-4d71-919c-e9a70cb75c6e
  1. 真實握手,發送registerDatanode請求給NameNode。這時NameNode會處理這個請求,利用DataNodeManager來進行registerDatanode。這時在NameNode日誌會看到如下的日誌:

    2017-02-11 21:59:34,090 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* register
    Datanode: from DatanodeRegistration(127.0.0.1, datanodeUuid=43ed99d1-20c6-4d71-9
    19c-e9a70cb75c6e, infoPort=50075, ipcPort=50020, storageInfo=lv=-56;cid=CID-c79c
    c043-b282-435c-a0f6-d5a55b23e87e;nsid=537369943;c=0) storage 43ed99d1-20c6-4d71-
    919c-e9a70cb75c6e
    2017-02-11 21:59:34,099 INFO org.apache.hadoop.hdfs.server.blockmanagement.Datan
    odeDescriptor: Number of failed storage changes from 0 to 0
    2017-02-11 21:59:34,100 INFO org.apache.hadoop.net.NetworkTopology: Adding a new
    node: /default-rack/127.0.0.1:50010
    2017-02-11 21:59:34,189 INFO org.apache.hadoop.hdfs.server.blockmanagement.Datan
    odeDescriptor: Number of failed storage changes from 0 to 0
    2017-02-11 21:59:34,189 INFO org.apache.hadoop.hdfs.server.blockmanagement.Datan
    odeDescriptor: Adding new storage ID DS-7d302778-acd6-4366-be5e-9dbf7ad22c4d for
    DN 127.0.0.1:50010

  2. 調用offerService方法,開始周期性發送心跳。每個心跳包都包含幾個內容:DataNode名字、數據傳輸端口、總容量和剩餘bytes。然後NameNode接受到心跳後開始handleHeartbeat。

至此,整個NameNode和DataNode都開始正常工作,整個HDFS的啟動結束。

最後更新:2017-04-18 17:30:32

  上一篇:go 學Java,是自學還是去參加培訓?
  下一篇:go 2017全球雲計算開源峰會,精靈雲與您相約B17展位