当前位置: > Linux服务器 > nginx >

java nginx监控服务程序调度算法实现

时间:2015-06-28 21:05来源:linux.it.net.cn 作者:IT
package com.wole.monitor;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;

/**
* Manages and schedules the monitoring pool for one service data source.
* @author yzygenuine
*
*/

public class MonitorsManage {
    // Shared SLF4J logger used by this class and all of its inner worker classes.
    private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);

    // Data-access object injected through setDao(); passed to RefreshMonitorService in work().
    private ServiceDao dao;

    /**
     * Executor that runs the monitor tasks: zero core threads, an unbounded maximum,
     * a 5-second idle keep-alive and a SynchronousQueue for hand-off.
     */

    private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());

    /**
     * Completion service layered on commExecutor; the consumer submits tasks to it and
     * the recycler collects the finished futures from it.
     */

    private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);

    /**
     * Set of MonitorService instances currently being managed. Entries are added when
     * services are loaded and removed when they are deleted; the producer and the
     * recycler consult it to drop services that were deleted in the meantime.
     */

    private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();

    /**
     * Priority queue of services waiting for their next run.
     */

    private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();

    /**
     * Queue of services that are due and waiting to be submitted for execution.
     */

    private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();

    /**
     * Shutdown flag: set by close() and polled by every worker loop.
     */

    private AtomicBoolean isClose = new AtomicBoolean(false);

    /**
     * Producer start time (System.currentTimeMillis() value recorded in ProductMonitor.run()).
     */

    private AtomicLong startTime = new AtomicLong(0);
    /**
     * Milliseconds elapsed since the producer started, accumulated by the producer on
     * every pass; services store their next run time relative to this counter.
     */

    private AtomicLong intervalTime = new AtomicLong(0);

    public void close() {
        logger.info("closing................");
        isClose.compareAndSet(false, true);
    }

    public void init() {
        logger.info("初始化");

    }

    public void work() {
        logger.info("开始工作");
        // 生产者启动工作

        Thread productThread = new Thread(new ProductMonitor(1000));
        // 消费者启动工作
        Thread consumerThread = new Thread(new ConsumerMonitor(1000));
        // 回收者启动工作
        Thread recoverThread = new Thread(new RecoverMonitor(1000));

        // 启动定时加载数据工作
        Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
        productThread.start();
        consumerThread.start();
        recoverThread.start();
        refreshThread.start();

    }

    /**
     * 生产者
     *
     * @author yzygenuine
     *
     */

    class ProductMonitor implements Runnable {
        long sleepTime = 1000;

        public ProductMonitor(long sleepTime) {
            this.sleepTime = sleepTime;
        }

        @Override
        public void run() {
            logger.info("生产者开启工作");
            // 开始进行定时监控
            long now = System.currentTimeMillis();
            long lastTime = now;
            startTime.addAndGet(now);
            try {
                do {
                    Thread.sleep(sleepTime);
                    logger.debug("生产者休息{}ms", sleepTime);
                    now = System.currentTimeMillis();
                    intervalTime.addAndGet(now - lastTime);
                    while (sleepQueue.size() > 0) {
                        MonitorService service = sleepQueue.peek();
                        if (service.getCurrentTime() - intervalTime.get() < 1) {
                            service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列
                            if (!currentSet.contains(service)) {
                                logger.info("service {} 已被删除,不加入执行队列了", service.toString());
                                continue;
                            }
                            executeQueue.add(service);
                        } else {
                            logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());
                            break;
                        }
                    }

                    if (sleepQueue.size() <= 0) {
                        logger.debug("生产队列为空");
                    }
                    lastTime = now;
                } while (!isClose.get());
            } catch (Exception e) {
                logger.error("", e);
            }

        }

    }

    /**
     * 消费者
     *
     * @author yzygenuine
     *
     */

    class ConsumerMonitor implements Runnable {
        long sleepTime = 1000;

        public ConsumerMonitor(long sleepTime) {
            this.sleepTime = sleepTime;
            if (sleepTime < 1000) {
                throw new RuntimeException("请配置sleepTime值大一些");
            }
        }

        @Override
        public void run() {
            logger.info("消费者开启工作");
            try {
                do {
                    Thread.sleep(sleepTime);
                    logger.debug("消费者休息{}ms", sleepTime);
                    while (executeQueue.size() > 0) {
                        final MonitorService service = executeQueue.poll();
                        completionService.submit(new ExecuteCallable(service));
                    }
                    logger.debug("消费队列为空");
                } while (!isClose.get());
            } catch (Exception e) {
                logger.error("", e);
            }
        }

    }

    /**
     * 执行回调类
     *
     * @author yzygenuine
     *
     */

    class ExecuteCallable implements Callable<Response> {
        final MonitorService service;

        public ExecuteCallable(MonitorService service) {
            this.service = service;
        }

        @Override
        public Response call() throws Exception {
            logger.debug("执行");
            Map<String, String> r = new HashMap<String, String>();
            Response response = new Response();
            response.service = service;
            response.response = r;
            Monitor m = MonitorFactory.getMonitor(service);
            response.isNeedWarn = m.isNeedWarnging(service, r);
            if (response.isNeedWarn) {
                response.isSucToNotify = m.sendNotify(service, r);
            }
            return response;
        }

    }

    /**
     * 回收者
     *
     * @author yzygenuine
     *
     */

    class RecoverMonitor implements Runnable {
        private long sleepTime = 1000;

        private long count = 0;

        public RecoverMonitor(long sleepTime) {
            this.sleepTime = sleepTime;
            if (sleepTime < 1000) {
                throw new RuntimeException("请配置sleepTime值大一些");
            }
        }

        @Override
        public void run() {
            logger.info("回收者开启工作");
            try {
                do {
                    // Thread.sleep(sleepTime);
                    Future<Response> response = completionService.take();
                    // 重置后进入休眠队列
                    MonitorService s = response.get().service;
                    if (!currentSet.contains(s)) {
                        logger.info("service {} 已被删除,不回收了", s.toString());
                        continue;
                    }
                    // 当前程序已运动的时间+相对间隔时间=绝对的间隔时间
                    s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
                    sleepQueue.add(s);
                    count++;
                    logger.info("回收,当前回收数量:" + count);
                } while (!isClose.get());
            } catch (Exception e) {
                logger.error("", e);
            }
        }
    }

    /** * 加载新的数据 * * @author yzygenuine * */
    class RefreshMonitorService implements Runnable {
        private long sleepTime = 1000;
        private ServiceDao dao;

        public RefreshMonitorService(long sleepTime, ServiceDao dao) {
            this.sleepTime = sleepTime;
            if (sleepTime < 60000) {
                logger.warn("刷新加载数据的间隔时间不能太短");
                throw new RuntimeException("刷新加载数据的间隔时间不能太短");
            }
            this.dao = dao;
        }

        private void firstLoad() {
            List<MonitorService> monitorService = dao.getService();
            logger.info("加载记录:" + monitorService.size());

            // 将被监控服务加入优先级队列里
            for (int j = 0; j < monitorService.size(); j++) {
                MonitorService service = monitorService.get(j);
                // 初始化好时间
                service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
                currentSet.add(service);
                sleepQueue.add(service);
            }

        }

        @Override
        public void run() {
            logger.info("读取新的service开启工作");
            firstLoad();
            try {
                do {
                    logger.info("定时加载新的数据监听者休息{}ms", sleepTime);
                    Thread.sleep(sleepTime);
                    logger.info("##########开始执行更新数据############");
                    // 加载新的所有所数据 ,与当前的数据比较
                    List<MonitorService> deleteList = dao.deleteService();
                    List<MonitorService> addList = dao.incrementalService();
                    logger.info("删除旧的数据共:{}", deleteList.size());
                    currentSet.removeAll(deleteList);
                    logger.info("增加新的数据共:{}", addList.size());
                    currentSet.addAll(addList);
                    logger.info("更新后的currentSet size:{}", currentSet.size());

                    for (MonitorService service : addList) {
                        // 初始化绝对间隔时间
                        service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
                        sleepQueue.add(service);
                    }
                    logger.info("########这一轮更新结束");
                } while (!isClose.get());
            } catch (Exception e) {
                logger.error("", e);
            }
        }
    }

    /**
     * 响应的封装类
     *
     * @author yzygenuine
     *
     */

    class Response {
        public Map<String, String> response;
        public MonitorService service;
        public boolean isNeedWarn;
        public boolean isSucToNotify;

    }

    public void setDao(ServiceDao dao) {
        this.dao = dao;
    }

}




(责任编辑:IT)
------分隔线----------------------------
栏目列表
推荐内容