Compare commits
12 Commits
82a7de637d
...
06a65062f8
Author | SHA1 | Date |
---|---|---|
曾伟 | 06a65062f8 | |
曾伟 | b7c4a84f6f | |
baladiwei | ac14038d30 | |
巴拉迪维 | b0ce76fa3d | |
baladiwei | 42d046c6f0 | |
巴拉迪维 | 8026280580 | |
baladiwei | fbd598a372 | |
万佳 | 1dd8a8993a | |
巴拉迪维 | 75cc4ac2c9 | |
baladiwei | 32b640a719 | |
巴拉迪维 | f714f4d58e | |
baladiwei | 5dad2d9701 |
|
@ -0,0 +1,48 @@
|
|||
package cn.org.gitlink.notification.common.utils;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
|
||||
@Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Value("${spring.kafka.producer.client_id:#{null}}")
|
||||
private String clientId;
|
||||
|
||||
@Value("${spring.kafka.producer.retries:#{null}}")
|
||||
private Integer retries;
|
||||
|
||||
@Value("${spring.kafka.producer.batch_size:#{null}}")
|
||||
private Integer batchSize;
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerConfigs(),true);
|
||||
}
|
||||
|
||||
@Bean
|
||||
ProducerFactory<String, String> producerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, retries);
|
||||
props.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(props);
|
||||
}
|
||||
}
|
|
@ -1,11 +1,17 @@
|
|||
package cn.org.gitlink.notification.common.utils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.Collection;
|
||||
|
@ -26,15 +32,16 @@ import java.util.stream.Collectors;
|
|||
@Component
|
||||
public class KafkaUtil {
|
||||
|
||||
@Value("${spring.kafka.bootstrap-servers:#{null}}")
|
||||
private String springKafkaBootstrapServers;
|
||||
private Logger logger = LogManager.getLogger(KafkaUtil.class);
|
||||
|
||||
@Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
|
||||
private String kafkaServer;
|
||||
|
||||
private AdminClient adminClient;
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate kafkaTemplate;
|
||||
|
||||
|
||||
/**
|
||||
* 初始化AdminClient
|
||||
* '@PostConstruct该注解被用来修饰一个非静态的void()方法。
|
||||
|
@ -44,29 +51,26 @@ public class KafkaUtil {
|
|||
@PostConstruct
|
||||
private void initAdminClient() {
|
||||
Map<String, Object> props = new HashMap<>(1);
|
||||
if (springKafkaBootstrapServers == null) {
|
||||
if (kafkaServer == null) {
|
||||
return;
|
||||
}
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
|
||||
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
|
||||
adminClient = KafkaAdminClient.create(props);
|
||||
}
|
||||
|
||||
/**
|
||||
* 新增topic,支持批量
|
||||
*/
|
||||
public void createTopic(Collection<NewTopic> newTopics) {
|
||||
adminClient.createTopics(newTopics);
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除topic,支持批量
|
||||
*/
|
||||
public void deleteTopic(Collection<String> topics) {
|
||||
adminClient.deleteTopics(topics);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定topic的信息
|
||||
* 获取 Topic 的信息
|
||||
*
|
||||
* @param topics
|
||||
* @return
|
||||
*/
|
||||
public String getTopicInfo(Collection<String> topics) {
|
||||
AtomicReference<String> info = new AtomicReference<String>("");
|
||||
|
@ -77,28 +81,55 @@ public class KafkaUtil {
|
|||
}
|
||||
});
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
logger.error(e);
|
||||
}
|
||||
return info.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取全部topic
|
||||
* 获取所有 Topic
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<String> getAllTopic() {
|
||||
try {
|
||||
return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
logger.error(e);
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 往topic中发送消息
|
||||
* 往 Kafka 发送消息
|
||||
*
|
||||
* @param topic
|
||||
* @param message
|
||||
*/
|
||||
public void sendMessage(String topic, String message) {
|
||||
kafkaTemplate.send(topic, message);
|
||||
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
|
||||
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
logger.error(throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(SendResult<String, String> stringStringSendResult) {
|
||||
logger.debug(stringStringSendResult.getRecordMetadata());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 往 kafka 发送信息
|
||||
*
|
||||
* @param topic
|
||||
* @param message
|
||||
* @param callback
|
||||
*/
|
||||
public void sendMessage(String topic, String message, ListenableFutureCallback callback) {
|
||||
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
|
||||
future.addCallback(callback);
|
||||
}
|
||||
}
|
|
@ -21,13 +21,6 @@
|
|||
<version>${springboot.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/com.xuxueli/xxl-job-core -->
|
||||
<dependency>
|
||||
<groupId>com.xuxueli</groupId>
|
||||
<artifactId>xxl-job-core</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.org.gitlink.notification</groupId>
|
||||
<artifactId>gns-common</artifactId>
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
package cn.org.gitlink.notification.executor.core.config;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -13,46 +17,43 @@ import java.util.Map;
|
|||
@EnableKafka
|
||||
public class KafkaConsumerConfig {
|
||||
|
||||
@Value("${kafka.consumer.servers}")
|
||||
@Value("${spring.kafka.consumer.bootstrap_servers:#{null}}")
|
||||
private String servers;
|
||||
@Value("${kafka.consumer.enable.auto.commit}")
|
||||
private boolean enableAutoCommit;
|
||||
@Value("${kafka.consumer.session.timeout:10000}")
|
||||
private String sessionTimeout;
|
||||
@Value("${kafka.consumer.auto.commit.interval}")
|
||||
private String autoCommitInterval;
|
||||
@Value("${kafka.consumer.group.id}")
|
||||
|
||||
@Value("${spring.kafka.consumer.group_id}")
|
||||
private String groupId;
|
||||
@Value("${kafka.consumer.auto.offset.reset}")
|
||||
|
||||
@Value("${spring.kafka.consumer.client_id}")
|
||||
private String clientId;
|
||||
|
||||
@Value("${spring.kafka.consumer.auto_offset_reset}")
|
||||
private String autoOffsetReset;
|
||||
@Value("${kafka.consumer.max.poll.records}")
|
||||
|
||||
@Value("${spring.kafka.consumer.max_poll_records}")
|
||||
private String maxPollRecords;
|
||||
|
||||
@Bean("kafkaConsumer")
|
||||
public KafkaConsumer<String, String> consumer() {
|
||||
return new KafkaConsumer<>(consumerConfigs());
|
||||
@Value("${spring.kafka.consumer.topic}")
|
||||
private String topic;
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> consumerListenerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
|
||||
factory.setConsumerFactory(consumerConfigs());
|
||||
factory.setRecordFilterStrategy(record -> record.topic().toLowerCase().equals(this.topic));
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
||||
public Map<String, Object> consumerConfigs() {
|
||||
@Bean
|
||||
public ConsumerFactory<String, Object> consumerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("bootstrap.servers", servers);
|
||||
//每个消费者分配独立的组号
|
||||
props.put("group.id", groupId);
|
||||
//如果value合法,则自动提交偏移量
|
||||
props.put("enable.auto.commit", enableAutoCommit);
|
||||
// 每次拉取10条
|
||||
props.put("max.poll.records", maxPollRecords);
|
||||
//设置多久一次更新被消费消息的偏移量
|
||||
props.put("auto.commit.interval.ms", autoCommitInterval);
|
||||
//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
|
||||
props.put("session.timeout.ms", sessionTimeout);
|
||||
//自动重置offset
|
||||
props.put("auto.offset.reset", autoOffsetReset);
|
||||
props.put("key.deserializer",
|
||||
"org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put("value.deserializer",
|
||||
"org.apache.kafka.common.serialization.StringDeserializer");
|
||||
return props;
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
|
||||
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
package cn.org.gitlink.notification.executor.core.config;
|
||||
|
||||
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class XXLJobConfig {
|
||||
private Logger logger = LoggerFactory.getLogger(XXLJobConfig.class);
|
||||
|
||||
@Value("${xxl.job.admin.addresses}")
|
||||
private String adminAddresses;
|
||||
|
||||
@Value("${xxl.job.accessToken}")
|
||||
private String accessToken;
|
||||
|
||||
@Value("${xxl.job.executor.appname}")
|
||||
private String appname;
|
||||
|
||||
@Value("${xxl.job.executor.address}")
|
||||
private String address;
|
||||
|
||||
@Value("${xxl.job.executor.ip}")
|
||||
private String ip;
|
||||
|
||||
@Value("${xxl.job.executor.port}")
|
||||
private int port;
|
||||
|
||||
@Value("${xxl.job.executor.logpath}")
|
||||
private String logPath;
|
||||
|
||||
@Value("${xxl.job.executor.logretentiondays}")
|
||||
private int logRetentionDays;
|
||||
|
||||
|
||||
@Bean
|
||||
public XxlJobSpringExecutor xxlJobExecutor() {
|
||||
logger.info(">>>>>>>>>>> xxl-job config init.");
|
||||
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
|
||||
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
|
||||
xxlJobSpringExecutor.setAppname(appname);
|
||||
xxlJobSpringExecutor.setAddress(address);
|
||||
xxlJobSpringExecutor.setIp(ip);
|
||||
xxlJobSpringExecutor.setPort(port);
|
||||
xxlJobSpringExecutor.setAccessToken(accessToken);
|
||||
xxlJobSpringExecutor.setLogPath(logPath);
|
||||
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
|
||||
|
||||
return xxlJobSpringExecutor;
|
||||
}
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
package cn.org.gitlink.notification.executor.service.jobhandler;
|
||||
|
||||
import cn.org.gitlink.notification.common.utils.SpringContextUtil;
|
||||
import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
|
||||
import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.xxl.job.core.context.XxlJobHelper;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* 系统消息消费执行器
|
||||
* @author zengwei
|
||||
* @Date 2021-09-09
|
||||
*/
|
||||
@Component
|
||||
public class NotificationJob {
|
||||
private static Logger logger = LoggerFactory.getLogger(NotificationJob.class);
|
||||
|
||||
@Value("${topic.notification.name: #{null}}")
|
||||
private String topic;
|
||||
|
||||
@Autowired
|
||||
private SysNotificationService sysNotificationService;
|
||||
|
||||
/**
|
||||
* 系统消息处理入口,在xxl-job-admin添加任务时,JobHandler栏填注解内名字
|
||||
* @author zengwei
|
||||
* @Date 2021-09-09
|
||||
* */
|
||||
@XxlJob("notificationMessageHandler")
|
||||
public void notificationMessageHandler() {
|
||||
|
||||
if (null == topic){
|
||||
XxlJobHelper.handleFail("未配置topic!");
|
||||
return;
|
||||
}
|
||||
|
||||
XxlJobHelper.log("topic: {}", topic);
|
||||
|
||||
KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) SpringContextUtil.getBean("kafkaConsumer");
|
||||
consumer.subscribe(Collections.singletonList(topic));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||
XxlJobHelper.log("拉取到{}条记录!", records.count());
|
||||
|
||||
for (ConsumerRecord<String, String> record: records) {
|
||||
NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(record.value(), NewSysNotificationVo.class);
|
||||
try {
|
||||
sysNotificationService.sendNotification(newSysNotificationVo);
|
||||
XxlJobHelper.log("{} 消费成功!", record.value());
|
||||
} catch (Exception e) {
|
||||
XxlJobHelper.log("{} 消费失败, 原因:{}", record.value(), e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
consumer.commitAsync();
|
||||
|
||||
}
|
||||
|
||||
public void init(){
|
||||
logger.info("init notification job");
|
||||
}
|
||||
public void destroy(){
|
||||
logger.info("notification job has been destroyed");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package cn.org.gitlink.notification.executor.service.jobhandler;
|
||||
|
||||
import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
|
||||
import cn.org.gitlink.notification.model.service.notification.SysNotificationService;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.KafkaHandler;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Configuration
|
||||
@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group_id}")
|
||||
public class NotificationListener {
|
||||
|
||||
private Logger logger = LogManager.getLogger(NotificationListener.class);
|
||||
|
||||
@Autowired
|
||||
private SysNotificationService sysNotificationService;
|
||||
|
||||
@KafkaHandler
|
||||
public void messageHandler(String message) {
|
||||
try {
|
||||
NewSysNotificationVo newSysNotificationVo = JSONObject.parseObject(message, NewSysNotificationVo.class);
|
||||
sysNotificationService.sendNotification(newSysNotificationVo);
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
@KafkaHandler(isDefault = true)
|
||||
public void defaultHandler(Object object) {
|
||||
logger.error("Unknow object received. ->" + object);
|
||||
}//end of method
|
||||
}
|
|
@ -16,8 +16,8 @@ xxl.job.executor.logpath=./logs/xxl-job/jobhandler
|
|||
### xxl-job executor log-retention-days
|
||||
xxl.job.executor.logretentiondays=30
|
||||
|
||||
kafka.consumer.servers=localhost:29092
|
||||
kafka.consumer.group.id=0
|
||||
kafka.consumer.servers=localhost:29092,localhost:39092
|
||||
kafka.consumer.group.id=gitlink-notification-group
|
||||
kafka.consumer.auto.offset.reset=earliest
|
||||
kafka.consumer.enable.auto.commit=false
|
||||
kafka.consumer.auto.commit.interval=100
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
server:
|
||||
port: 8083
|
||||
|
||||
logging:
|
||||
config: classpath:logback.xml
|
||||
|
||||
spring:
|
||||
kafka:
|
||||
producer:
|
||||
bootstrap_servers: localhost:29092
|
||||
client_id: gitlink_producer_01
|
||||
retries: 5
|
||||
batch_size: 16384
|
||||
consumer:
|
||||
bootstrap_servers: localhost:29092
|
||||
client_id: gitlink_consumer
|
||||
group_id: group-gitlink-notification
|
||||
auto_offset_reset: earliest
|
||||
max_poll_records: 100
|
||||
topic: topic-gitlink-notification
|
||||
enable-auto-commit: true
|
||||
#自动提交的时间间隔
|
||||
auto-commit-interval: 1S
|
||||
listener:
|
||||
# 消费线程数
|
||||
concurrency: 3
|
||||
# 提交模式
|
||||
ack-mode: record
|
||||
|
||||
datasource:
|
||||
# 配置数据源类型
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
url: jdbc:mysql://127.0.0.1:33306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
|
||||
username: gitlink
|
||||
password: giTlinK0^827
|
||||
#ip白名单列表,多个ip用逗号隔开,允许所有用*号
|
||||
white-list: '*'
|
|
@ -12,7 +12,7 @@ XXL_JOB_ADMIN_VERSION=2.3.0
|
|||
MYSQL_CONTAINER_NAME=gitlink_mysql
|
||||
REDIS_CONTAINER_NAME=gitlink_redis
|
||||
XXL_JOB_ADMIN_CONTAINER_NAME=gitlink_xxl_job_admin
|
||||
ZOOKEEPER_CONTAINER_NAME=gitlink_zookeepr
|
||||
ZOOKEEPER_CONTAINER_NAME=gitlink_zookeeper
|
||||
KAFKA_CONTAINER_01_NAME=gitlink_kafka_01
|
||||
KAFKA_CONTAINER_02_NAME=gitlink_kafka_02
|
||||
|
||||
|
|
|
@ -33,23 +33,6 @@ services:
|
|||
networks:
|
||||
- gitlink_network
|
||||
|
||||
|
||||
xxl-job-admin:
|
||||
image: xuxueli/xxl-job-admin:${XXL_JOB_ADMIN_VERSION}
|
||||
container_name: ${XXL_JOB_ADMIN_CONTAINER_NAME}
|
||||
hostname: xxl_job_admin
|
||||
environment:
|
||||
- JAVA_OPTS=-Xmx512m
|
||||
- PARAMS=--spring.datasource.url=jdbc:mysql://${MYSQL_CONTAINER_NAME}:3306/gitlink_notification?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai --spring.datasource.username=${MYSQL_USER} --spring.datasource.password=${MYSQL_PASSWORD}
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/xxl-job-admin:/data/applogs
|
||||
ports:
|
||||
- ${XXL_JOB_ADMIN_LOCAL_PORT}:8080
|
||||
depends_on:
|
||||
- mysql
|
||||
networks:
|
||||
- gitlink_network
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:latest
|
||||
|
@ -58,6 +41,8 @@ services:
|
|||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_TICK_TIME: 2000
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
|
||||
ports:
|
||||
- ${ZOOKEEPER_LOCAL_PORT}:2181
|
||||
networks:
|
||||
|
@ -72,10 +57,12 @@ services:
|
|||
- zookeeper
|
||||
ports:
|
||||
- ${KAFKA_01_LOCAL_PORT}:29092
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/${KAFKA_CONTAINER_01_NAME}:/var/lib/kafka
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:${KAFKA_01_LOCAL_PORT}
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
|
@ -91,10 +78,12 @@ services:
|
|||
- zookeeper
|
||||
ports:
|
||||
- ${KAFKA_02_LOCAL_PORT}:39092
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/${KAFKA_CONTAINER_02_NAME}:/var/lib/kafka
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 2
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:39092
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:${KAFKA_02_LOCAL_PORT}
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
|
|
|
@ -34,6 +34,8 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
sysNotification.setContent(newSysNotificationVo.getContent());
|
||||
sysNotification.setNotificationUrl(newSysNotificationVo.getNotification_url());
|
||||
sysNotification.setType(newSysNotificationVo.getType());
|
||||
sysNotification.setSource(newSysNotificationVo.getSource());
|
||||
sysNotification.setExtra(newSysNotificationVo.getExtra());
|
||||
List<String> list = Arrays.asList(newSysNotificationVo.getReceivers().split(","));
|
||||
for (String receiver : list) {
|
||||
sysNotification.setReceiver(Integer.parseInt(receiver));
|
||||
|
|
|
@ -27,6 +27,13 @@ spring:
|
|||
# 连接超时时间(毫秒)
|
||||
timeout: 1000
|
||||
|
||||
kafka:
|
||||
producer:
|
||||
bootstrap_servers: localhost:29092
|
||||
client_id: gitlink_producer_01
|
||||
retries: 5
|
||||
batch_size: 16384
|
||||
|
||||
datasource:
|
||||
# 配置数据源类型
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
|
|
|
@ -13,23 +13,34 @@ import cn.org.gitlink.notification.model.dao.entity.vo.UpdateNotificationStatusV
|
|||
import com.alibaba.fastjson.JSONObject;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.validation.BindingResult;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
@RestController
|
||||
@RequestMapping(value = "/gns/notification")
|
||||
@Configuration
|
||||
public class NotificationController {
|
||||
|
||||
//kafka topic
|
||||
private static final String GITLINK_NOTIFICATION_TOPIC = "gitlink_notification"; //在kafka中gitlink的topic
|
||||
@Value("${spring.kafka.producer.topic}")
|
||||
private String gitlinkNotificationTopic;
|
||||
|
||||
@Value("${spring.kafka.producer.partitions}")
|
||||
private Integer partitions;
|
||||
|
||||
@Value("${spring.kafka.producer.replication_factor}")
|
||||
private Short replicationFactor;
|
||||
|
||||
@Autowired
|
||||
private KafkaUtil kafkaUtil;
|
||||
|
@ -68,7 +79,8 @@ public class NotificationController {
|
|||
newSysNotificationVo.setPlatform(platform);
|
||||
|
||||
try {
|
||||
kafkaUtil.sendMessage(GITLINK_NOTIFICATION_TOPIC, JSONObject.toJSONString(newSysNotificationVo));
|
||||
kafkaUtil.createTopic(Arrays.asList(new NewTopic(gitlinkNotificationTopic, partitions, replicationFactor)));
|
||||
kafkaUtil.sendMessage(gitlinkNotificationTopic, JSONObject.toJSONString(newSysNotificationVo));
|
||||
return DataPacketUtil.jsonSuccessResult();
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
|
|
|
@ -5,37 +5,18 @@ logging:
|
|||
config: classpath:logback.xml
|
||||
|
||||
spring:
|
||||
#kafka配置
|
||||
kafka:
|
||||
#这里改为你的kafka服务器ip和端口号
|
||||
bootstrap-servers: localhost:29092
|
||||
#=============== producer =======================
|
||||
producer:
|
||||
#如果该值大于零时,表示启用重试失败的发送次数
|
||||
retries: 0
|
||||
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
|
||||
batch-size: 16384
|
||||
#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
|
||||
buffer-memory: 33554432
|
||||
#key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
|
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
#value的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
|
||||
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
#=============== consumer =======================
|
||||
consumer:
|
||||
#用于标识此使用者所属的使用者组的唯一字符串
|
||||
group-id: test-consumer-group
|
||||
#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
|
||||
#可选的值为latest, earliest, none
|
||||
auto-offset-reset: earliest
|
||||
#消费者的偏移量将在后台定期提交,默认值为true
|
||||
enable-auto-commit: true
|
||||
#如果'enable-auto-commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
|
||||
auto-commit-interval: 100
|
||||
#密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
#值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
|
||||
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
bootstrap_servers: localhost:29092
|
||||
client_id: gitlink_producer_01
|
||||
retries: 5
|
||||
batch_size: 16384
|
||||
# 副本数
|
||||
replication_factor: 1
|
||||
# 分区数
|
||||
partitions: 3
|
||||
topic: topic-gitlink-notification
|
||||
|
||||
|
||||
datasource:
|
||||
# 配置数据源类型
|
||||
|
|
Loading…
Reference in New Issue