diff --git a/pom.xml b/pom.xml index ee30dad..9c96ba1 100644 --- a/pom.xml +++ b/pom.xml @@ -277,6 +277,17 @@ java-jwt 3.8.0 + + + org.springframework.kafka + spring-kafka + 2.2.7.RELEASE + + + org.apache.kafka + kafka-clients + 2.2.1 + diff --git a/src/main/java/com/educoder/bridge/game/controller/GameController2.java b/src/main/java/com/educoder/bridge/game/controller/GameController2.java new file mode 100644 index 0000000..2184c60 --- /dev/null +++ b/src/main/java/com/educoder/bridge/game/controller/GameController2.java @@ -0,0 +1,177 @@ +package com.educoder.bridge.game.controller; + +import java.io.File; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.MediaType; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import com.alibaba.fastjson.JSONObject; +import com.educoder.bridge.common.settings.AppConfig; +import com.educoder.bridge.common.utils.Base64Util; +import com.educoder.bridge.common.utils.GameHelper; +import com.educoder.bridge.game.model.Evaluating; +import com.educoder.bridge.game.service.GameService; +import com.educoder.bridge.game.service.K8sService; +import com.educoder.bridge.k8s.service.BridgePodService; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; + +/** + * 和实训相关的接口 + * + * @author weishao + */ +@Api(value = "game控制器", hidden = true) +@RestController +@RequestMapping("/game2") +public class GameController2 { + private static final Logger logger = LoggerFactory.getLogger(GameController2.class); + @Autowired + private AppConfig appConfig; + @Autowired + private GameService gameService; + @Autowired + private K8sService k8sService; + + @Autowired + private BridgePodService bridgePodService; + + @Autowired + private ThreadPoolTaskExecutor threadPoolTaskExecutor; + + @Qualifier("kafkaT") + @Autowired + private KafkaTemplate kafkaTemplate; + @Qualifier("kafkaT2") + @Autowired + private KafkaTemplate kafkaTemplate2; + + /** + * 开启实训: 克隆版本库 + */ + @RequestMapping(path = "/openGameInstance") + @ApiOperation(value = "开启实训", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE) + public JSONObject openGameInstance( + @ApiParam(name = "tpmGitURL", required = true, value = "Tpm的gitUrl,需要base64编码") @RequestParam String tpmGitURL, + @ApiParam(name = "tpiID", required = true, value = "实训实例的ID") @RequestParam String tpiID, + @ApiParam(name = "tpiRepoName", required = true, value = "tpiRepoName") @RequestParam String tpiRepoName) + throws Exception { + logger.info("开启实训:tpmGitURL: {}, tpiID: {}, tpiRepoName: {}", tpmGitURL, tpiID, tpiRepoName); + + JSONObject response = new JSONObject(); + + // 设定工作路径为${workspace}/myshixun_${tpiID} + String tpiWorkSpace = appConfig.getWorkspace() + File.separator + "myshixun_" + tpiID; + + // 对当前TPI,从TPM clone 版本库 + tpmGitURL = Base64Util.decode(tpmGitURL); + gameService.gitClone(tpiWorkSpace, tpmGitURL, "remote_origin", tpiRepoName); + + response.put("code", 0); + response.put("msg", "开启成功"); + return response; + } + + /** + * 评测: 对每一次评测请求,先判断是否能立即执行还是需要先排队,如果能立即执行,开启一个线程执行以下步骤 + * 如果需要排队,将相关参数封装成一个线程,存入redis队列,等待执行,并返回标志位给前台,通知前台进行轮询 + */ + @RequestMapping(path = "/gameEvaluate") + @ApiOperation(value = "评测", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE) + public JSONObject gameEvaluate() + throws Exception { + logger.info("eeeeeeeeee"); + JSONObject response = new JSONObject(); + + + JSONObject buildParams = new JSONObject(true); + //buildParams.toString().get + + this.sendToKafka(null); + return response; + } + + private void sendToKafka(final Object data) { + final ProducerRecord record = new ProducerRecord("topic1", "val1"); + Evaluating eva = new Evaluating(); + eva.setBuildID("b11"); + eva.setTpiID("11"); + final ProducerRecord record2 = new ProducerRecord("topic1", eva); + try { + kafkaTemplate.send(record).get(10, TimeUnit.SECONDS); + kafkaTemplate2.send(record2).get(10, TimeUnit.SECONDS); + // handleSuccess(data); + logger.info("成功"); + } + catch (ExecutionException e) { + // handleFailure(data, record, e.getCause()); + logger.error("error"); + } + catch (Exception e) { + // handleFailure(data, record, e); + logger.error("error"); + } + } + + /** + * tpm版本库已更新,同步 + */ + @RequestMapping(path = "/resetTpmRepository", method = RequestMethod.POST) + @ApiOperation(value = "tpm版本库已更新,同步操作", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE) + public JSONObject reset(@ApiParam(name = "tpiID", required = true, value = "tpi") @RequestParam String tpiID, + @ApiParam(name = "tpiGitURL", required = true, value = "学员对应当前实训的版本库地址,base64编码") @RequestParam String tpiGitURL, + @ApiParam(name = "tpmGitURL", required = true, value = "学员对应当前实训的tpm版本库地址,base64编码") @RequestParam String tpmGitURL, + @ApiParam(name = "identifier", required = true, value = "push权限") @RequestParam String identifier) + throws Exception { + logger.info("tpm版本库已更新,同步tpi版本库,tpiID: {}, tpiGitURL: {}, tpmGitURL: {}, identifier: {}", tpiID, tpiGitURL, + tpmGitURL, identifier); + + JSONObject response = new JSONObject(); + + tpiGitURL = Base64Util.decode(tpiGitURL); + tpmGitURL = Base64Util.decode(tpmGitURL); + String tpiRepoName = GameHelper.getRepoName(tpiGitURL); + String path = appConfig.getWorkspace() + File.separator + "myshixun_" + tpiID; + + // 从tpm远程库拉取代码到myshixun版本库然后强推到tpi远程库 + gameService.gitPullFromTpm(path, tpmGitURL, tpiRepoName, tpiID); + gameService.gitPushToTpi(path, tpiGitURL, identifier, tpiID); + + logger.debug("tpm库更新内容已同步,tpiID:{}", tpiID); + response.put("code", 0); + response.put("msg", "版本库更新成功"); + + return response; + } + + /** + * tpi版本库head是否存在,若缺失则修复 + * + * @param tpiID + * @param tpiGitURL + * @return + * @throws Exception + */ + @RequestMapping(path = "/check") + @ApiOperation(value = "进入实训时做校验", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE) + public JSONObject check(@ApiParam(name = "tpiID", required = true, value = "实训实例的ID") @RequestParam String tpiID, + @ApiParam(name = "tpiGitURL", required = true, value = "学员对应当前实训的版本库地址,base64编码") @RequestParam String tpiGitURL) + throws Exception { + tpiGitURL = Base64Util.decode(tpiGitURL); + return gameService.check(tpiID, tpiGitURL); + } +} diff --git a/src/main/java/com/educoder/bridge/kafka/config/KafkaConfig.java b/src/main/java/com/educoder/bridge/kafka/config/KafkaConfig.java new file mode 100644 index 0000000..e953ace --- /dev/null +++ b/src/main/java/com/educoder/bridge/kafka/config/KafkaConfig.java @@ -0,0 +1,48 @@ +package com.educoder.bridge.kafka.config; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import com.educoder.bridge.game.model.Evaluating; + +@Configuration +public class KafkaConfig { + + @Bean("f") + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // See https://kafka.apache.org/documentation/#producerconfigs for more properties + return props; + } + + @Bean("kafkaT") + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); + } + + @Bean("kafkaT2") + public KafkaTemplate kafkaTemplate2() { + return new KafkaTemplate(producerFactory2()); + } + + @Bean("f2") + public ProducerFactory producerFactory2() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + +} \ No newline at end of file diff --git a/src/main/java/com/educoder/bridge/kafka/serializer/EvaluatingDeserializer.java b/src/main/java/com/educoder/bridge/kafka/serializer/EvaluatingDeserializer.java new file mode 100644 index 0000000..2211f6d --- /dev/null +++ b/src/main/java/com/educoder/bridge/kafka/serializer/EvaluatingDeserializer.java @@ -0,0 +1,33 @@ +package com.educoder.bridge.kafka.serializer; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; + +public class EvaluatingDeserializer implements Deserializer { + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public JSONObject deserialize(String topic, byte[] data) { + try { + String str = new String(data, "UTF8"); + return JSON.parseObject(str); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + + } + +} diff --git a/src/main/java/com/educoder/bridge/kafka/serializer/EvaluatingSerializer.java b/src/main/java/com/educoder/bridge/kafka/serializer/EvaluatingSerializer.java new file mode 100644 index 0000000..6ca82f9 --- /dev/null +++ b/src/main/java/com/educoder/bridge/kafka/serializer/EvaluatingSerializer.java @@ -0,0 +1,31 @@ +package com.educoder.bridge.kafka.serializer; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +import org.apache.kafka.common.serialization.Serializer; + +import com.alibaba.fastjson.JSONObject; + +public class EvaluatingSerializer implements Serializer { + + @Override + public void configure(Map configs, boolean isKey) { + + } + + @Override + public byte[] serialize(String topic, JSONObject data) { + try { + return data.toJSONString().getBytes("UTF8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + + } + +}