kafka
This commit is contained in:
parent
e41975c718
commit
59a010e86e
11
pom.xml
11
pom.xml
|
@ -277,6 +277,17 @@
|
|||
<artifactId>java-jwt</artifactId>
|
||||
<version>3.8.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>2.2.7.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.2.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
package com.educoder.bridge.game.controller;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.educoder.bridge.common.settings.AppConfig;
|
||||
import com.educoder.bridge.common.utils.Base64Util;
|
||||
import com.educoder.bridge.common.utils.GameHelper;
|
||||
import com.educoder.bridge.game.model.Evaluating;
|
||||
import com.educoder.bridge.game.service.GameService;
|
||||
import com.educoder.bridge.game.service.K8sService;
|
||||
import com.educoder.bridge.k8s.service.BridgePodService;
|
||||
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
|
||||
/**
|
||||
* 和实训相关的接口
|
||||
*
|
||||
* @author weishao
|
||||
*/
|
||||
@Api(value = "game控制器", hidden = true)
|
||||
@RestController
|
||||
@RequestMapping("/game2")
|
||||
public class GameController2 {
|
||||
private static final Logger logger = LoggerFactory.getLogger(GameController2.class);
|
||||
@Autowired
|
||||
private AppConfig appConfig;
|
||||
@Autowired
|
||||
private GameService gameService;
|
||||
@Autowired
|
||||
private K8sService k8sService;
|
||||
|
||||
@Autowired
|
||||
private BridgePodService bridgePodService;
|
||||
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||
|
||||
@Qualifier("kafkaT")
|
||||
@Autowired
|
||||
private KafkaTemplate<Integer, String> kafkaTemplate;
|
||||
@Qualifier("kafkaT2")
|
||||
@Autowired
|
||||
private KafkaTemplate<Integer, Evaluating> kafkaTemplate2;
|
||||
|
||||
/**
|
||||
* 开启实训: 克隆版本库
|
||||
*/
|
||||
@RequestMapping(path = "/openGameInstance")
|
||||
@ApiOperation(value = "开启实训", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
|
||||
public JSONObject openGameInstance(
|
||||
@ApiParam(name = "tpmGitURL", required = true, value = "Tpm的gitUrl,需要base64编码") @RequestParam String tpmGitURL,
|
||||
@ApiParam(name = "tpiID", required = true, value = "实训实例的ID") @RequestParam String tpiID,
|
||||
@ApiParam(name = "tpiRepoName", required = true, value = "tpiRepoName") @RequestParam String tpiRepoName)
|
||||
throws Exception {
|
||||
logger.info("开启实训:tpmGitURL: {}, tpiID: {}, tpiRepoName: {}", tpmGitURL, tpiID, tpiRepoName);
|
||||
|
||||
JSONObject response = new JSONObject();
|
||||
|
||||
// 设定工作路径为${workspace}/myshixun_${tpiID}
|
||||
String tpiWorkSpace = appConfig.getWorkspace() + File.separator + "myshixun_" + tpiID;
|
||||
|
||||
// 对当前TPI,从TPM clone 版本库
|
||||
tpmGitURL = Base64Util.decode(tpmGitURL);
|
||||
gameService.gitClone(tpiWorkSpace, tpmGitURL, "remote_origin", tpiRepoName);
|
||||
|
||||
response.put("code", 0);
|
||||
response.put("msg", "开启成功");
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* 评测: 对每一次评测请求,先判断是否能立即执行还是需要先排队,如果能立即执行,开启一个线程执行以下步骤
|
||||
* 如果需要排队,将相关参数封装成一个线程,存入redis队列,等待执行,并返回标志位给前台,通知前台进行轮询
|
||||
*/
|
||||
@RequestMapping(path = "/gameEvaluate")
|
||||
@ApiOperation(value = "评测", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
|
||||
public JSONObject gameEvaluate()
|
||||
throws Exception {
|
||||
logger.info("eeeeeeeeee");
|
||||
JSONObject response = new JSONObject();
|
||||
|
||||
|
||||
JSONObject buildParams = new JSONObject(true);
|
||||
//buildParams.toString().get
|
||||
|
||||
this.sendToKafka(null);
|
||||
return response;
|
||||
}
|
||||
|
||||
private void sendToKafka(final Object data) {
|
||||
final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic1", "val1");
|
||||
Evaluating eva = new Evaluating();
|
||||
eva.setBuildID("b11");
|
||||
eva.setTpiID("11");
|
||||
final ProducerRecord<Integer, Evaluating> record2 = new ProducerRecord<Integer, Evaluating>("topic1", eva);
|
||||
try {
|
||||
kafkaTemplate.send(record).get(10, TimeUnit.SECONDS);
|
||||
kafkaTemplate2.send(record2).get(10, TimeUnit.SECONDS);
|
||||
// handleSuccess(data);
|
||||
logger.info("成功");
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
// handleFailure(data, record, e.getCause());
|
||||
logger.error("error");
|
||||
}
|
||||
catch (Exception e) {
|
||||
// handleFailure(data, record, e);
|
||||
logger.error("error");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* tpm版本库已更新,同步
|
||||
*/
|
||||
@RequestMapping(path = "/resetTpmRepository", method = RequestMethod.POST)
|
||||
@ApiOperation(value = "tpm版本库已更新,同步操作", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
|
||||
public JSONObject reset(@ApiParam(name = "tpiID", required = true, value = "tpi") @RequestParam String tpiID,
|
||||
@ApiParam(name = "tpiGitURL", required = true, value = "学员对应当前实训的版本库地址,base64编码") @RequestParam String tpiGitURL,
|
||||
@ApiParam(name = "tpmGitURL", required = true, value = "学员对应当前实训的tpm版本库地址,base64编码") @RequestParam String tpmGitURL,
|
||||
@ApiParam(name = "identifier", required = true, value = "push权限") @RequestParam String identifier)
|
||||
throws Exception {
|
||||
logger.info("tpm版本库已更新,同步tpi版本库,tpiID: {}, tpiGitURL: {}, tpmGitURL: {}, identifier: {}", tpiID, tpiGitURL,
|
||||
tpmGitURL, identifier);
|
||||
|
||||
JSONObject response = new JSONObject();
|
||||
|
||||
tpiGitURL = Base64Util.decode(tpiGitURL);
|
||||
tpmGitURL = Base64Util.decode(tpmGitURL);
|
||||
String tpiRepoName = GameHelper.getRepoName(tpiGitURL);
|
||||
String path = appConfig.getWorkspace() + File.separator + "myshixun_" + tpiID;
|
||||
|
||||
// 从tpm远程库拉取代码到myshixun版本库然后强推到tpi远程库
|
||||
gameService.gitPullFromTpm(path, tpmGitURL, tpiRepoName, tpiID);
|
||||
gameService.gitPushToTpi(path, tpiGitURL, identifier, tpiID);
|
||||
|
||||
logger.debug("tpm库更新内容已同步,tpiID:{}", tpiID);
|
||||
response.put("code", 0);
|
||||
response.put("msg", "版本库更新成功");
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* tpi版本库head是否存在,若缺失则修复
|
||||
*
|
||||
* @param tpiID
|
||||
* @param tpiGitURL
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
@RequestMapping(path = "/check")
|
||||
@ApiOperation(value = "进入实训时做校验", httpMethod = "POST", produces = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
|
||||
public JSONObject check(@ApiParam(name = "tpiID", required = true, value = "实训实例的ID") @RequestParam String tpiID,
|
||||
@ApiParam(name = "tpiGitURL", required = true, value = "学员对应当前实训的版本库地址,base64编码") @RequestParam String tpiGitURL)
|
||||
throws Exception {
|
||||
tpiGitURL = Base64Util.decode(tpiGitURL);
|
||||
return gameService.check(tpiID, tpiGitURL);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package com.educoder.bridge.kafka.config;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
import com.educoder.bridge.game.model.Evaluating;
|
||||
|
||||
@Configuration
|
||||
public class KafkaConfig {
|
||||
|
||||
@Bean("f")
|
||||
public ProducerFactory<Integer, String> producerFactory() {
|
||||
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Map<String, Object> producerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
|
||||
return props;
|
||||
}
|
||||
|
||||
@Bean("kafkaT")
|
||||
public KafkaTemplate<Integer, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<Integer, String>(producerFactory());
|
||||
}
|
||||
|
||||
@Bean("kafkaT2")
|
||||
public KafkaTemplate<Integer, Evaluating> kafkaTemplate2() {
|
||||
return new KafkaTemplate<Integer, Evaluating>(producerFactory2());
|
||||
}
|
||||
|
||||
@Bean("f2")
|
||||
public ProducerFactory<Integer, Evaluating> producerFactory2() {
|
||||
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.educoder.bridge.kafka.serializer;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
public class EvaluatingDeserializer implements Deserializer<JSONObject> {
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public JSONObject deserialize(String topic, byte[] data) {
|
||||
try {
|
||||
String str = new String(data, "UTF8");
|
||||
return JSON.parseObject(str);
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.educoder.bridge.kafka.serializer;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
|
||||
public class EvaluatingSerializer implements Serializer<JSONObject> {
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String topic, JSONObject data) {
|
||||
try {
|
||||
return data.toJSONString().getBytes("UTF8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue