閱讀480 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Marble原理之線程池

本章節依賴於【Marble使用】,閱讀本章節前請保證已經充分了解Marble

線程池概述

由於Marble屬於框架性項目,用戶接入Marble不關心Marble的實現機製。因此Marble在做相關處理時對資源的消耗要可控,不能因為Marble的原因導致接入的應用不可用(比如資源耗盡)。
此外,Marble-Agent每次收到RPC調度為了不阻塞都會新開線程進行JOB執行,對線程的使用非常頻繁,因此必須使用同一的線程池進行Marble的資源使用收口。

對於線程池 Java已經做了很好的封裝,大部分的使用場景都能覆蓋,枚舉如下:
1. newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程;
2. newFixedThreadPool 創建一個定長線程池,可控製線程最大並發數,超出的線程會在隊列中等待;
3. newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行;
4. newSingleThreadExecutor 創建一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行;

線程池new線程的流程(網絡盜圖):
4678905_666cb2fd65393956

Marble線程池

線程池定義

由於Marble線程池一個很大的作用是為了控製資源使用,給Marble資源占用設定上限,Java本身提供的線程池雖然有最大線程數設置,但阻塞隊列用的都是無界的,不適合做資源限定使用。因此,Marble對java線程池做了定製化。

使用有界阻塞隊列

 executor = new ThreadPoolExecutor(
                tpConfig.getMaxSize(),
                tpConfig.getCoreSize(), 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(tpConfig.getBlockQueueSize()),
                tpConfig.getRejectPolicy()
        );

線程池自配置支持

為了方便用戶進行線程池自配置,Marble提供配置文件的方式支持用戶自定義線程池配置,配置方式為:在項目根目錄下建立文件marble-config.properties 。文件中進行參數賦值,如下:

#線程池最大線程數
tpool_max_size=5
#線程池核心線程數
tpool_core_size=5
#線程池阻塞有界隊列長度
tpool_bq_size=3
#線程池滿後的處理策略。1-AbortPolicy(拋出RejectedExecutionException異常); 2-CallerRunsPolicy; 3-DiscardOldestPolicy 4-DiscardPolicy(不拋出異常)
tpool_reject_policy=1

Marble會首先在根目錄下查找此配置文件,找不到會用默認配置。
tpool_max_size=20
tpool_core_size=20
tpool_bq_size=5
tpool_reject_policy=1

Marble的配置解析類如下:


/**
 * Marble 配置解析
 *
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/3/31 20:15
 */
