java nginx监控服务程序调度算法实现
时间:2015-06-28 21:05 来源:linux.it.net.cn 作者:IT
package com.wole.monitor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;
/**
* 管理并调度某一个服务数据源的监控池
* @author yzygenuine
*
*/
public class MonitorsManage {
private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);
private ServiceDao dao;
/**
* 执行的一个并发池
*/
private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
/**
*
*/
private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor);
/**
* 正在执行中的MonitorService集合
*/
private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>();
/**
* 等待优先级队列
*/
private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>();
/**
* 执行队列
*/
private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>();
/**
* 是否关闭
*/
private AtomicBoolean isClose = new AtomicBoolean(false);
/**
* 生产者启动时间
*/
private AtomicLong startTime = new AtomicLong(0);
/**
* 相对于启动的间隔时间
*/
private AtomicLong intervalTime = new AtomicLong(0);
public void close() {
logger.info("closing................");
isClose.compareAndSet(false, true);
}
public void init() {
logger.info("初始化");
}
public void work() {
logger.info("开始工作");
// 生产者启动工作
Thread productThread = new Thread(new ProductMonitor(1000));
// 消费者启动工作
Thread consumerThread = new Thread(new ConsumerMonitor(1000));
// 回收者启动工作
Thread recoverThread = new Thread(new RecoverMonitor(1000));
// 启动定时加载数据工作
Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
productThread.start();
consumerThread.start();
recoverThread.start();
refreshThread.start();
}
/**
* 生产者
*
* @author yzygenuine
*
*/
class ProductMonitor implements Runnable {
long sleepTime = 1000;
public ProductMonitor(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public void run() {
logger.info("生产者开启工作");
// 开始进行定时监控
long now = System.currentTimeMillis();
long lastTime = now;
startTime.addAndGet(now);
try {
do {
Thread.sleep(sleepTime);
logger.debug("生产者休息{}ms", sleepTime);
now = System.currentTimeMillis();
intervalTime.addAndGet(now - lastTime);
while (sleepQueue.size() > 0) {
MonitorService service = sleepQueue.peek();
if (service.getCurrentTime() - intervalTime.get() < 1) {
service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列
if (!currentSet.contains(service)) {
logger.info("service {} 已被删除,不加入执行队列了", service.toString());
continue;
}
executeQueue.add(service);
} else {
logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get());
break;
}
}
if (sleepQueue.size() <= 0) {
logger.debug("生产队列为空");
}
lastTime = now;
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 消费者
*
* @author yzygenuine
*
*/
class ConsumerMonitor implements Runnable {
long sleepTime = 1000;
public ConsumerMonitor(long sleepTime) {
this.sleepTime = sleepTime;
if (sleepTime < 1000) {
throw new RuntimeException("请配置sleepTime值大一些");
}
}
@Override
public void run() {
logger.info("消费者开启工作");
try {
do {
Thread.sleep(sleepTime);
logger.debug("消费者休息{}ms", sleepTime);
while (executeQueue.size() > 0) {
final MonitorService service = executeQueue.poll();
completionService.submit(new ExecuteCallable(service));
}
logger.debug("消费队列为空");
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 执行回调类
*
* @author yzygenuine
*
*/
class ExecuteCallable implements Callable<Response> {
final MonitorService service;
public ExecuteCallable(MonitorService service) {
this.service = service;
}
@Override
public Response call() throws Exception {
logger.debug("执行");
Map<String, String> r = new HashMap<String, String>();
Response response = new Response();
response.service = service;
response.response = r;
Monitor m = MonitorFactory.getMonitor(service);
response.isNeedWarn = m.isNeedWarnging(service, r);
if (response.isNeedWarn) {
response.isSucToNotify = m.sendNotify(service, r);
}
return response;
}
}
/**
* 回收者
*
* @author yzygenuine
*
*/
class RecoverMonitor implements Runnable {
private long sleepTime = 1000;
private long count = 0;
public RecoverMonitor(long sleepTime) {
this.sleepTime = sleepTime;
if (sleepTime < 1000) {
throw new RuntimeException("请配置sleepTime值大一些");
}
}
@Override
public void run() {
logger.info("回收者开启工作");
try {
do {
// Thread.sleep(sleepTime);
Future<Response> response = completionService.take();
// 重置后进入休眠队列
MonitorService s = response.get().service;
if (!currentSet.contains(s)) {
logger.info("service {} 已被删除,不回收了", s.toString());
continue;
}
// 当前程序已运动的时间+相对间隔时间=绝对的间隔时间
s.setCurrentTime(s.getIntervalTime() + intervalTime.get());
sleepQueue.add(s);
count++;
logger.info("回收,当前回收数量:" + count);
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/** * 加载新的数据 * * @author yzygenuine * */
class RefreshMonitorService implements Runnable {
private long sleepTime = 1000;
private ServiceDao dao;
public RefreshMonitorService(long sleepTime, ServiceDao dao) {
this.sleepTime = sleepTime;
if (sleepTime < 60000) {
logger.warn("刷新加载数据的间隔时间不能太短");
throw new RuntimeException("刷新加载数据的间隔时间不能太短");
}
this.dao = dao;
}
private void firstLoad() {
List<MonitorService> monitorService = dao.getService();
logger.info("加载记录:" + monitorService.size());
// 将被监控服务加入优先级队列里
for (int j = 0; j < monitorService.size(); j++) {
MonitorService service = monitorService.get(j);
// 初始化好时间
service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
currentSet.add(service);
sleepQueue.add(service);
}
}
@Override
public void run() {
logger.info("读取新的service开启工作");
firstLoad();
try {
do {
logger.info("定时加载新的数据监听者休息{}ms", sleepTime);
Thread.sleep(sleepTime);
logger.info("##########开始执行更新数据############");
// 加载新的所有所数据 ,与当前的数据比较
List<MonitorService> deleteList = dao.deleteService();
List<MonitorService> addList = dao.incrementalService();
logger.info("删除旧的数据共:{}", deleteList.size());
currentSet.removeAll(deleteList);
logger.info("增加新的数据共:{}", addList.size());
currentSet.addAll(addList);
logger.info("更新后的currentSet size:{}", currentSet.size());
for (MonitorService service : addList) {
// 初始化绝对间隔时间
service.setCurrentTime(service.getIntervalTime() + intervalTime.get());
sleepQueue.add(service);
}
logger.info("########这一轮更新结束");
} while (!isClose.get());
} catch (Exception e) {
logger.error("", e);
}
}
}
/**
* 响应的封装类
*
* @author yzygenuine
*
*/
class Response {
public Map<String, String> response;
public MonitorService service;
public boolean isNeedWarn;
public boolean isSucToNotify;
}
public void setDao(ServiceDao dao) {
this.dao = dao;
}
}
(责任编辑:IT)
package com.wole.monitor; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.wole.monitor.dao.ServiceDao; import com.wole.servicemonitor.util.ServiceUtils; /** * 管理并调度某一个服务数据源的监控池 * @author yzygenuine * */ public class MonitorsManage { private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class); private ServiceDao dao; /** * 执行的一个并发池 */ private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); /** * */ private CompletionService<Response> completionService = new ExecutorCompletionService<Response>(commExecutor); /** * 正在执行中的MonitorService集合 */ private ConcurrentHashSet<MonitorService> currentSet = new ConcurrentHashSet<MonitorService>(); /** * 等待优先级队列 */ private Queue<MonitorService> sleepQueue = new PriorityBlockingQueue<MonitorService>(); /** * 执行队列 */ private Queue<MonitorService> executeQueue = new LinkedBlockingQueue<MonitorService>(); /** * 是否关闭 */ private AtomicBoolean isClose = new AtomicBoolean(false); /** * 生产者启动时间 */ private AtomicLong startTime = new AtomicLong(0); /** * 相对于启动的间隔时间 */ private AtomicLong intervalTime = new AtomicLong(0); public void close() { logger.info("closing................"); isClose.compareAndSet(false, true); } public void init() { logger.info("初始化"); } public void work() { logger.info("开始工作"); // 生产者启动工作 Thread productThread = new Thread(new ProductMonitor(1000)); // 消费者启动工作 Thread consumerThread = new Thread(new ConsumerMonitor(1000)); // 回收者启动工作 Thread recoverThread = new Thread(new RecoverMonitor(1000)); // 启动定时加载数据工作 Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao)); productThread.start(); consumerThread.start(); recoverThread.start(); refreshThread.start(); } /** * 生产者 * * @author yzygenuine * */ class ProductMonitor implements Runnable { long sleepTime = 1000; public ProductMonitor(long sleepTime) { this.sleepTime = sleepTime; } @Override public void run() { logger.info("生产者开启工作"); // 开始进行定时监控 long now = System.currentTimeMillis(); long lastTime = now; startTime.addAndGet(now); try { do { Thread.sleep(sleepTime); logger.debug("生产者休息{}ms", sleepTime); now = System.currentTimeMillis(); intervalTime.addAndGet(now - lastTime); while (sleepQueue.size() > 0) { MonitorService service = sleepQueue.peek(); if (service.getCurrentTime() - intervalTime.get() < 1) { service = sleepQueue.poll();// 出队并检查是否被删除,如果没被删除则进入执行队列 if (!currentSet.contains(service)) { logger.info("service {} 已被删除,不加入执行队列了", service.toString()); continue; } executeQueue.add(service); } else { logger.debug("还有{}秒可执行", service.getCurrentTime() - intervalTime.get()); break; } } if (sleepQueue.size() <= 0) { logger.debug("生产队列为空"); } lastTime = now; } while (!isClose.get()); } catch (Exception e) { logger.error("", e); } } } /** * 消费者 * * @author yzygenuine * */ class ConsumerMonitor implements Runnable { long sleepTime = 1000; public ConsumerMonitor(long sleepTime) { this.sleepTime = sleepTime; if (sleepTime < 1000) { throw new RuntimeException("请配置sleepTime值大一些"); } } @Override public void run() { logger.info("消费者开启工作"); try { do { Thread.sleep(sleepTime); logger.debug("消费者休息{}ms", sleepTime); while (executeQueue.size() > 0) { final MonitorService service = executeQueue.poll(); completionService.submit(new ExecuteCallable(service)); } logger.debug("消费队列为空"); } while (!isClose.get()); } catch (Exception e) { logger.error("", e); } } } /** * 执行回调类 * * @author yzygenuine * */ class ExecuteCallable implements Callable<Response> { final MonitorService service; public ExecuteCallable(MonitorService service) { this.service = service; } @Override public Response call() throws Exception { logger.debug("执行"); Map<String, String> r = new HashMap<String, String>(); Response response = new Response(); response.service = service; response.response = r; Monitor m = MonitorFactory.getMonitor(service); response.isNeedWarn = m.isNeedWarnging(service, r); if (response.isNeedWarn) { response.isSucToNotify = m.sendNotify(service, r); } return response; } } /** * 回收者 * * @author yzygenuine * */ class RecoverMonitor implements Runnable { private long sleepTime = 1000; private long count = 0; public RecoverMonitor(long sleepTime) { this.sleepTime = sleepTime; if (sleepTime < 1000) { throw new RuntimeException("请配置sleepTime值大一些"); } } @Override public void run() { logger.info("回收者开启工作"); try { do { // Thread.sleep(sleepTime); Future<Response> response = completionService.take(); // 重置后进入休眠队列 MonitorService s = response.get().service; if (!currentSet.contains(s)) { logger.info("service {} 已被删除,不回收了", s.toString()); continue; } // 当前程序已运动的时间+相对间隔时间=绝对的间隔时间 s.setCurrentTime(s.getIntervalTime() + intervalTime.get()); sleepQueue.add(s); count++; logger.info("回收,当前回收数量:" + count); } while (!isClose.get()); } catch (Exception e) { logger.error("", e); } } } /** * 加载新的数据 * * @author yzygenuine * */ class RefreshMonitorService implements Runnable { private long sleepTime = 1000; private ServiceDao dao; public RefreshMonitorService(long sleepTime, ServiceDao dao) { this.sleepTime = sleepTime; if (sleepTime < 60000) { logger.warn("刷新加载数据的间隔时间不能太短"); throw new RuntimeException("刷新加载数据的间隔时间不能太短"); } this.dao = dao; } private void firstLoad() { List<MonitorService> monitorService = dao.getService(); logger.info("加载记录:" + monitorService.size()); // 将被监控服务加入优先级队列里 for (int j = 0; j < monitorService.size(); j++) { MonitorService service = monitorService.get(j); // 初始化好时间 service.setCurrentTime(service.getIntervalTime() + intervalTime.get()); currentSet.add(service); sleepQueue.add(service); } } @Override public void run() { logger.info("读取新的service开启工作"); firstLoad(); try { do { logger.info("定时加载新的数据监听者休息{}ms", sleepTime); Thread.sleep(sleepTime); logger.info("##########开始执行更新数据############"); // 加载新的所有所数据 ,与当前的数据比较 List<MonitorService> deleteList = dao.deleteService(); List<MonitorService> addList = dao.incrementalService(); logger.info("删除旧的数据共:{}", deleteList.size()); currentSet.removeAll(deleteList); logger.info("增加新的数据共:{}", addList.size()); currentSet.addAll(addList); logger.info("更新后的currentSet size:{}", currentSet.size()); for (MonitorService service : addList) { // 初始化绝对间隔时间 service.setCurrentTime(service.getIntervalTime() + intervalTime.get()); sleepQueue.add(service); } logger.info("########这一轮更新结束"); } while (!isClose.get()); } catch (Exception e) { logger.error("", e); } } } /** * 响应的封装类 * * @author yzygenuine * */ class Response { public Map<String, String> response; public MonitorService service; public boolean isNeedWarn; public boolean isSucToNotify; } public void setDao(ServiceDao dao) { this.dao = dao; } } (责任编辑:IT) |