From 887654e9353c477337abad2109d5c6d2b9f599a9 Mon Sep 17 00:00:00 2001 From: jiangzhongxiang Date: Wed, 19 Jun 2019 14:04:19 +0800 Subject: [PATCH] kafka ok --- .../game/controller/GameController.java | 102 ++-------- .../game/service/EvaluatingService.java | 4 +- .../thread/BridgeThreadPoolTaskExecutor.java | 174 ++++++++++-------- src/main/resources/logback_local.xml | 15 +- src/main/resources/logback_test.xml | 1 + 5 files changed, 128 insertions(+), 168 deletions(-) diff --git a/src/main/java/com/educoder/bridge/game/controller/GameController.java b/src/main/java/com/educoder/bridge/game/controller/GameController.java index d637cc5..b682ce4 100644 --- a/src/main/java/com/educoder/bridge/game/controller/GameController.java +++ b/src/main/java/com/educoder/bridge/game/controller/GameController.java @@ -27,10 +27,8 @@ import com.educoder.bridge.common.utils.GameHelper; import com.educoder.bridge.common.utils.JedisUtil; import com.educoder.bridge.common.utils.PortUtil; import com.educoder.bridge.common.utils.TimeHelper; -import com.educoder.bridge.game.model.Evaluating; import com.educoder.bridge.game.service.GameService; import com.educoder.bridge.game.service.K8sService; -import com.educoder.bridge.game.thread.BuildThread; import com.educoder.bridge.k8s.model.BridgePod; import com.educoder.bridge.k8s.service.BridgePodService; @@ -54,14 +52,14 @@ public class GameController { private GameService gameService; @Autowired private K8sService k8sService; - + @Autowired private BridgePodService bridgePodService; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; - - @Qualifier("evaluatingKafkaTemplate") + + @Qualifier("evaluatingKafkaTemplate") @Autowired private KafkaTemplate evaluatingKafkaTemplate; @@ -114,12 +112,10 @@ public class GameController { @ApiParam(name = "file", required = false, value = "需要传文件的实训,给出文件存放路径(一个目录)及文件类型") String file, @ApiParam(name = "containers", required = true, value = "需要使用的容器,base64编码") @RequestParam String containers, @ApiParam(name = "content_modified", required = true, value = "文件是否修改的标志") @RequestParam Integer content_modified, - @ApiParam(name = "sec_key", required = false, value = "每一次评测的唯一标识") String sec_key) - throws Exception { - logger.info( - "评测:tpiID: {}, tpiGitURL: {}, buildID: {}, isPublished: {}, instanceChallenge: {}, " - + "testCases: {}, tpmScript: {}, timeLimit: {}, resubmit: {}, " - + "times: {}, needPortMapping: {}, podType: {}, file: {}, containers: {}, content_modified: {}, sec_key: {}", + @ApiParam(name = "sec_key", required = false, value = "每一次评测的唯一标识") String sec_key) throws Exception { + logger.info("评测:tpiID: {}, tpiGitURL: {}, buildID: {}, isPublished: {}, instanceChallenge: {}, " + + "testCases: {}, tpmScript: {}, timeLimit: {}, resubmit: {}, " + + "times: {}, needPortMapping: {}, podType: {}, file: {}, containers: {}, content_modified: {}, sec_key: {}", tpiID, tpiGitURL, buildID, isPublished, instanceChallenge, testCases, tpmScript, timeLimit, resubmit, times, needPortMapping, podType, file, containers, content_modified, sec_key); @@ -128,7 +124,7 @@ public class GameController { String evaluateStartTime = requestTime.toString(); JSONObject cost = new JSONObject(); cost.put("evaluateStartTime", evaluateStartTime); - JedisUtil.set("timeCost:" + tpiID + ":" + buildID, cost.toJSONString()); + JedisUtil.set("timeCost:" + tpiID + ":" + buildID, cost.toJSONString()); JSONObject response = new JSONObject(); @@ -172,7 +168,7 @@ public class GameController { // podName String podName = podType != 2 ? "evaluate-" + tpiID : "evassh-" + tpiID; - + BridgePod bridgePod = bridgePodService.createBridgePod(podName, tpiID, requestTime, sec_key); buildParams.put("bridgePodId", bridgePod.getId()); @@ -190,83 +186,19 @@ public class GameController { String task = buildParams.toJSONString(); sendToKafka(buildParams); // 最大running pod数量 - int maxRunningPodNum = Integer.parseInt(appConfig.getMaxRunningPodNum()); - // 此次请求只为轮询时间,只返回轮询的时间结果 - if (times != 1) { - logger.debug("轮询,times: {}", times); - double buildRank; + response.put("ableToCreate", 1); + response.put("costTime", System.currentTimeMillis() - TimeHelper.convertTimeToMillis(evaluateStartTime)); + response.put("waitNum", 0); + response.put("code", 0); + response.put("port", port); + response.put("msg", "评测完成"); - try { - buildRank = JedisUtil.zrank("task", task); - } catch (Exception e) { - response.put("ableToCreate", 0); - response.put("waitNum", maxRunningPodNum + JedisUtil.zlen("task")); - response.put("code", 0); - response.put("msg", "等待评测"); - return response; - } - - if (buildRank != Double.MIN_VALUE) { - logger.debug("任务:{} : 还在等待队列中!", task); - - response.put("ableToCreate", 0); - response.put("waitNum", buildRank == Double.MAX_VALUE ? maxRunningPodNum + JedisUtil.zlen("task") - : buildRank + maxRunningPodNum); - response.put("code", 0); - response.put("msg", "等待评测"); - return response; - } else { - // 轮询时间请求,发现目标任务不在队列,就直接认为其正在运行 - logger.debug("任务:{} : 正在执行中!", task); - - response.put("ableToCreate", 1); - response.put("waitNum", 0); - response.put("code", 0); - response.put("port", Integer.parseInt(port)); - response.put("msg", "正在评测"); - return response; - } - } - - // 判断是否可以立即执行(若pod已经存在,或是,pod不存在但是此时可以创建pod) - boolean executeImmediately = k8sService.isPodRunning("tpiID", tpiID); - if(!executeImmediately) { - // 获取Running状态的pod数 - int runningPodNum = k8sService.getRunningPodNum(); - boolean able = true; // k8sService.ableToEvaluate(); - executeImmediately = (runningPodNum < maxRunningPodNum && able); - } - if (executeImmediately) { - // 直接执行任务 - BuildThread buildThread = gameService.getBuildThread(buildParams); - threadPoolTaskExecutor.execute(buildThread); - - response.put("ableToCreate", 1); - response.put("costTime", System.currentTimeMillis() - TimeHelper.convertTimeToMillis(evaluateStartTime)); - response.put("waitNum", 0); - response.put("code", 0); - response.put("port", port); - response.put("msg", "评测完成"); - - logger.debug("直接执行任务!task: {}, tpi: {}", task, tpiID); - } else { - // 否则,将构建任务推入redis - JedisUtil.zpush("task", task); - - // 评测任务等待执行 - response.put("ableToCreate", 0); - // 在等待队列中的位置 - response.put("waitNum", JedisUtil.zlen("task") + maxRunningPodNum); - response.put("code", 0); - response.put("msg", "等待评测"); - - logger.debug("任务入队等待!task: {}, tpi: {}", task, tpiID); - } + logger.debug("直接执行任务!task: {}, tpi: {}", task, tpiID); return response; } - + private void sendToKafka(JSONObject buildParams) { final ProducerRecord record = new ProducerRecord("evaluating", buildParams); diff --git a/src/main/java/com/educoder/bridge/game/service/EvaluatingService.java b/src/main/java/com/educoder/bridge/game/service/EvaluatingService.java index f1e309f..9ccd19b 100644 --- a/src/main/java/com/educoder/bridge/game/service/EvaluatingService.java +++ b/src/main/java/com/educoder/bridge/game/service/EvaluatingService.java @@ -29,7 +29,7 @@ public class EvaluatingService { @Autowired private GameService gameService; - //@PostConstruct + @PostConstruct public void init() { evaluatingConsumer.subscribe(Arrays.asList("evaluating")); new Thread(() -> { @@ -44,7 +44,7 @@ public class EvaluatingService { ConsumerRecords records = evaluatingConsumer.poll(100);// 当前每次只获取一个 for (ConsumerRecord record : records) { JSONObject buildParams = record.value(); - logger.info("获取的评测请求:{}", record.value()); + logger.info("获取的评测请求:{}", buildParams); BuildThread buildThread = gameService.getBuildThread(buildParams); threadPoolTaskExecutor.execute(buildThread); } diff --git a/src/main/java/com/educoder/bridge/game/thread/BridgeThreadPoolTaskExecutor.java b/src/main/java/com/educoder/bridge/game/thread/BridgeThreadPoolTaskExecutor.java index 631af7a..6d01588 100644 --- a/src/main/java/com/educoder/bridge/game/thread/BridgeThreadPoolTaskExecutor.java +++ b/src/main/java/com/educoder/bridge/game/thread/BridgeThreadPoolTaskExecutor.java @@ -2,7 +2,12 @@ package com.educoder.bridge.game.thread; import java.lang.reflect.Field; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.MDC; import org.springframework.core.task.TaskDecorator; @@ -15,98 +20,107 @@ import com.educoder.bridge.game.exception.UncaughtBridgeExceptionHandler; @SuppressWarnings("serial") public class BridgeThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { - private int queueCapacity = Integer.MAX_VALUE; + private int queueCapacity = Integer.MAX_VALUE; - private TaskDecorator taskDecorator; + private TaskDecorator taskDecorator; - private boolean allowCoreThreadTimeOut = false; + private boolean allowCoreThreadTimeOut = false; - /** - * 所有线程都会委托给这个execute方法,在这个方法中我们把父线程的MDC内容赋值给子线程 - * https://logback.qos.ch/manual/mdc.html#managedThreads - * - * @param runnable - */ - @Override - public void execute(Runnable runnable) { - // 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null - Map context = MDC.getCopyOfContextMap(); - super.execute(() -> run(runnable, context)); - } + /** + * 所有线程都会委托给这个execute方法,在这个方法中我们把父线程的MDC内容赋值给子线程 + * https://logback.qos.ch/manual/mdc.html#managedThreads + * + * @param runnable + */ + @Override + public void execute(Runnable runnable) { + // 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null + Map context = MDC.getCopyOfContextMap(); + super.execute(() -> run(runnable, context)); + } - /** - * 子线程委托的执行方法 - * - * @param runnable {@link Runnable} - * @param context 父线程MDC内容 - */ - private void run(Runnable runnable, Map context) { - // 将父线程的MDC内容传给子线程 - MDC.setContextMap(context); - try { - // 执行异步操作 - runnable.run(); - } finally { - // 清空MDC内容 - MDC.clear(); - } - } + /** + * 子线程委托的执行方法 + * + * @param runnable + * {@link Runnable} + * @param context + * 父线程MDC内容 + */ + private void run(Runnable runnable, Map context) { + // 将父线程的MDC内容传给子线程 + String secKey = "sec_key"; + if (context == null) {// 从 kafka读取评测消息的情形 + BuildThread th = (BuildThread) runnable; + String key = th.getBuildParams().getString(secKey); + MDC.put(secKey, key); + } else { + MDC.setContextMap(context); + } + try { + // 执行异步操作 + runnable.run(); + } finally { + // 清空MDC内容 + MDC.clear(); + } + } - @Override - public Thread newThread(Runnable runnable) { - Thread t = super.newThread(runnable); - UncaughtBridgeExceptionHandler eh = BeanFactory.getObejct(UncaughtBridgeExceptionHandler.class); - t.setUncaughtExceptionHandler(eh); - return t; - } + @Override + public Thread newThread(Runnable runnable) { + Thread t = super.newThread(runnable); + UncaughtBridgeExceptionHandler eh = BeanFactory.getObejct(UncaughtBridgeExceptionHandler.class); + t.setUncaughtExceptionHandler(eh); + return t; + } - @Override - protected ExecutorService initializeExecutor(ThreadFactory threadFactory, - RejectedExecutionHandler rejectedExecutionHandler) { - BlockingQueue queue = createQueue(this.queueCapacity); + @Override + protected ExecutorService initializeExecutor(ThreadFactory threadFactory, + RejectedExecutionHandler rejectedExecutionHandler) { + BlockingQueue queue = createQueue(this.queueCapacity); - ThreadPoolExecutor executor; - if (this.taskDecorator != null) { - // 使用 BridgeThreadPoolExecutor 执行 afterExecute, 处理异常情况 - executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(), - super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { - @Override - public void execute(Runnable command) { - super.execute(taskDecorator.decorate(command)); - } - }; - } else { - // 使用 BridgeThreadPoolExecutor 执行 afterExecute, 处理异常情况 - executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(), - super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); + ThreadPoolExecutor executor; + if (this.taskDecorator != null) { + // 使用 BridgeThreadPoolExecutor 执行 afterExecute, 处理异常情况 + executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(), + super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { + @Override + public void execute(Runnable command) { + super.execute(taskDecorator.decorate(command)); + } + }; + } else { + // 使用 BridgeThreadPoolExecutor 执行 afterExecute, 处理异常情况 + executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(), + super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); - } + } - if (this.allowCoreThreadTimeOut) { - executor.allowCoreThreadTimeOut(true); - } + if (this.allowCoreThreadTimeOut) { + executor.allowCoreThreadTimeOut(true); + } - Field field = ReflectionUtils.findField(ThreadPoolTaskExecutor.class, "threadPoolExecutor", - ThreadPoolExecutor.class); - field.setAccessible(true); - ReflectionUtils.setField(field, this, executor); - return executor; - } + Field field = ReflectionUtils.findField(ThreadPoolTaskExecutor.class, "threadPoolExecutor", + ThreadPoolExecutor.class); + field.setAccessible(true); + ReflectionUtils.setField(field, this, executor); + return executor; + } - public void setQueueCapacity(int queueCapacity) { - super.setQueueCapacity(queueCapacity); - this.queueCapacity = queueCapacity; + public void setQueueCapacity(int queueCapacity) { + super.setQueueCapacity(queueCapacity); + this.queueCapacity = queueCapacity; - } + } - public void setTaskDecorator(TaskDecorator taskDecorator) { - super.setTaskDecorator(taskDecorator); - this.taskDecorator = taskDecorator; - } + public void setTaskDecorator(TaskDecorator taskDecorator) { + super.setTaskDecorator(taskDecorator); + this.taskDecorator = taskDecorator; + } - public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { - super.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut); - this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; - } + public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { + super.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut); + this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; + } } diff --git a/src/main/resources/logback_local.xml b/src/main/resources/logback_local.xml index b8da592..9929033 100644 --- a/src/main/resources/logback_local.xml +++ b/src/main/resources/logback_local.xml @@ -19,6 +19,18 @@ ${log_path}bridge.%d{yyyy-MM-dd}.log + + + + %d{MM-dd HH:mm:ss} [%X{sec_key}] [%thread] %-5level -- %msg%n + + + info + + + ${log_path}kafka.%d{yyyy-MM-dd}.log + + @@ -26,7 +38,8 @@ - + + diff --git a/src/main/resources/logback_test.xml b/src/main/resources/logback_test.xml index ee9ee1e..2c7deec 100644 --- a/src/main/resources/logback_test.xml +++ b/src/main/resources/logback_test.xml @@ -19,6 +19,7 @@ +