public class MarbleConfigParser {
    private static ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleConfigParser.class);
    private static final String CONFIG = "marble-config.properties";
    private static Properties prop = new Properties();
    //默認配置
    private static final int TPOOL_MAX_SIZE = 20;//線程池最大線程數
    private static final int TPOOL_CORE_SIZE = 20;//線程池核心線程數
    private static final int TPOOL_BQ_SIZE = 5;//線程池阻塞隊列大小
    private static final int TPOOL_REJECT_POLICY = 1;//線程池滿的處理策略. 1-AbortPolicy(拋出RejectedExecutionException異常); 2-CallerRunsPolicy; 3-DiscardOldestPolicy 4-DiscardPolicy

    private MarbleConfigParser() {
        try {
            InputStream stream = PropertyUtils.class.getClassLoader().getResourceAsStream(CONFIG);
            if (stream == null) {
                logger.MARK("PARSE_CONFIG").warn("no marbleConfig.properties.xml is exist in the root directory of classpath, so default the config will be used.");
                return;
            }
            prop.load(stream);
        } catch (Exception e) {
            logger.MARK("PARSE_CONFIG").error("parse the marbleConfig.properties.xml in the root directory exception, detail: {}", Throwables.getStackTraceAsString(e));
        }
    }

    //解析出thread pool配置
    ThreadPoolConfig parseTPConfig() {
        ThreadPoolConfig tpConfig = null;
        try {
            Integer tpms = getInteger(prop, "tpool_max_size");
            Integer tpcs = getInteger(prop, "tpool_core_size");
            Integer tpqs = getInteger(prop, "tpool_bq_size");
            Integer tprp = getInteger(prop, "tpool_reject_policy");

            //修正參數
            tpcs = (tpcs == null || tpcs < 0 || tpcs > 500) ? TPOOL_CORE_SIZE : tpcs;
            tpms = (tpms == null || tpms < tpqs) ? tpcs : tpms;
            tpqs = (tpqs == null || tpqs < 0 || tpqs > 100) ? TPOOL_BQ_SIZE : tpqs;
            tprp = (tprp == null || tprp > 4) ? 1 : tprp;

            RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
            switch (tprp) {
                case 1:
                    handler = new ThreadPoolExecutor.AbortPolicy();
                    break;
                case 2:
                    handler = new ThreadPoolExecutor.CallerRunsPolicy();
                    break;
                case 3:
                    handler = new ThreadPoolExecutor.DiscardOldestPolicy();
                    break;
                case 4:
                    handler = new ThreadPoolExecutor.DiscardPolicy();
                    break;
            }
            tpConfig = new ThreadPoolConfig(tpms,tpcs,tpqs,handler);
        } catch (Exception e) {
            logger.MARK("PARSE_CONFIG").error("parse the thread-pool config from marbleConfig.properties.xml exception, detail: {}", Throwables.getStackTraceAsString(e));
        }
        if (tpConfig == null) {
            tpConfig = new ThreadPoolConfig(TPOOL_MAX_SIZE,TPOOL_CORE_SIZE, TPOOL_BQ_SIZE, new ThreadPoolExecutor.DiscardPolicy());
        }
        return tpConfig;
    }


    private Integer getInteger(Properties prop, String key) {
        Integer result = null;
        try {
            String value = prop.getProperty(key);
            if (value != null && value.trim().length() > 0) {
                result = Integer.parseInt(value);
            }
        } catch (Exception e) {
        }
        return result;
    }

    //單例
    private static class SingletonHolder {
        private static final MarbleConfigParser CONFIG_HELPER = new MarbleConfigParser();
    }

    public static MarbleConfigParser getInstance() {
        return MarbleConfigParser.SingletonHolder.CONFIG_HELPER;
    }


    //線程池配置
    class ThreadPoolConfig {
        private int maxSize;//線程池最大線程數
        private int coreSize;//線程池核心線程數
        private int blockQueueSize;//線程池阻塞隊列大小
        private RejectedExecutionHandler rejectPolicy;//線程池拒絕策略

        ThreadPoolConfig(int maxSize, int coreSize, int blockQueueSize, RejectedExecutionHandler rejectPolicy) {
            this.maxSize = maxSize;
            this.coreSize = coreSize;
            this.blockQueueSize = blockQueueSize;
            this.rejectPolicy = rejectPolicy;
        }

        int getCoreSize() {
            return coreSize;
        }

        int getBlockQueueSize() {
            return blockQueueSize;
        }

        public int getMaxSize() {
            return maxSize;
        }

        RejectedExecutionHandler getRejectPolicy() {
            return rejectPolicy;
        }

        @Override
        public String toString() {
            return "ThreadPoolConfig{" +
                    "maxSize=" + StringUtils.safeString(maxSize) +
                    ", coreSize=" + StringUtils.safeString(coreSize) +
                    ", blockQueueSize=" + StringUtils.safeString(blockQueueSize) +
                    ", rejectPolicy=" + StringUtils.safeString(rejectPolicy.getClass().getSimpleName()) +
                    '}';
        }
    }
}

線程池使用示例

以如下線程池配置為例:
tpool_max_size=5
tpool_core_size=5
tpool_bq_size=3
tpool_reject_policy=1

下圖中同一台機器(10.2.37.137)連續收到11次Marble調度 >

  • 第1~5次Marble-Agent成功從線程池中啟動了5個線程進行執行;
  • 第6~8次調用,核心線程數已滿,有界阻塞隊列開始進行填充;
  • 第9~10次調用有界阻塞隊列已被填滿,最大線程數也已滿,由於采用了 拒絕策略Abort,直接拒絕了10~11次的調度請求;
  • 手動進行了“線程中斷”調用;
  • 第11次又成功執行;

image003

最後更新:2017-07-11 17:32:43

  上一篇:go  IT眾包不養技術人員該怎麼玩?
  下一篇:go  如何保護個人信息安全? 代表建議:製定地方法規增強可操作性