Compare commits

...

12 Commits

Author SHA1 Message Date
曾伟 06a65062f8 Update commit configuration and consumer thread count 2021-09-13 16:00:43 +08:00
曾伟 b7c4a84f6f merge 2021-09-13 15:54:18 +08:00
baladiwei ac14038d30 Merge pull request 'Switch consumer to kafka' (#57) from baladiwei/gitlink-notification-system:master into master 2021-09-11 20:23:56 +08:00
巴拉迪维 b0ce76fa3d Switch consumer to kafka event mode
1. Removed the xxl-job related code
2. The executor part is now implemented directly with the kafka event-listener mode
3. Updated all related application.yml and docker files
2021-09-11 20:22:44 +08:00
baladiwei 42d046c6f0 Merge pull request 'docker file alter' (#56) from baladiwei/gitlink-notification-system:master into master 2021-09-10 18:10:08 +08:00
巴拉迪维 8026280580 docker file alter 2021-09-10 18:09:21 +08:00
baladiwei fbd598a372 Merge pull request 'Handle source and extra fields when sending messages' (#55) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-10 17:16:11 +08:00
万佳 1dd8a8993a Handle source and extra fields when sending messages 2021-09-10 14:12:05 +08:00
巴拉迪维 75cc4ac2c9 Adjust writer kafka-related configuration 2021-09-10 14:02:42 +08:00
baladiwei 32b640a719 Merge pull request 'Adjust writer kafka-related configuration' (#54) from baladiwei/gitlink-notification-system:master into master 2021-09-10 13:20:37 +08:00
巴拉迪维 f714f4d58e Adjust writer kafka-related configuration 2021-09-10 13:19:46 +08:00
baladiwei 5dad2d9701 Merge pull request 'writer and executor configuration changes' (#53) from DavidZeng/gitlink-notification-system:dev_add_middleware_conf into master 2021-09-10 11:46:10 +08:00
15 changed files with 251 additions and 242 deletions

View File

@@ -0,0 +1,48 @@
package cn.org.gitlink.notification.common.utils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
private String bootstrapServers;
@Value("${spring.kafka.producer.client_id:#{null}}")
private String clientId;
@Value("${spring.kafka.producer.retries:#{null}}")
private Integer retries;
@Value("${spring.kafka.producer.batch_size:#{null}}")
private Integer batchSize;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerConfigs(),true);
}
@Bean
ProducerFactory<String, String> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
}
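A minimal usage sketch for the KafkaTemplate bean defined above; the GnsMessageSender class is hypothetical and not part of this changeset:

package cn.org.gitlink.notification.common.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical caller; illustrates using the KafkaTemplate bean defined above.
@Component
public class GnsMessageSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // With autoFlush=true (the second constructor argument above),
    // each send is flushed to the broker immediately.
    public void send(String topic, String payload) {
        kafkaTemplate.send(topic, payload);
    }
}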

View File

@@ -1,11 +1,17 @@
package cn.org.gitlink.notification.common.utils;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.PostConstruct;
import java.util.Collection;
@@ -26,15 +32,16 @@ import java.util.stream.Collectors;
@Component
public class KafkaUtil {
@Value("${spring.kafka.bootstrap-servers:#{null}}")
private String springKafkaBootstrapServers;
private Logger logger = LogManager.getLogger(KafkaUtil.class);
@Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
private String kafkaServer;
private AdminClient adminClient;
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* Initialize the AdminClient
* The '@PostConstruct annotation marks a non-static void method
@@ -44,29 +51,26 @@ public class KafkaUtil {
@PostConstruct
private void initAdminClient() {
Map<String, Object> props = new HashMap<>(1);
if (springKafkaBootstrapServers == null) {
if (kafkaServer == null) {
return;
}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
adminClient = KafkaAdminClient.create(props);
}
/**
* Create topics (batch supported)
*/
public void createTopic(Collection<NewTopic> newTopics) {
adminClient.createTopics(newTopics);
}
/**
* Delete topics (batch supported)
*/
public void deleteTopic(Collection<String> topics) {
adminClient.deleteTopics(topics);
}
/**
* Get information for the specified topics
* Get Topic information
*
* @param topics
* @return
*/
public String getTopicInfo(Collection<String> topics) {
AtomicReference<String> info = new AtomicReference<String>("");
@@ -77,28 +81,55 @@ KafkaUtil {
}
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
logger.error(e);
}
return info.get();
}
/**
* Get all topics
* Get all Topics
*
* @return
*/
public List<String> getAllTopic() {
try {
return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
logger.error(e);
}
return Lists.newArrayList();
}
/**
* Send a message to a topic
* Send a message via Kafka
*
* @param topic
* @param message
*/
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.error(throwable);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
logger.debug(stringStringSendResult.getRecordMetadata());
}
});
}
/**
* Send a message via Kafka
*
* @param topic
* @param message
* @param callback
*/
public void sendMessage(String topic, String message, ListenableFutureCallback callback) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(callback);
}
}
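A sketch of how a caller might use the new callback overload of sendMessage; the NotificationPublisher class and log text are illustrative only:

package cn.org.gitlink.notification.common.utils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;

// Hypothetical caller of KafkaUtil.sendMessage(topic, message, callback).
@Component
public class NotificationPublisher {

    private static final Logger logger = LogManager.getLogger(NotificationPublisher.class);

    @Autowired
    private KafkaUtil kafkaUtil;

    public void publish(String topic, String json) {
        kafkaUtil.sendMessage(topic, json, new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                // e.g. persist the payload here for a later retry
                logger.error("send to {} failed", topic, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.debug("sent to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }
        });
    }
}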

View File

@@ -21,13 +21,6 @@
<version>${springboot.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.xuxueli/xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>cn.org.gitlink.notification</groupId>
<artifactId>gns-common</artifactId>

View File

@@ -1,10 +1,14 @@
package cn.org.gitlink.notification.executor.core.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -13,46 +17,43 @@ import java.util.Map;
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
@Value("${spring.kafka.consumer.bootstrap_servers:#{null}}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout:10000}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
@Value("${spring.kafka.consumer.group_id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
@Value("${spring.kafka.consumer.client_id}")
private String clientId;
@Value("${spring.kafka.consumer.auto_offset_reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.max.poll.records}")
@Value("${spring.kafka.consumer.max_poll_records}")
private String maxPollRecords;
@Bean("kafkaConsumer")
public KafkaConsumer<String, String> consumer() {
return new KafkaConsumer<>(consumerConfigs());
@Value("${spring.kafka.consumer.topic}")
private String topic;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> consumerListenerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerConfigs());
factory.setRecordFilterStrategy(record -> record.topic().toLowerCase().equals(this.topic));
return factory;
}
public Map<String, Object> consumerConfigs() {
@Bean
public ConsumerFactory<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", servers);
//Each consumer is assigned its own group id
props.put("group.id", groupId);
//Auto-commit the offset if the value is valid
props.put("enable.auto.commit", enableAutoCommit);
// Fetch 10 records per poll
props.put("max.poll.records", maxPollRecords);
//How often to commit the offsets of consumed messages
props.put("auto.commit.interval.ms", autoCommitInterval);
//Session timeout; if the consumer does not respond within this window, kafka may give up on it or move on to the next message
props.put("session.timeout.ms", sessionTimeout);
//Auto offset reset policy
props.put("auto.offset.reset", autoOffsetReset);
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
return props;
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}
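One subtlety in the factory above: spring-kafka's RecordFilterStrategy#filter returns true for records that should be discarded, so a predicate that equals this.topic drops matching records rather than keeping them. A minimal sketch of a keep-only-this-topic filter, assuming the same consumer factory; the class name is illustrative and not part of this changeset:

package cn.org.gitlink.notification.executor.core.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Sketch only: RecordFilterStrategy#filter returns true for records to DISCARD,
// so keeping only `topic` means negating the topic comparison.
@Configuration
public class FilteredListenerFactorySketch {

    @Value("${spring.kafka.consumer.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filteredListenerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // keep records from `topic`; discard everything else
        factory.setRecordFilterStrategy(record -> !record.topic().equalsIgnoreCase(topic));
        return factory;
    }
}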

View File

@@ -1,54 +0,0 @@
package cn.org.gitlink.notification.executor.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class XXLJobConfig {
private Logger logger = LoggerFactory.getLogger(XXLJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}

View File

@@ -1,76 +0,0 @@
package cn.org.gitlink.notification.executor.service.jobhandler;
import cn.org.gitlink.notification.common.utils.SpringContextUtil;
import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
/**
* System notification consumer executor
* @author zengwei
* @Date 2021-09-09
*/
@Component
public class NotificationJob {
private static Logger logger = LoggerFactory.getLogger(NotificationJob.class);
@Value("${topic.notification.name: #{null}}")
private String topic;
@Autowired
private SysNotificationService sysNotificationService;
/**
* Entry point for system notification processing; when adding the job in xxl-job-admin, fill the JobHandler field with the name from the annotation
* @author zengwei
* @Date 2021-09-09
* */
@XxlJob("notificationMessageHandler")
public void notificationMessageHandler() {
if (null == topic){
XxlJobHelper.handleFail("topic not configured!");
return;
}
XxlJobHelper.log("topic: {}", topic);
KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) SpringContextUtil.getBean("kafkaConsumer");
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
XxlJobHelper.log("Fetched {} records!", records.count());
for (ConsumerRecord<String, String> record: records) {
NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(record.value(), NewSysNotificationVo.class);
try {
sysNotificationService.sendNotification(newSysNotificationVo);
XxlJobHelper.log("{} consumed successfully!", record.value());
} catch (Exception e) {
XxlJobHelper.log("{} consumption failed, reason: {}", record.value(), e.getMessage());
}
}
consumer.commitAsync();
}
public void init(){
logger.info("init notification job");
}
public void destroy(){
logger.info("notification job has been destroyed");
}
}

View File

@@ -0,0 +1,38 @@
package cn.org.gitlink.notification.executor.service.jobhandler;
import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
import com.alibaba.fastjson.JSONObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Configuration
@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group_id}")
public class NotificationListener {
private Logger logger = LogManager.getLogger(NotificationListener.class);
@Autowired
private SysNotificationService sysNotificationService;
@KafkaHandler
public void messageHandler(String message) {
try {
NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(message, NewSysNotificationVo.class);
sysNotificationService.sendNotification(newSysNotificationVo);
} catch (Exception e) {
logger.error(e);
}
}
@KafkaHandler(isDefault = true)
public void defaultHandler(Object object) {
logger.error("Unknow object received. ->" + object);
}//end of method
}
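For reference, a sketch of the payload shape messageHandler expects; the field names are inferred from the VO accessors visible elsewhere in this diff (getContent, getType, getSource, getExtra, getReceivers, getNotification_url), and the values are made up:

package cn.org.gitlink.notification.executor.service.jobhandler;

import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
import com.alibaba.fastjson.JSONObject;

// Hypothetical payload round-trip: the writer serializes a VO to JSON and
// NotificationListener.messageHandler parses it back the same way.
public class NotificationPayloadSketch {
    public static void main(String[] args) {
        String json = "{\"content\":\"You have a new follower\","
                + "\"notification_url\":\"/users/42\","
                + "\"type\":1,\"source\":\"gitlink\",\"receivers\":\"42,43\"}";
        NewSysNotificationVo vo = JSONObject.parseObject(json, NewSysNotificationVo.class);
        System.out.println(vo.getReceivers()); // -> 42,43
    }
}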

View File

@@ -16,8 +16,8 @@ xxl.job.executor.logpath=./logs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
kafka.consumer.servers=localhost:29092
kafka.consumer.group.id=0
kafka.consumer.servers=localhost:29092,localhost:39092
kafka.consumer.group.id=gitlink-notification-group
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.enable.auto.commit=false
kafka.consumer.auto.commit.interval=100

View File

@@ -0,0 +1,37 @@
server:
  port: 8083
logging:
  config: classpath:logback.xml
spring:
  kafka:
    producer:
      bootstrap_servers: localhost:29092
      client_id: gitlink_producer_01
      retries: 5
      batch_size: 16384
    consumer:
      bootstrap_servers: localhost:29092
      client_id: gitlink_consumer
      group_id: group-gitlink-notification
      auto_offset_reset: earliest
      max_poll_records: 100
      topic: topic-gitlink-notification
      enable-auto-commit: true
      #Auto-commit interval
      auto-commit-interval: 1S
    listener:
      # Number of consumer threads
      concurrency: 3
      # Offset commit (ack) mode
      ack-mode: record
  datasource:
    # Configure the datasource type
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:33306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
    username: gitlink
    password: giTlinK0^827
# IP whitelist; separate multiple IPs with commas, or use * to allow all
white-list: '*'

View File

@@ -12,7 +12,7 @@ XXL_JOB_ADMIN_VERSION=2.3.0
MYSQL_CONTAINER_NAME=gitlink_mysql
REDIS_CONTAINER_NAME=gitlink_redis
XXL_JOB_ADMIN_CONTAINER_NAME=gitlink_xxl_job_admin
ZOOKEEPER_CONTAINER_NAME=gitlink_zookeepr
ZOOKEEPER_CONTAINER_NAME=gitlink_zookeeper
KAFKA_CONTAINER_01_NAME=gitlink_kafka_01
KAFKA_CONTAINER_02_NAME=gitlink_kafka_02

View File

@@ -33,23 +33,6 @@ services:
networks:
- gitlink_network
xxl-job-admin:
image: xuxueli/xxl-job-admin:${XXL_JOB_ADMIN_VERSION}
container_name: ${XXL_JOB_ADMIN_CONTAINER_NAME}
hostname: xxl_job_admin
environment:
- JAVA_OPTS=-Xmx512m
- PARAMS=--spring.datasource.url=jdbc:mysql://${MYSQL_CONTAINER_NAME}:3306/gitlink_notification?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username=${MYSQL_USER} --spring.datasource.password=${MYSQL_PASSWORD}
volumes:
- ${DOCKER_DATA_PATH}/xxl-job-admin:/data/applogs
ports:
- ${XXL_JOB_ADMIN_LOCAL_PORT}:8080
depends_on:
- mysql
networks:
- gitlink_network
# See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:latest
@@ -58,6 +41,8 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# volumes:
# - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
ports:
- ${ZOOKEEPER_LOCAL_PORT}:2181
networks:
@@ -72,10 +57,12 @@ services:
- zookeeper
ports:
- ${KAFKA_01_LOCAL_PORT}:29092
volumes:
- ${DOCKER_DATA_PATH}/${KAFKA_CONTAINER_01_NAME}:/var/lib/kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:${KAFKA_01_LOCAL_PORT}
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
@@ -91,10 +78,12 @@ services:
- zookeeper
ports:
- ${KAFKA_02_LOCAL_PORT}:39092
# volumes:
# - ${DOCKER_DATA_PATH}/${KAFKA_CONTAINER_02_NAME}:/var/lib/kafka
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:${KAFKA_02_LOCAL_PORT}
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

View File

@@ -34,6 +34,8 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
sysNotification.setContent(newSysNotificationVo.getContent());
sysNotification.setNotificationUrl(newSysNotificationVo.getNotification_url());
sysNotification.setType(newSysNotificationVo.getType());
sysNotification.setSource(newSysNotificationVo.getSource());
sysNotification.setExtra(newSysNotificationVo.getExtra());
List<String> list = Arrays.asList(newSysNotificationVo.getReceivers().split(","));
for (String receiver : list) {
sysNotification.setReceiver(Integer.parseInt(receiver));

View File

@@ -27,6 +27,13 @@ spring:
# Connection timeout (milliseconds)
timeout: 1000
kafka:
producer:
bootstrap_servers: localhost:29092
client_id: gitlink_producer_01
retries: 5
batch_size: 16384
datasource:
# Configure the datasource type
driver-class-name: com.mysql.jdbc.Driver

View File

@@ -13,23 +13,34 @@ import cn.org.gitlink.notification.model.dao.entity.vo.UpdateNotificationStatusV
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.BindingResult;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.Map;
@RestController
@RequestMapping(value = "/gns/notification")
@Configuration
public class NotificationController {
//kafka topic
private static final String GITLINK_NOTIFICATION_TOPIC = "gitlink_notification"; //the gitlink topic in kafka
@Value("${spring.kafka.producer.topic}")
private String gitlinkNotificationTopic;
@Value("${spring.kafka.producer.partitions}")
private Integer partitions;
@Value("${spring.kafka.producer.replication_factor}")
private Short replicationFactor;
@Autowired
private KafkaUtil kafkaUtil;
@@ -68,7 +79,8 @@ public class NotificationController {
newSysNotificationVo.setPlatform(platform);
try {
kafkaUtil.sendMessage(GITLINK_NOTIFICATION_TOPIC, JSONObject.toJSONString(newSysNotificationVo));
kafkaUtil.createTopic(Arrays.asList(new NewTopic(gitlinkNotificationTopic, partitions, replicationFactor)));
kafkaUtil.sendMessage(gitlinkNotificationTopic, JSONObject.toJSONString(newSysNotificationVo));
return DataPacketUtil.jsonSuccessResult();
} catch (Exception e) {
logger.error(e);
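Since createTopic runs on every send in the controller above, callers may prefer to create the topic only when it is missing. A sketch using the KafkaUtil methods from this changeset; the class and package names are illustrative, not part of this change:

package cn.org.gitlink.notification.writer;

import cn.org.gitlink.notification.common.utils.KafkaUtil;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;

// Hypothetical helper: create the notification topic only when it does not
// exist yet, instead of calling createTopic on every request.
@Component
public class TopicInitializer {

    @Autowired
    private KafkaUtil kafkaUtil;

    public void ensureTopic(String topic, int partitions, short replicationFactor) {
        // getAllTopic() lists existing topics via the AdminClient
        if (!kafkaUtil.getAllTopic().contains(topic)) {
            kafkaUtil.createTopic(Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
        }
    }
}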

View File

@@ -5,37 +5,18 @@ logging:
config: classpath:logback.xml
spring:
#Kafka configuration
kafka:
#Change this to your kafka server IP and port
bootstrap-servers: localhost:29092
#=============== producer =======================
producer:
#If greater than zero, failed sends are retried this many times
retries: 0
#When multiple records are sent to the same partition, the producer batches them into fewer requests; default 16384 (bytes)
batch-size: 16384
#Total bytes of memory the producer can use to buffer records waiting to be sent to the server; default 33554432
buffer-memory: 33554432
#Serializer class for keys; implements the interface org.apache.kafka.common.serialization.Serializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
#Serializer class for values; implements the interface org.apache.kafka.common.serialization.Serializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
consumer:
#Unique string identifying the consumer group this consumer belongs to
group-id: test-consumer-group
#What to do when kafka has no initial offset, or the current offset no longer exists on the server; the default, latest, resets to the latest offset
#Valid values: latest, earliest, none
auto-offset-reset: earliest
#Consumer offsets are committed periodically in the background; default true
enable-auto-commit: true
#If 'enable-auto-commit' is true, the frequency (in milliseconds) at which offsets are auto-committed to kafka; default 5000.
auto-commit-interval: 100
#Deserializer class for keys; implements the interface org.apache.kafka.common.serialization.Deserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#Deserializer class for values; implements the interface org.apache.kafka.common.serialization.Deserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
bootstrap_servers: localhost:29092
client_id: gitlink_producer_01
retries: 5
batch_size: 16384
# Replication factor
replication_factor: 1
# Number of partitions
partitions: 3
topic: topic-gitlink-notification
datasource:
# 配置数据源类型