This commit is contained in:
jiangzhongxiang 2019-06-19 14:04:19 +08:00
parent 7cfe44a4f6
commit 887654e935
5 changed files with 128 additions and 168 deletions

View File

@ -27,10 +27,8 @@ import com.educoder.bridge.common.utils.GameHelper;
import com.educoder.bridge.common.utils.JedisUtil;
import com.educoder.bridge.common.utils.PortUtil;
import com.educoder.bridge.common.utils.TimeHelper;
import com.educoder.bridge.game.model.Evaluating;
import com.educoder.bridge.game.service.GameService;
import com.educoder.bridge.game.service.K8sService;
import com.educoder.bridge.game.thread.BuildThread;
import com.educoder.bridge.k8s.model.BridgePod;
import com.educoder.bridge.k8s.service.BridgePodService;
@ -54,14 +52,14 @@ public class GameController {
private GameService gameService;
@Autowired
private K8sService k8sService;
@Autowired
private BridgePodService bridgePodService;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Qualifier("evaluatingKafkaTemplate")
@Qualifier("evaluatingKafkaTemplate")
@Autowired
private KafkaTemplate<Integer, JSONObject> evaluatingKafkaTemplate;
@ -114,12 +112,10 @@ public class GameController {
@ApiParam(name = "file", required = false, value = "需要传文件的实训,给出文件存放路径(一个目录)及文件类型") String file,
@ApiParam(name = "containers", required = true, value = "需要使用的容器,base64编码") @RequestParam String containers,
@ApiParam(name = "content_modified", required = true, value = "文件是否修改的标志") @RequestParam Integer content_modified,
@ApiParam(name = "sec_key", required = false, value = "每一次评测的唯一标识") String sec_key)
throws Exception {
logger.info(
"评测tpiID: {}, tpiGitURL: {}, buildID: {}, isPublished: {}, instanceChallenge: {}, "
+ "testCases: {}, tpmScript: {}, timeLimit: {}, resubmit: {}, "
+ "times: {}, needPortMapping: {}, podType: {}, file: {}, containers: {}, content_modified: {}, sec_key: {}",
@ApiParam(name = "sec_key", required = false, value = "每一次评测的唯一标识") String sec_key) throws Exception {
logger.info("评测tpiID: {}, tpiGitURL: {}, buildID: {}, isPublished: {}, instanceChallenge: {}, "
+ "testCases: {}, tpmScript: {}, timeLimit: {}, resubmit: {}, "
+ "times: {}, needPortMapping: {}, podType: {}, file: {}, containers: {}, content_modified: {}, sec_key: {}",
tpiID, tpiGitURL, buildID, isPublished, instanceChallenge, testCases, tpmScript, timeLimit, resubmit,
times, needPortMapping, podType, file, containers, content_modified, sec_key);
@ -128,7 +124,7 @@ public class GameController {
String evaluateStartTime = requestTime.toString();
JSONObject cost = new JSONObject();
cost.put("evaluateStartTime", evaluateStartTime);
JedisUtil.set("timeCost:" + tpiID + ":" + buildID, cost.toJSONString());
JedisUtil.set("timeCost:" + tpiID + ":" + buildID, cost.toJSONString());
JSONObject response = new JSONObject();
@ -172,7 +168,7 @@ public class GameController {
// podName
String podName = podType != 2 ? "evaluate-" + tpiID : "evassh-" + tpiID;
BridgePod bridgePod = bridgePodService.createBridgePod(podName, tpiID, requestTime, sec_key);
buildParams.put("bridgePodId", bridgePod.getId());
@ -190,83 +186,19 @@ public class GameController {
String task = buildParams.toJSONString();
sendToKafka(buildParams);
// 最大running pod数量
int maxRunningPodNum = Integer.parseInt(appConfig.getMaxRunningPodNum());
// 此次请求只为轮询时间只返回轮询的时间结果
if (times != 1) {
logger.debug("轮询times: {}", times);
double buildRank;
response.put("ableToCreate", 1);
response.put("costTime", System.currentTimeMillis() - TimeHelper.convertTimeToMillis(evaluateStartTime));
response.put("waitNum", 0);
response.put("code", 0);
response.put("port", port);
response.put("msg", "评测完成");
try {
buildRank = JedisUtil.zrank("task", task);
} catch (Exception e) {
response.put("ableToCreate", 0);
response.put("waitNum", maxRunningPodNum + JedisUtil.zlen("task"));
response.put("code", 0);
response.put("msg", "等待评测");
return response;
}
if (buildRank != Double.MIN_VALUE) {
logger.debug("任务:{} : 还在等待队列中!", task);
response.put("ableToCreate", 0);
response.put("waitNum", buildRank == Double.MAX_VALUE ? maxRunningPodNum + JedisUtil.zlen("task")
: buildRank + maxRunningPodNum);
response.put("code", 0);
response.put("msg", "等待评测");
return response;
} else {
// 轮询时间请求发现目标任务不在队列就直接认为其正在运行
logger.debug("任务:{} : 正在执行中!", task);
response.put("ableToCreate", 1);
response.put("waitNum", 0);
response.put("code", 0);
response.put("port", Integer.parseInt(port));
response.put("msg", "正在评测");
return response;
}
}
// 判断是否可以立即执行(若pod已经存在或是pod不存在但是此时可以创建pod)
boolean executeImmediately = k8sService.isPodRunning("tpiID", tpiID);
if(!executeImmediately) {
// 获取Running状态的pod数
int runningPodNum = k8sService.getRunningPodNum();
boolean able = true; // k8sService.ableToEvaluate();
executeImmediately = (runningPodNum < maxRunningPodNum && able);
}
if (executeImmediately) {
// 直接执行任务
BuildThread buildThread = gameService.getBuildThread(buildParams);
threadPoolTaskExecutor.execute(buildThread);
response.put("ableToCreate", 1);
response.put("costTime", System.currentTimeMillis() - TimeHelper.convertTimeToMillis(evaluateStartTime));
response.put("waitNum", 0);
response.put("code", 0);
response.put("port", port);
response.put("msg", "评测完成");
logger.debug("直接执行任务task: {}, tpi: {}", task, tpiID);
} else {
// 否则将构建任务推入redis
JedisUtil.zpush("task", task);
// 评测任务等待执行
response.put("ableToCreate", 0);
// 在等待队列中的位置
response.put("waitNum", JedisUtil.zlen("task") + maxRunningPodNum);
response.put("code", 0);
response.put("msg", "等待评测");
logger.debug("任务入队等待!task: {}, tpi: {}", task, tpiID);
}
logger.debug("直接执行任务task: {}, tpi: {}", task, tpiID);
return response;
}
private void sendToKafka(JSONObject buildParams) {
final ProducerRecord<Integer, JSONObject> record = new ProducerRecord<Integer, JSONObject>("evaluating",
buildParams);

View File

@ -29,7 +29,7 @@ public class EvaluatingService {
@Autowired
private GameService gameService;
//@PostConstruct
@PostConstruct
public void init() {
evaluatingConsumer.subscribe(Arrays.asList("evaluating"));
new Thread(() -> {
@ -44,7 +44,7 @@ public class EvaluatingService {
ConsumerRecords<Integer, JSONObject> records = evaluatingConsumer.poll(100);// 当前每次只获取一个
for (ConsumerRecord<Integer, JSONObject> record : records) {
JSONObject buildParams = record.value();
logger.info("获取的评测请求:{}", record.value());
logger.info("获取的评测请求:{}", buildParams);
BuildThread buildThread = gameService.getBuildThread(buildParams);
threadPoolTaskExecutor.execute(buildThread);
}

View File

@ -2,7 +2,12 @@ package com.educoder.bridge.game.thread;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;
@ -15,98 +20,107 @@ import com.educoder.bridge.game.exception.UncaughtBridgeExceptionHandler;
@SuppressWarnings("serial")
public class BridgeThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private int queueCapacity = Integer.MAX_VALUE;
private int queueCapacity = Integer.MAX_VALUE;
private TaskDecorator taskDecorator;
private TaskDecorator taskDecorator;
private boolean allowCoreThreadTimeOut = false;
private boolean allowCoreThreadTimeOut = false;
/**
* 所有线程都会委托给这个execute方法在这个方法中我们把父线程的MDC内容赋值给子线程
* https://logback.qos.ch/manual/mdc.html#managedThreads
*
* @param runnable
*/
@Override
public void execute(Runnable runnable) {
// 获取父线程MDC中的内容必须在run方法之前否则等异步线程执行的时候有可能MDC里面的值已经被清空了这个时候就会返回null
Map<String, String> context = MDC.getCopyOfContextMap();
super.execute(() -> run(runnable, context));
}
/**
* 所有线程都会委托给这个execute方法在这个方法中我们把父线程的MDC内容赋值给子线程
* https://logback.qos.ch/manual/mdc.html#managedThreads
*
* @param runnable
*/
@Override
public void execute(Runnable runnable) {
// 获取父线程MDC中的内容必须在run方法之前否则等异步线程执行的时候有可能MDC里面的值已经被清空了这个时候就会返回null
Map<String, String> context = MDC.getCopyOfContextMap();
super.execute(() -> run(runnable, context));
}
/**
* 子线程委托的执行方法
*
* @param runnable {@link Runnable}
* @param context 父线程MDC内容
*/
private void run(Runnable runnable, Map<String, String> context) {
// 将父线程的MDC内容传给子线程
MDC.setContextMap(context);
try {
// 执行异步操作
runnable.run();
} finally {
// 清空MDC内容
MDC.clear();
}
}
/**
* 子线程委托的执行方法
*
* @param runnable
* {@link Runnable}
* @param context
* 父线程MDC内容
*/
private void run(Runnable runnable, Map<String, String> context) {
// 将父线程的MDC内容传给子线程
String secKey = "sec_key";
if (context == null) {// kafka读取评测消息的情形
BuildThread th = (BuildThread) runnable;
String key = th.getBuildParams().getString(secKey);
MDC.put(secKey, key);
} else {
MDC.setContextMap(context);
}
try {
// 执行异步操作
runnable.run();
} finally {
// 清空MDC内容
MDC.clear();
}
}
@Override
public Thread newThread(Runnable runnable) {
Thread t = super.newThread(runnable);
UncaughtBridgeExceptionHandler eh = BeanFactory.getObejct(UncaughtBridgeExceptionHandler.class);
t.setUncaughtExceptionHandler(eh);
return t;
}
@Override
public Thread newThread(Runnable runnable) {
Thread t = super.newThread(runnable);
UncaughtBridgeExceptionHandler eh = BeanFactory.getObejct(UncaughtBridgeExceptionHandler.class);
t.setUncaughtExceptionHandler(eh);
return t;
}
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
// 使用 BridgeThreadPoolExecutor 执行 afterExecute 处理异常情况
executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(),
super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
}
};
} else {
// 使用 BridgeThreadPoolExecutor 执行 afterExecute 处理异常情况
executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(),
super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
// 使用 BridgeThreadPoolExecutor 执行 afterExecute 处理异常情况
executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(),
super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
}
};
} else {
// 使用 BridgeThreadPoolExecutor 执行 afterExecute 处理异常情况
executor = new BridgeThreadPoolExecutor(super.getCorePoolSize(), super.getMaxPoolSize(),
super.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
Field field = ReflectionUtils.findField(ThreadPoolTaskExecutor.class, "threadPoolExecutor",
ThreadPoolExecutor.class);
field.setAccessible(true);
ReflectionUtils.setField(field, this, executor);
return executor;
}
Field field = ReflectionUtils.findField(ThreadPoolTaskExecutor.class, "threadPoolExecutor",
ThreadPoolExecutor.class);
field.setAccessible(true);
ReflectionUtils.setField(field, this, executor);
return executor;
}
public void setQueueCapacity(int queueCapacity) {
super.setQueueCapacity(queueCapacity);
this.queueCapacity = queueCapacity;
public void setQueueCapacity(int queueCapacity) {
super.setQueueCapacity(queueCapacity);
this.queueCapacity = queueCapacity;
}
}
public void setTaskDecorator(TaskDecorator taskDecorator) {
super.setTaskDecorator(taskDecorator);
this.taskDecorator = taskDecorator;
}
public void setTaskDecorator(TaskDecorator taskDecorator) {
super.setTaskDecorator(taskDecorator);
this.taskDecorator = taskDecorator;
}
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
super.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
super.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
}

View File

@ -19,6 +19,18 @@
<fileNamePattern>${log_path}bridge.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
</appender>
<appender name="kafka" class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
<pattern>%d{MM-dd HH:mm:ss} [%X{sec_key}] [%thread] %-5level -- %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log_path}kafka.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
</appender>
<!-- 屏蔽框架输出 -->
<logger name="org.slf4j" level="ERROR"/>
@ -26,7 +38,8 @@
<logger name="io.swagger" level="ERROR"/>
<logger name="ch.qos.logback" level="OFF"/>
<logger name="springfox.documentation" level="ERROR"/>
<looger name="com.spotify.docker.client" level="ERROR"/>
<logger name="com.spotify.docker.client" level="ERROR"/>
<logger name="org.apache.kafka" level="ERROR" />
<root>
<level value="DEBUG"/>

View File

@ -19,6 +19,7 @@
<logger name="ch.qos.logback" level="OFF"/>
<logger name="springfox.documentation" level="ERROR"/>
<looger name="com.spotify.docker.client" level="ERROR"/>
<logger name="org.apache.kafka" level="ERROR" />
<root>
<level value="DEBUG"/>