Compare commits

...

72 Commits

Author SHA1 Message Date
曾伟 ddb7098080 kafka producer和consumer初始操作修改
1.初始化时创建topic
2.初始化时没有kafka相关配置信息则不进行kafka连接
2021-09-18 14:13:00 +08:00
baladiwei 728efe9a57 Merge pull request '时区配置' (#100) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-17 16:38:00 +08:00
万佳 ea8b9b3d37 Merge branch 'master' of https://git.trustie.net/Gitlink/gitlink-notification-system into dev_gitlink_model 2021-09-17 16:35:57 +08:00
万佳 8b6ef05fae 时区配置 2021-09-17 16:35:16 +08:00
baladiwei 1aeb8228d6 Merge pull request '处理 -v 映射的问题' (#99) from baladiwei/gitlink-notification-system:master into master 2021-09-17 15:15:42 +08:00
巴拉迪维 ca8035ffdf 处理 -v 映射的问题 2021-09-17 15:14:58 +08:00
baladiwei d6be64936f Merge pull request 'docker scripts later' (#98) from baladiwei/gitlink-notification-system:master into master 2021-09-17 15:06:07 +08:00
巴拉迪维 21969e11b4 docker scripts alter 2021-09-17 15:04:57 +08:00
万佳 2fd5441e15 subject长度限定修改 2021-09-17 14:46:29 +08:00
baladiwei 9cb9cc66a3 Merge pull request '调整代码部分代码结构,优化docker脚本' (#97) from baladiwei/gitlink-notification-system:master into master 2021-09-17 14:33:27 +08:00
巴拉迪维 fd965f0af4 调整代码部分代码结构,优化docker脚本 2021-09-17 14:32:44 +08:00
万佳 d4156ed7a6 代码完善 2021-09-17 14:30:01 +08:00
baladiwei ecaaa0cc7c Merge pull request '脚本 docker 调整' (#96) from baladiwei/gitlink-notification-system:master into master 2021-09-17 13:38:15 +08:00
巴拉迪维 ca3226d2b0 脚本调整
1、启动 docker 容器的脚本合并为一行
2、停止 docker 时对项目进行 mvn clean 操作
3、响应的 windows 脚本同步修改
2021-09-17 13:37:18 +08:00
万佳 935b2cdb5a 代码完善 2021-09-17 13:35:28 +08:00
baladiwei 12101057ae Merge pull request '1.Application添加注解 2.系统消息批量insert' (#94) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-17 13:17:39 +08:00
baladiwei 42009c70c6 Merge pull request 'final check of timezone' (#95) from baladiwei/gitlink-notification-system:master into master 2021-09-17 13:17:07 +08:00
巴拉迪维 77dfbb9ceb final check of timezone 2021-09-17 13:16:29 +08:00
万佳 c485627592 Merge branch 'master' of https://git.trustie.net/Gitlink/gitlink-notification-system into dev_gitlink_model 2021-09-17 13:14:35 +08:00
baladiwei ad2626e646 Merge pull request '社区容器的时区' (#93) from baladiwei/gitlink-notification-system:master into master 2021-09-17 13:14:35 +08:00
巴拉迪维 4de908dfd8 社区容器的时区 2021-09-17 13:13:54 +08:00
万佳 3df4ef193f 1.Application添加@EnableTransactionManagement注解
2.系统消息批量insert
2021-09-17 13:09:27 +08:00
baladiwei dafc2a75e4 Merge pull request 'timezone of kafka' (#92) from baladiwei/gitlink-notification-system:master into master 2021-09-17 13:09:24 +08:00
巴拉迪维 0c02b16cf2 timezone of kafka 2021-09-17 13:08:46 +08:00
baladiwei 9ec6ef20b6 Merge pull request 'timezone of zookeeper' (#91) from baladiwei/gitlink-notification-system:master into master 2021-09-17 13:07:45 +08:00
巴拉迪维 91e118929d timezone of zookeeper 2021-09-17 13:07:05 +08:00
baladiwei 3e8174db0b Merge pull request 'timezone settings' (#90) from baladiwei/gitlink-notification-system:master into master 2021-09-17 13:04:24 +08:00
巴拉迪维 99d4f8da14 timezone of container 2021-09-17 13:03:48 +08:00
baladiwei bbd21bf5d4 Merge pull request '容器时区设置' (#89) from baladiwei/gitlink-notification-system:master into master 2021-09-17 12:44:55 +08:00
巴拉迪维 4285549fde 容器时区设置 2021-09-17 12:44:10 +08:00
baladiwei fd1745a009 Merge pull request 'email发送bug修复' (#88) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-17 11:53:46 +08:00
万佳 f346f9e8b8 email发送bug修复 2021-09-17 11:52:44 +08:00
baladiwei 9eafaa7340 Merge pull request '去掉xxl-job相关的配置' (#87) from baladiwei/gitlink-notification-system:master into master 2021-09-17 11:49:52 +08:00
巴拉迪维 a17e2be4e6 去掉xxl-job相关的配置 2021-09-17 11:49:11 +08:00
baladiwei b46d89602c Merge pull request '处理忽略文件' (#86) from baladiwei/gitlink-notification-system:master into master 2021-09-17 11:20:05 +08:00
巴拉迪维 be91bbbe5c 处理忽略文件 2021-09-17 11:19:27 +08:00
baladiwei 7a1d06767b Merge pull request '处理email相关的配置' (#85) from baladiwei/gitlink-notification-system:master into master 2021-09-17 11:05:07 +08:00
巴拉迪维 cb36b2a42f 处理email相关的配置 2021-09-17 11:04:01 +08:00
baladiwei 201ff14880 Merge pull request 'email分配发送任务,email发送' (#84) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-17 10:55:40 +08:00
万佳 75754b5246 email分配发送任务,email发送 2021-09-17 10:52:07 +08:00
万佳 a9224761cc Merge branch 'master' of https://git.trustie.net/Gitlink/gitlink-notification-system into dev_gitlink_model 2021-09-17 08:59:18 +08:00
baladiwei f6d2e3fc52 Merge pull request 'readme alter' (#83) from baladiwei/gitlink-notification-system:master into master 2021-09-16 16:25:17 +08:00
巴拉迪维 ee8889658e readme alter 2021-09-16 16:24:30 +08:00
baladiwei 98ab3d51b3 Merge pull request '新增发送Email的逻辑' (#82) from baladiwei/gitlink-notification-system:master into master 2021-09-16 11:19:34 +08:00
巴拉迪维 a504e3634b 新增发送Email的逻辑 2021-09-16 11:18:15 +08:00
万佳 fb748c6309 Merge branch 'dev_gitlink_model' of https://git.trustie.net/wanjia9506/gitlink-notification-system into dev_gitlink_model 2021-09-16 10:19:31 +08:00
万佳 ab294784f2 email建表sql 2021-09-16 10:18:24 +08:00
万佳 39fb424d14 Merge branch 'dev_gitlink_model' of https://git.trustie.net/wanjia9506/gitlink-notification-system into dev_gitlink_model 2021-09-16 09:56:15 +08:00
万佳 b1299713e5 批量更新优化 2021-09-16 09:48:39 +08:00
万佳 017e055822 Merge branch 'master' of https://git.trustie.net/Gitlink/gitlink-notification-system into dev_gitlink_model 2021-09-15 16:55:38 +08:00
baladiwei 66b09f16c7 Merge pull request 'Readme update' (#81) from baladiwei/gitlink-notification-system:master into master 2021-09-15 16:52:04 +08:00
巴拉迪维 2ee0e32953 readme update 2021-09-15 16:51:14 +08:00
万佳 81dff44d81 Merge branch 'master' of https://git.trustie.net/Gitlink/gitlink-notification-system into dev_gitlink_model
# 请输入一个提交信息以解释此合并的必要性,尤其是将一个更新后的上游分支
# 合并到主题分支。
#
# 以 '#' 开始的行将被忽略,而空的提交说明将终止提交。
2021-09-15 16:49:52 +08:00
巴拉迪维 6cebd22f6f update readme again 2021-09-15 16:48:56 +08:00
巴拉迪维 56f09bd3db update readme 2021-09-15 16:47:33 +08:00
baladiwei 3b5c11bb10 Merge pull request 'EmailJobsListener注释KafkaListener注解' (#80) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-15 16:28:15 +08:00
万佳 2903ac29c7 EmailJobsListener注释KafkaListener注解 2021-09-15 16:16:43 +08:00
baladiwei 4a59fb481e Merge pull request '调整Readme' (#79) from baladiwei/gitlink-notification-system:master into master 2021-09-15 15:44:49 +08:00
巴拉迪维 350f8ce5e5 修改Readme 2021-09-15 15:44:00 +08:00
万佳 4cc9282c74 Merge branch 'dev_gitlink_model' of https://git.trustie.net/wanjia9506/gitlink-notification-system into dev_gitlink_model
# 请输入一个提交信息以解释此合并的必要性,尤其是将一个更新后的上游分支
# 合并到主题分支。
#
# 以 '#' 开始的行将被忽略,而空的提交说明将终止提交。
2021-09-15 15:38:35 +08:00
万佳 c14c7750eb email_send_records批量更新 2021-09-15 15:04:36 +08:00
baladiwei af94790098 Merge pull request '修改 ReadMe:新增运行方式说明' (#78) from baladiwei/gitlink-notification-system:master into master 2021-09-15 14:47:18 +08:00
巴拉迪维 38c5d045d8 修改 ReadMe:新增运行方式说明 2021-09-15 14:46:31 +08:00
baladiwei 90f044a380 Merge pull request '邮件model层完善,添加emailListener及emailService' (#77) from wanjia9506/gitlink-notification-system:dev_gitlink_model into master 2021-09-15 13:21:30 +08:00
万佳 a89bdb09b7 EmailService中返回值修改 2021-09-15 13:02:36 +08:00
baladiwei f3778a8ae0 Merge pull request '修改docker配置' (#76) from baladiwei/gitlink-notification-system:master into master 2021-09-15 12:28:36 +08:00
巴拉迪维 9237ab0273 修改docker配置 2021-09-15 12:27:56 +08:00
万佳 5ed6e03faf Merge branch 'master' of https://git.trustie.net/Gitlink/gitlink-notification-system into dev_gitlink_model 2021-09-15 11:38:04 +08:00
万佳 b3e98577ae 邮件model层完善,添加emailListener及emailService 2021-09-15 11:36:39 +08:00
baladiwei a029446998 Merge pull request '处理json格式化字符串中的时间格式问题' (#75) from baladiwei/gitlink-notification-system:master into master 2021-09-15 11:04:38 +08:00
巴拉迪维 369838cf45 处理json格式化字符串中的时间格式问题 2021-09-15 11:04:00 +08:00
baladiwei 5d1115d9c3 Merge pull request '添加bat脚本' (#74) from DavidZeng/gitlink-notification-system:dev_add_middleware_conf into master 2021-09-15 11:00:44 +08:00
46 changed files with 694 additions and 293 deletions

3
.gitignore vendored
View File

@ -93,4 +93,5 @@ target/
/logs/
*/src/main/resources/application.yml
*/src/main/resources/application.yml
*/src/main/resources/mail.properties

View File

@ -1,13 +1,13 @@
![](/api/attachments/365506?raw=true)
![](/api/attachments/366811?raw=true)
### 主要组成部分:
1. **xxl-job 调度管理中心** :提供对分布式的任务集中监控、管理等功能
2. **xxl-job 执行器** :消息中心的业务逻辑实现,目前只实现「系统通知」功能,后续可以考虑 `email-executor`、`text-message-executor` 等实现
3. **GNS-Writer**:给 `Forge业务层` 调用的 API 网关,负责从业务逻辑层接受信息
1. **管理后台(在建)** :提供对分布式的任务集中监控、管理等功能
2. **各类执行器** :消息中心的业务逻辑实现,目前只实现「系统通知」和「邮件通知」功能,后续可以考虑短信通知、微信通知等实现
3. **GNS-Writer**:给 `业务层` 调用的 API 网关,负责从业务逻辑层接受信息
4. **middleware**:独立运行的中间件服务, 包括消息中间件、缓存中间件、DB
5. **GNS-Reader**:给 `Forge业务层` 调用的 API 网关,负责提供业务数据
5. **GNS-Reader**:给 `业务层` 调用的 API 网关,负责提供业务数据
### 一些约定:
@ -22,4 +22,24 @@
3. 在本地分支修改任务并推送到自己名下的仓库
4. 在 [仓库首页](https://forgeplus.trustie.net/projects/Gitlink/gitlink-notification-system) 触发 pull request 并等待审核处理
### 使用 Docker 运行
1. git pull 拉取本仓库并保存到本地
2. 下载并安装 [Docker Engine](https://docs.docker.com/engine/install/)
3. 配置好 Java 环境以及 Maven 环境 (Java 8 版本以上、Maven 3 版本以上)
4. 复制配置文件,在仓库跟目录执行:
```
cp middleware/.env.example middleware/.env
cp reader/src/main/resources/application.yml.example reader/src/main/resources/application.yml
cp writer/src/main/resources/application.yml.example writer/src/main/resources/application.yml
cp executor/src/main/resources/application.yml.example executor/src/main/resources/application.yml
```
5. 修改 `{repo}/middleware/.env` 文件里 `SQL_SCRIPT_PATH``DOCKER_DATA_PATH` 绝对路径到本地磁盘
6. 进入到仓库根目录下的 `{repo}/middleware` 目录并执行 `start_docker_compose.sh` 脚本
7. 浏览器访问
1. 数据推送相关接口http://localhost:58082/doc.html
2. 数据读取相关接口http://localhost:58080/doc.html
8. 停止 Docker 时环境时请执行 `{repo}/middleware/end_docker_compose.sh` 脚本

View File

@ -1,4 +1,4 @@
package cn.org.gitlink.notification.common.utils;
package cn.org.gitlink.notification.common.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
@ -19,9 +19,6 @@ public class KafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
private String bootstrapServers;
@Value("${spring.kafka.producer.client_id:#{null}}")
private String clientId;
@Value("${spring.kafka.producer.retries:#{null}}")
private Integer retries;
@ -30,19 +27,23 @@ public class KafkaProducerConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerConfigs(),true);
return new KafkaTemplate<>(producerConfigs(), true);
}
@Bean
ProducerFactory<String, String> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 如果当前配置信息内没有kafka producer相关配置则不做参数设置此处解决没有设置配置信息而引起的空指针异常
if (null != bootstrapServers) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}
return new DefaultKafkaProducerFactory<>(props);
}
}

View File

@ -0,0 +1,35 @@
package cn.org.gitlink.notification.common.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Service;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
@PropertySource(value = "mail.properties")
@Service(value = "GitlinkMailUtils")
public class EmailUtils {
@Value("${spring.mail.username}")
private String emailFrom;
@Autowired
private JavaMailSender mailSender;
public void sendMail(String subject, String recipient, String content) throws MessagingException {
MimeMessage mail = mailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(mail, true);
helper.setTo(recipient);
helper.setFrom(emailFrom);
helper.setSubject(subject);
helper.setText(content, true);
mailSender.send(mail);
}
}

View File

@ -14,10 +14,7 @@ import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -37,6 +34,18 @@ public class KafkaUtil {
@Value("${spring.kafka.producer.bootstrap_servers:#{null}}")
private String kafkaServer;
@Value("${spring.kafka.producer.topic:#{null}")
private String topic;
@Value("${spring.kafka.producer.mail_topic:#{null}")
private String mailTopic;
@Value("${spring.kafka.producer.partitions:#{null}}")
private Integer partitions;
@Value("${spring.kafka.producer.replication_factor:#{null}}")
private Short replicationFactor;
private AdminClient adminClient;
@Autowired
@ -46,16 +55,38 @@ public class KafkaUtil {
* 初始化AdminClient
* '@PostConstruct该注解被用来修饰一个非静态的void方法
* @PostConstruct修饰的方法会在服务器加载Servlet的时候运行并且只会被服务器执行一次
* PostConstruct在构造函数之后执行init方法之前执行
* PostConstruct在构造函数之后执行init方法之前执行ls
*/
@PostConstruct
private void initAdminClient() {
Map<String, Object> props = new HashMap<>(1);
// 如果当前配置信息内没有kafka producer相关配置则不对adminClient做初始化
if (kafkaServer == null) {
return;
}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
adminClient = KafkaAdminClient.create(props);
// 初始化topics
// TODO: 如果后面新增topic需要在这里添加
if (null !=partitions && null != replicationFactor) {
List<NewTopic> topics = new LinkedList<>();
if (null != topic) {
topics.add(new NewTopic(topic, partitions, replicationFactor));
}
if (null != mailTopic) {
topics.add(new NewTopic(mailTopic, partitions, replicationFactor));
}
if (!topics.isEmpty()) {
this.createTopic(topics);
}
}
}
public void createTopic(Collection<NewTopic> newTopics) {

26
db/gns-email.sql Normal file
View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS `gitlink_email_jobs`;
CREATE TABLE `gitlink_email_jobs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sender` int(11) NOT NULL COMMENT '发送者id',
`emails` text NOT NULL COMMENT '收件人全部邮件地址',
`subject` varchar(500) DEFAULT NULL COMMENT '邮件主题',
`content` text NOT NULL COMMENT '邮件内容',
`created_at` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`dispatched_at` datetime DEFAULT NULL COMMENT '处理时间',
`dispatched_status` int(11) DEFAULT '-1' COMMENT '发送状态:-1 未处理,1 处理成功,2 处理失败',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
DROP TABLE IF EXISTS `gitlink_email_send_records`;
CREATE TABLE `gitlink_email_send_records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`email` varchar(500) NOT NULL DEFAULT '' COMMENT '收件人邮件地址',
`job_id` int(11) NOT NULL COMMENT '邮件详情id',
`created_at` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`sent_at` datetime DEFAULT NULL COMMENT '发送时间',
`status` int(11) DEFAULT '-1' COMMENT '发送状态:-1 未发送,1 发送成功,2 发送失败',
PRIMARY KEY (`id`),
KEY `index_on_email_and_status` (`email`,`status`),
KEY `index_on_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

View File

@ -1,121 +0,0 @@
#
# XXL-JOB v2.3.0
# Copyright (c) 2015-present, xuxueli.
CREATE database if NOT EXISTS `gitlink_notification` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `gitlink_notification`;
SET NAMES utf8mb4;
CREATE TABLE `xxl_job_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
`schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
`misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID多个逗号分隔',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态0-停止1-运行',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_id` int(11) NOT NULL COMMENT '任务主键ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_code` int(11) NOT NULL COMMENT '调度-结果',
`trigger_msg` text COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_code` int(11) NOT NULL COMMENT '执行-状态',
`handle_msg` text COMMENT '执行-日志',
`alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态0-默认、1-无需告警、2-告警成功、3-告警失败',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`),
KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_log_report` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
`running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
`suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
`fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_logglue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT '任务主键ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(50) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
`title` varchar(12) NOT NULL COMMENT '执行器名称',
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型0=自动注册、1=手动录入',
`address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) NOT NULL COMMENT '账号',
`password` varchar(50) NOT NULL COMMENT '密码',
`role` tinyint(4) NOT NULL COMMENT '角色0-普通用户、1-管理员',
`permission` varchar(255) DEFAULT NULL COMMENT '权限执行器ID列表多个逗号分割',
PRIMARY KEY (`id`),
UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
commit;

View File

@ -3,9 +3,11 @@ package cn.org.gitlink.notification.executor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@ComponentScan("cn.org.gitlink.notification.*")
@SpringBootApplication
@EnableTransactionManagement
public class ExecutorApplication {
public static void main(String[] args) {
SpringApplication.run(ExecutorApplication.class, args);}

View File

@ -20,12 +20,6 @@ public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap_servers:#{null}}")
private String servers;
@Value("${spring.kafka.consumer.group_id}")
private String groupId;
@Value("${spring.kafka.consumer.client_id}")
private String clientId;
@Value("${spring.kafka.consumer.auto_offset_reset}")
private String autoOffsetReset;
@ -46,9 +40,7 @@ public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);

View File

@ -0,0 +1,119 @@
package cn.org.gitlink.notification.executor.service.email;
import cn.org.gitlink.notification.common.utils.EmailUtils;
import cn.org.gitlink.notification.model.dao.entity.EmailJob;
import cn.org.gitlink.notification.model.dao.entity.EmailSendRecord;
import cn.org.gitlink.notification.model.service.notification.EmailJobsService;
import cn.org.gitlink.notification.model.service.notification.EmailSendRecordsService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.mail.MessagingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Component
public class EmailService {
private Logger logger = LogManager.getLogger(EmailService.class);
//邮件任务处理状态
private static final Integer EMAIL_JOB_NOT_DISPATCHED = -1; //处理成功
private static final Integer EMAIL_JOB_DISPATCHED_SUCCESS = 1; //处理成功
private static final Integer EMAIL_JOB_DISPATCHED_FAIL = 2; //处理失败
//邮件发送记录状态
private static final Integer EMAIL_UNSENT_RECORD = -1; //处理成功
private static final Integer EMAIL_SENT_SUCCESS = 1; //处理成功
private static final Integer EMAIL_SENT_FAIL = 2; //处理失败
@Autowired
private EmailJobsService emailJobsService;
@Autowired
private EmailSendRecordsService emailSendRecordsService;
@Autowired
private EmailUtils emailUtils;
/**
* 处理邮件发送任务根据emails添加到邮件发送记录表中
*
* @param platform 平台编码
* @param dispatchNumber 待处理发送任务列表数量
* @return: void
* @Author: wanjia
* @Date: 2021/9/15
*/
@Transactional
public void DispatchEmailJobs(String platform, Integer dispatchNumber) {
//获取指定数量待处理列表
List<EmailJob> emailJobList = new ArrayList<>();
try {
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, EMAIL_JOB_NOT_DISPATCHED, dispatchNumber);
} catch (Exception e) {
logger.error("获取未处理邮件任务列表失败:\n" + e);
}
//将EmailJob分配到email_send_records中
Boolean flag = null;
for (EmailJob emailJob : emailJobList) {
try {
flag = emailSendRecordsService.newEmailSendRecords(platform, emailJob.getEmails(), emailJob.getId());
} catch (Exception e) {
logger.error("处理EmailJob失败email_job_id: " + emailJob.getId() + "\n" + e);
}
//EmailJob分配成功更新状态
try {
emailJobsService.markEmailJobsAs(platform, emailJob.getId(), new Date(), flag ? EMAIL_JOB_DISPATCHED_SUCCESS : EMAIL_JOB_DISPATCHED_FAIL);
} catch (Exception e) {
logger.error("更新EmailJob状态失败email_job_id: " + emailJob.getId() + "\n" + e);
}
}
}
/**
* 发送邮件
*
* @param platform 平台编码
* @param sentNumber 一次发送数量
* @return: void
* @Author: wanjia
* @Date: 2021/9/15
*/
@Transactional
public void sendEmail(String platform, Integer sentNumber) {
//获取待发送列表
List<EmailSendRecord> emailSendRecordList = new ArrayList<>();
try {
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, EMAIL_UNSENT_RECORD, sentNumber);
} catch (Exception e) {
logger.error("获取未发送邮件列表失败:\n" + e);
}
//发送邮件
Boolean flag = null;
for (EmailSendRecord unSentEmailSendRecord : emailSendRecordList) {
flag = false;
try {
emailUtils.sendMail(unSentEmailSendRecord.getSubject(), unSentEmailSendRecord.getEmail(), unSentEmailSendRecord.getContent());
flag = true;
unSentEmailSendRecord.setSentAt(new Date());
unSentEmailSendRecord.setStatus(flag ? EMAIL_SENT_SUCCESS : EMAIL_SENT_FAIL);
} catch (MessagingException e) {
logger.error("发送邮件失败email: " + unSentEmailSendRecord.getEmail() + "\n" + e);
}
}
//更新emailSendRecord状态
try {
emailSendRecordsService.markEmailSendRecordsAs(platform, emailSendRecordList);
} catch (Exception e) {
logger.error(e);
}
}
}

View File

@ -0,0 +1,53 @@
package cn.org.gitlink.notification.executor.service.jobhandler;
import cn.org.gitlink.notification.common.utils.KafkaUtil;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import cn.org.gitlink.notification.model.service.notification.EmailJobsService;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
@Configuration
@KafkaListener(topics = "${spring.kafka.consumer.topic_email}", groupId = "${spring.kafka.consumer.group_id_email}")
public class EmailJobsListener {
private Logger logger = LogManager.getLogger(EmailJobsListener.class);
@Autowired
private EmailJobsService emailJobsService;
@Value("${spring.kafka.producer.topic_new_email_remind}")
private String gitlinkNewEmailRemindTopic;
@Autowired
private KafkaUtil kafkaUtil;
@KafkaHandler
public void messageHandler(String message) {
try {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
Boolean flag = emailJobsService.sendEmail(newEmailJobVo);
//if the message is inserted successfully, send a new email-job message to kafka
if (flag){
kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo));
}
} catch (Exception e) {
logger.error(e);
}
}
@KafkaHandler(isDefault = true)
public void defaultHandler(Object object) {
logger.error("Unknow object received. ->" + object);
}//end of method
}

View File

@ -0,0 +1,39 @@
package cn.org.gitlink.notification.executor.service.jobhandler;
import cn.org.gitlink.notification.executor.service.email.EmailService;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import com.alibaba.fastjson.JSONObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Configuration
@KafkaListener(topics = "${spring.kafka.consumer.topic_new_email_remind}", groupId = "${spring.kafka.consumer.group_id_email}")
public class EmailSentListener {
private Logger logger = LogManager.getLogger(EmailSentListener.class);
@Autowired
private EmailService emailService;
@KafkaHandler
public void messageHandler(String message) {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
//处理邮件任务
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), 1);
//发送邮件
emailService.sendEmail(newEmailJobVo.getPlatform(), 100);
}
@KafkaHandler(isDefault = true)
public void defaultHandler(Object object) {
logger.error("Unknow object received. ->" + object);
}//end of method
}

View File

@ -8,23 +8,25 @@ spring:
kafka:
producer:
bootstrap_servers: kafka1:9092,kafka2:9092
client_id: gitlink_producer_01
retries: 5
batch_size: 16384
replication_factor: 1
partitions: 3
topic_new_email_remind: topic-gitlink-new-email-remind
consumer:
bootstrap_servers: kafka1:9092,kafka2:9092
client_id: gitlink_consumer
group_id: group-gitlink-notification
group_id_email: group-gitlink-email
group_id_new_email_remind: group-gitlink-new-email-remind
auto_offset_reset: earliest
max_poll_records: 100
topic: topic-gitlink-notification
enable-auto-commit: true
#自动提交的时间间隔
auto-commit-interval: 1S
topic: topic-gitlink-notification
topic_email: topic-gitlink-email
topic_new_email_remind: topic-gitlink-new-email-remind
listener:
# 消费线程数
concurrency: 3
# 提交模式
ack-mode: record
redis:
@ -40,10 +42,16 @@ spring:
timeout: 1000
datasource:
# 配置数据源类型
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://mysql:3306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
username: gitlink
password: giTlinK0^827
#ip白名单列表多个ip用逗号隔开允许所有用*号
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
serialization:
indent_output: true
fail_on_empty_beans: false
white-list: '*'

View File

@ -0,0 +1,12 @@
spring.mail.host=smtp.exmail.qq.com
spring.mail.default-encoding=utf-8
spring.mail.port=465
spring.mail.username=gitlink@barats.cn
spring.mail.password=
spring.mail.properties[mail.smtp.auth]=true
spring.mail.properties[mail.smtp.connectiontimeout]=10000
spring.mail.properties[mail.smtp.timeout]=10000
spring.mail.properties[mail.smtp.writetimeout]=10000
spring.mail.properties[mail.smtp.starttls.enable]=true
spring.mail.properties[mail.smtp.socketFactory.port]=465
spring.mail.properties[mail.smtp.socketFactory.class]=javax.net.ssl.SSLSocketFactory

View File

@ -1 +1 @@
docker-compose -f services.yml down && docker volume prune -f
docker-compose -f services.yml down && docker volume prune -f && mvn -f ../pom.xml clean

View File

@ -1,2 +1 @@
docker-compose -f services.yml down
docker volume prune -f
docker-compose -f services.yml down && docker volume prune -f && mvn -f ../pom.xml clean

View File

@ -13,16 +13,15 @@ services:
- TZ=Asia/Shanghai
volumes:
- ${DOCKER_DATA_PATH}/mysql:/var/lib/mysql
- ${SQL_SCRIPT_PATH}/xxl-job-structure.sql:/docker-entrypoint-initdb.d/0000.sql
- ${SQL_SCRIPT_PATH}/gns-notification.sql:/docker-entrypoint-initdb.d/0001.sql
- ${SQL_SCRIPT_PATH}/hehui-gns-notification.sql:/docker-entrypoint-initdb.d/0002.sql
- ${SQL_SCRIPT_PATH}/gns-email.sql:/docker-entrypoint-initdb.d/0003.sql
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
ports:
- ${MYSQL_LOCAL_PORT}:3306
networks:
- gitlink_network
redis:
image: redis:${REDIS_VERSION}
container_name: ${REDIS_CONTAINER_NAME}
@ -30,6 +29,8 @@ services:
volumes:
- ${DOCKER_DATA_PATH}/redis/data:/data
- ${DOCKER_DATA_PATH}/redis/logs:/logs
environment:
- TZ=Asia/Shanghai
ports:
- ${REDIS_LOCAL_PORT}:6379
networks:
@ -45,8 +46,8 @@ services:
ZOOKEEPER_TICK_TIME: 2000
ports:
- ${ZOOKEEPER_LOCAL_PORT}:2181
volumes:
- ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
# volumes:
# - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
networks:
- gitlink_network
@ -59,8 +60,10 @@ services:
- zookeeper
ports:
- ${KAFKA_01_LOCAL_PORT}:29092
volumes:
- ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}:/var/lib/kafka
# volumes:
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/lib:/var/lib/kafka
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/logs:/var/logs/kafka
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/conf:/etc/kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
@ -80,8 +83,10 @@ services:
- zookeeper
ports:
- ${KAFKA_02_LOCAL_PORT}:39092
volumes:
- ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}:/var/lib/kafka
# volumes:
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/lib:/var/lib/kafka
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/logs:/var/logs/kafka
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/conf:/etc/kafka
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
@ -92,7 +97,6 @@ services:
networks:
- gitlink_network
gitlink-reader:
container_name: ${GNS_READER_CONTAINER_NAME}
hostname: gitlink_reader
@ -102,8 +106,10 @@ services:
dockerfile: middleware/reader.Dockerfile
networks:
- gitlink_network
environment:
- TZ=Asia/Shanghai
volumes:
- ${DOCKER_DATA_PATH}/logs/:/data/logs/
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
depends_on:
- kafka1
- kafka2
@ -121,8 +127,10 @@ services:
dockerfile: middleware/writer.Dockerfile
networks:
- gitlink_network
environment:
- TZ=Asia/Shanghai
volumes:
- ${DOCKER_DATA_PATH}/logs/:/data/logs/
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
depends_on:
- kafka1
- kafka2
@ -141,10 +149,14 @@ services:
networks:
- gitlink_network
volumes:
- ${DOCKER_DATA_PATH}/logs/:/data/logs/
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
environment:
- TZ=Asia/Shanghai
depends_on:
- kafka1
- kafka2
- redis
- mysql
ports:
- ${GNS_EXECUTOR_LOCAL_PORT}:8083

View File

@ -1,2 +1 @@
mvn -f ../pom.xml clean package -DskipTests
docker-compose -f services.yml up --build --force-recreate
mvn -f ../pom.xml clean package -DskipTests && docker-compose -f services.yml up --build --force-recreate

View File

@ -20,7 +20,6 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<configuration>
<includeSystemScope>true</includeSystemScope>
</configuration>

View File

@ -2,8 +2,10 @@ package cn.org.gitlink.notification.model;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableTransactionManagement
public class ModelApplication {
public static void main(String[] args) {
SpringApplication.run(ModelApplication.class, args);

View File

@ -21,11 +21,11 @@ public class EmailJob {
private String content;
@JsonProperty("created_at")
@JsonFormat(pattern = "yyyy-MM-dd HH:dd:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createdAt;
@JsonProperty("dispatched_at")
@JsonFormat(pattern = "yyyy-MM-dd HH:dd:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date dispatchedAt;
private Integer dispatchedStatus;

View File

@ -1,6 +1,7 @@
package cn.org.gitlink.notification.model.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -16,15 +17,21 @@ public class EmailSendRecord {
private Integer jobId;
@JsonProperty("created_at")
@JsonFormat(pattern = "yyyy-MM-dd HH:dd:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createdAt;
@JsonProperty("sent_at")
@JsonFormat(pattern = "yyyy-MM-dd HH:dd:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date sentAt;
private Integer status;
@TableField(exist = false)
private String content;
@TableField(exist = false)
private String subject;
public Integer getId() {
return id;
}
@ -72,4 +79,20 @@ public class EmailSendRecord {
public void setStatus(Integer status) {
this.status = status;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getSubject() {
return subject;
}
public void setSubject(String subject) {
this.subject = subject;
}
}

View File

@ -22,7 +22,7 @@ public class SysNotification {
private String notificationUrl;
@JsonProperty("created_at")
@JsonFormat(pattern = "yyyy-MM-dd HH:dd:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createdAt;
private Integer status;

View File

@ -19,5 +19,5 @@ public interface EmailJobsMapper extends BaseMapper<EmailJob> {
int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record);
List<EmailJob> getNotDispatchedEmailJobs(@Param("platform") String platform);
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("size") Integer size);
}

View File

@ -22,4 +22,8 @@ public interface EmailSendRecordsMapper extends BaseMapper<EmailSendRecord> {
//批量插入邮件发送任务记录
int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList);
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("size") Integer sentNumber);
//批量更新邮件发送任务记录
int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList);
}

View File

@ -51,4 +51,7 @@ public interface SysNotificationMapper extends BaseMapper<SysNotification> {
@Param("receiver") Integer receiver,
@Param("notificationIds") String notificationIds,
@Param("type") Integer type);
//批量插入系统消息数据
int insertSysNotificationsBatch(@Param("platform") String platform, @Param("list") List<SysNotification> sysNotificationList);
}

View File

@ -10,9 +10,9 @@ import java.util.List;
public interface EmailJobsService extends IService<EmailJob> {
/**
* @Description: 添加发送邮件任务
* 添加发送邮件任务
*
* @Param newEmailJobVo
* @param newEmailJobVo
* @return: boolean
* @Author: wanjia
* @Date: 2021/9/13
@ -20,26 +20,28 @@ public interface EmailJobsService extends IService<EmailJob> {
boolean sendEmail(NewEmailJobVo newEmailJobVo) throws Exception;
/**
* @Description: 获取所有未处理邮件任务列表
* 获取所有未处理邮件任务列表
*
* @Param platform 平台编码
* @param platform 平台编码
* @param dispatchedStatus 发送状态-1 未处理,1 处理成功,2 处理失败
* @param size 列表大小
* @return: List<EmailJob>
* @Author: wanjia
* @Date: 2021/9/13
*/
List<EmailJob> getNotDispatchedEmailJobs(String platform) throws Exception;
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) throws Exception;
/**
* @Description:
* 更新邮件任务状态
*
* @Param platform 平台编码
* @Param emailJobId 邮件任务id
* @Param dispatchedAt 处理时间
* @Param dispatchedStatus 处理状态 -1 未处理,1 处理成功,2 处理失败
* @param platform 平台编码
* @param emailJobId 邮件任务id
* @param dispatchedAt 处理时间
* @param dispatchedStatus 处理状态 -1 未处理,1 处理成功,2 处理失败
* @return: int
* @Author: wanjia
* @Date: 2021/9/13
*/
int markEmailJobAs(String platform, Integer emailJobId, Date dispatchedAt, Integer dispatchedStatus) throws Exception;
int markEmailJobsAs(String platform, Integer emailJobId, Date dispatchedAt, Integer dispatchedStatus) throws Exception;
}

View File

@ -4,13 +4,15 @@ import cn.org.gitlink.notification.model.dao.entity.EmailSendRecord;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.Date;
import java.util.List;
public interface EmailSendRecordsService extends IService<EmailSendRecord> {
/**
* @Description: 新增邮件任务到邮件发送记录表中
* @Param platform 平台编码
* @Param emails 邮件地址eg: w@163.com,j@163.com
* @Param jobId 邮件任务id
* 新增邮件任务到邮件发送记录表中
*
* @param platform 平台编码
* @param emails 邮件地址eg: w@163.com,j@163.com
* @param jobId 邮件任务id
* @return: boolean
* @Author: wanjia
* @Date: 2021/9/13
@ -18,16 +20,26 @@ public interface EmailSendRecordsService extends IService<EmailSendRecord> {
boolean newEmailSendRecords(String platform, String emails, Integer jobId) throws Exception;
/**
* @Description:
* 获取发送记录列表
*
* @param platform 平台编码
* @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败
* @param size 列表数量
* @return: List<EmailSendRecord>
* @Author: wanjia
* @Date: 2021/9/15
*/
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer size) throws Exception;
/**
* 变更邮件发送记录状态
*
* @Param platform 平台编码
* @Param emailSendRecordId 邮件发送记录id
* @Param sentAt 发送时间
* @Param status 发送状态 -1 未发送,1 发送成功,2 发送失败
* @param platform 平台编码
* @param emailSendRecordList 待更新列表
* @return: int
* @Author: wanjia
* @Date: 2021/9/13
*/
int markEmailSendRecordAs(String platform, Integer emailSendRecordId, Date sentAt, Integer status) throws Exception;
int markEmailSendRecordsAs(String platform, List<EmailSendRecord> emailSendRecordList) throws Exception;
}

View File

@ -5,10 +5,7 @@ import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import cn.org.gitlink.notification.model.dao.mapper.EmailJobsMapper;
import cn.org.gitlink.notification.model.service.notification.EmailJobsService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.stereotype.Service;
import java.util.Date;
@ -26,12 +23,12 @@ public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob>
}
@Override
public List<EmailJob> getNotDispatchedEmailJobs(String platform) {
return baseMapper.getNotDispatchedEmailJobs(platform);
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) {
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, size);
}
@Override
public int markEmailJobAs(String platform, Integer emailJobId, Date dispatchedAt, Integer dispatchedStatus) throws Exception {
public int markEmailJobsAs(String platform, Integer emailJobId, Date dispatchedAt, Integer dispatchedStatus) throws Exception {
EmailJob emailJob = new EmailJob();
emailJob.setId(emailJobId);
emailJob.setDispatchedAt(dispatchedAt);

View File

@ -9,7 +9,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@Service
@ -18,11 +17,11 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
@Override
@Transactional
public boolean newEmailSendRecords(String platform, String emails, Integer jobId) {
EmailSendRecord emailSendRecord = new EmailSendRecord();
emailSendRecord.setJobId(jobId);
List<String> list = Arrays.asList(emails.split(","));
List<EmailSendRecord> emailSendRecordList = new ArrayList<EmailSendRecord>();
for (String email : list){
EmailSendRecord emailSendRecord = new EmailSendRecord();
emailSendRecord.setJobId(jobId);
emailSendRecord.setEmail(email);
emailSendRecordList.add(emailSendRecord);
}
@ -31,11 +30,12 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
}
@Override
public int markEmailSendRecordAs(String platform, Integer emailSendRecordId, Date sentAt, Integer status) throws Exception {
EmailSendRecord emailSendRecord = new EmailSendRecord();
emailSendRecord.setId(emailSendRecordId);
emailSendRecord.setSentAt(sentAt);
emailSendRecord.setStatus(status);
return baseMapper.updateByPrimaryKeySelective(platform, emailSendRecord);
public int markEmailSendRecordsAs(String platform, List<EmailSendRecord> emailSendRecordList){
return baseMapper.updateEmailSendRecordsBatch(platform, emailSendRecordList);
}
@Override
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer sentNumber){
return baseMapper.getRecordsByStatus(platform, status, sentNumber);
}
}

View File

@ -12,6 +12,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -28,20 +29,24 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
@Override
@Transactional
public boolean sendNotification(NewSysNotificationVo newSysNotificationVo) throws Exception {
SysNotification sysNotification = new SysNotification();
sysNotification.setSender(newSysNotificationVo.getSender());
sysNotification.setContent(newSysNotificationVo.getContent());
sysNotification.setNotificationUrl(newSysNotificationVo.getNotification_url());
sysNotification.setType(newSysNotificationVo.getType());
sysNotification.setSource(newSysNotificationVo.getSource());
sysNotification.setExtra(newSysNotificationVo.getExtra());
List<SysNotification> sysNotificationList = new ArrayList<>();
List<String> list = Arrays.asList(newSysNotificationVo.getReceivers().split(","));
for (String receiver : list) {
SysNotification sysNotification = new SysNotification();
sysNotification.setSender(newSysNotificationVo.getSender());
sysNotification.setContent(newSysNotificationVo.getContent());
sysNotification.setNotificationUrl(newSysNotificationVo.getNotification_url());
sysNotification.setType(newSysNotificationVo.getType());
sysNotification.setSource(newSysNotificationVo.getSource());
sysNotification.setExtra(newSysNotificationVo.getExtra());
sysNotification.setReceiver(Integer.parseInt(receiver));
//TODO: 需要批量插入的支持
baseMapper.insertSelective(newSysNotificationVo.getPlatform(), sysNotification);
sysNotificationList.add(sysNotification);
}
baseMapper.insertSysNotificationsBatch(newSysNotificationVo.getPlatform(), sysNotificationList);
for (String receiver : list){
this.delUserCache(newSysNotificationVo.getPlatform(), Integer.parseInt(receiver));
}
return true;
}

View File

@ -125,8 +125,11 @@
dispatched_status = #{record.dispatchedStatus,jdbcType=INTEGER}
where id = #{record.id,jdbcType=INTEGER}
</update>
<select id="getNotDispatchedEmailJobs" resultMap="BaseResultMap">
<select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap">
select * from ${platform}_email_jobs
where dispatched_status = -1
where dispatched_status = #{dispatchedStatus} order by id desc
<if test="size != null">
limit #{size}
</if>
</select>
</mapper>

View File

@ -117,4 +117,36 @@
)
</foreach >
</insert >
<select id="getRecordsByStatus" resultMap="BaseResultMap">
select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content
from ${platform}_email_send_records a
left join ${platform}_email_jobs b on a.job_id = b.id
where status = #{status} order by id desc
<if test="size != null">
limit #{size}
</if>
</select>
<update id="updateEmailSendRecordsBatch" parameterType="list">
update ${platform}_email_send_records
<trim prefix="set" suffixOverrides=",">
<trim prefix="sent_at =case" suffix="end,">
<foreach collection="list" item="item" index="index">
<if test="item.sentAt!=null">
when id=#{item.id} then #{item.sentAt}
</if>
</foreach>
</trim>
<trim prefix=" status =case" suffix="end,">
<foreach collection="list" item="item" index="index">
<if test="item.status!=null">
when id=#{item.id} then #{item.status}
</if>
</foreach>
</trim>
</trim>
where id in
<foreach collection="list" item="item" separator="," open="(" close=")">
#{item.id}
</foreach>
</update>
</mapper>

View File

@ -213,4 +213,20 @@
and id in (${notificationIds})
</if>
</update>
<insert id ="insertSysNotificationsBatch" parameterType="java.util.List" >
<selectKey resultType ="java.lang.Integer" keyProperty= "id"
order= "AFTER">
SELECT LAST_INSERT_ID()
</selectKey >
insert into ${platform}_sys_notification
(id, sender, receiver, content, notification_url, type, source, extra)
values
<foreach collection ="list" item="record" index= "index" separator =",">
(
#{record.id,jdbcType=INTEGER}, #{record.sender,jdbcType=INTEGER}, #{record.receiver,jdbcType=INTEGER},
#{record.content,jdbcType=VARCHAR}, #{record.notificationUrl,jdbcType=VARCHAR}, #{record.type,jdbcType=TINYINT},
#{record.source,jdbcType=VARCHAR}, #{record.extra,jdbcType=VARCHAR}
)
</foreach >
</insert >
</mapper>

View File

@ -2,8 +2,10 @@ package cn.org.gitlink.notification.model;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableTransactionManagement
public class ModelApplicationTest {
public static void main(String[] args) {
SpringApplication.run(ModelApplicationTest.class, args);

View File

@ -0,0 +1,69 @@
package cn.org.gitlink.notification.model.service.notification;
import cn.org.gitlink.notification.model.dao.entity.EmailJob;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.util.Date;
import java.util.List;
@RunWith(MockitoJUnitRunner.class)
public class EmailServiceTest {
@Mock
EmailJobsService emailJobsService;
@Mock
EmailSendRecordsService emailSendRecordsService;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
public void testEmailService(){
NewEmailJobVo emailJobVo = new NewEmailJobVo();
emailJobVo.setPlatform("gitlink");
emailJobVo.setSender(1);
emailJobVo.setEmails("w@163.com,j@163.com");
emailJobVo.setSubject("subjectTest");
emailJobVo.setContent("contentTest");
//添加发送邮件任务
try {
Assert.isTrue(emailJobsService.sendEmail(emailJobVo), "done");
} catch (Exception e) {
e.printStackTrace();
}
//获取待处理邮件任务列表
try {
List<EmailJob> emailJobs = emailJobsService.getEmailJobsByDispatchedStatus("gitlink",-1,10);
Assert.notEmpty(emailJobs, "success");
} catch (Exception e) {
e.printStackTrace();
}
//获取待处理邮件任务列表并处理邮件任务在records表中插入发送记录数据
try {
List<EmailJob> emailJobs = emailJobsService.getEmailJobsByDispatchedStatus("gitlink", -1, 10);
for (EmailJob emailJob : emailJobs){
emailSendRecordsService.newEmailSendRecords("gitlink", emailJob.getEmails(), emailJob.getId());
}
//发送邮件成功后更新邮件发送记录
// int count = emailSendRecordsService.markEmailSendRecordsAs("gitlink", 1, new Date(), 1);
// Assert.isTrue(count > 0, "update email_send_record status success");
} catch (Exception e){
e.printStackTrace();
}
try {
int count = emailJobsService.markEmailJobsAs("gitlink", 1, new Date(), 1);
Assert.isTrue(count > 0, "update status success");
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -13,8 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ModelApplicationTest.class)
@ -28,22 +26,19 @@ public class ServiceTests {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
public void testSysNotificationService() {
// int i = sysNotificationService.getUnreadNotificationCount("gitlink", 234);
// List<SysNotification> sysNotificationList = sysNotificationService.getUnreadNotificationByType("gitlink", 100, 20, 1);
// Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification(1, 10, "", "gitlink", 100);
// NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo();
// newSysNotificationVo.setSender((long) -1);
// newSysNotificationVo.setReceivers("7,8");
// newSysNotificationVo.setContent("baladiwei 在 gitlink-notification-system 修改ReadMe 文件的标题");
// newSysNotificationVo.setNotification_url("www.baidu.com");
// newSysNotificationVo.setPlatform("gitlink");
// boolean j = sysNotificationService.sendNotification(newSysNotificationVo);
// logger.info("insertResult:" + j);
// int k = sysNotificationService.markNotificationAs("gitlink", "2,3", 2);
// logger.info("updateStatus:" + k);
// int l = sysNotificationService.getUnreadNotificationByType(1,"gitlink",1);
// logger.info("SysNotificationNumber:" + l);
public void testSysNotificationService() throws Exception {
int i = sysNotificationService.getNotificationCount("gitlink", 234,1,1);
Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification("gitlink", 100,1,1,1,20);
NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo();
newSysNotificationVo.setSender(1);
newSysNotificationVo.setReceivers("7,8");
newSysNotificationVo.setContent("baladiwei 在 gitlink-notification-system 修改ReadMe 文件的标题");
newSysNotificationVo.setNotification_url("www.baidu.com");
newSysNotificationVo.setPlatform("gitlink");
boolean j = sysNotificationService.sendNotification(newSysNotificationVo);
logger.info("insertResult:" + j);
int k = sysNotificationService.markNotificationAs("gitlink", 100, "1,2",2,1);
logger.info("updateStatus:" + k);
}
@Test

View File

@ -67,11 +67,18 @@
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
<version>${springboot.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -3,9 +3,11 @@ package cn.org.gitlink.notification.reader;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@ComponentScan("cn.org.gitlink.notification.*")
@SpringBootApplication
@EnableTransactionManagement
public class ReaderApiApplication {
public static void main(String[] args) {
SpringApplication.run(ReaderApiApplication.class, args);

View File

@ -7,39 +7,28 @@ logging:
spring:
redis:
# Redis数据库索引默认为0
database: 0
# Redis服务器地址
host: redis
# Redis服务器连接端口
port: 6379
# Redis服务器连接密码默认为空
password:
pool:
# 连接池最大连接数(使用负值表示没有限制)
max-active: 200
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1
# 连接池中的最大空闲连接
max-idle: 10
# 连接池中的最小空闲连接
min-idle: 0
# 连接超时时间(毫秒)
timeout: 1000
kafka:
producer:
bootstrap_servers: kafka1:9092,kafka2:9092
client_id: gitlink_producer_01
retries: 5
batch_size: 16384
datasource:
# 配置数据源类型
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://mysql:3306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
username: gitlink
password: giTlinK0^827
#ip白名单列表多个ip用逗号隔开允许所有用*号
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
serialization:
indent_output: true
fail_on_empty_beans: false
white-list: '*'

View File

@ -0,0 +1,10 @@
spring.mail.host=smtp.exmail.qq.com
spring.mail.default-encoding=utf-8
spring.mail.port=465
spring.mail.username=gitlink@barats.cn
spring.mail.password=
spring.mail.properties[mail.smtp.auth]=true
spring.mail.properties[mail.smtp.connectiontimeout]=10000
spring.mail.properties[mail.smtp.timeout]=10000
spring.mail.properties[mail.smtp.writetimeout]=10000
spring.mail.properties[mail.smtp.starttls.enable]=true

View File

@ -3,9 +3,11 @@ package cn.org.gitlink.notification;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@ComponentScan("cn.org.gitlink.notification.*")
@SpringBootApplication
@EnableTransactionManagement
public class WriterApiApplication {
public static void main(String[] args) {
SpringApplication.run(WriterApiApplication.class, args);

View File

@ -5,12 +5,8 @@ import cn.org.gitlink.notification.common.response.DataPacketUtil;
import cn.org.gitlink.notification.common.response.ResponseData;
import cn.org.gitlink.notification.common.utils.KafkaUtil;
import cn.org.gitlink.notification.common.utils.ValidatorUtils;
import cn.org.gitlink.notification.model.dao.entity.EmailJob;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobParamsVo;
import cn.org.gitlink.notification.model.dao.entity.vo.NewEmailJobVo;
import cn.org.gitlink.notification.model.dao.entity.vo.NewSysNotificationVo;
import cn.org.gitlink.notification.model.service.notification.EmailJobsService;
import cn.org.gitlink.notification.model.service.notification.EmailSendRecordsService;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@ -19,13 +15,12 @@ import org.apache.logging.log4j.Logger;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.BindingResult;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
import java.util.List;
import java.util.Map;
@RestController
@ -33,18 +28,18 @@ import java.util.Map;
@Configuration
public class EmailJobsController {
//todo yml文件中配置topic后替换
private static final String GITLINK_EMAIL_TOPIC = "topic_gitlink_email";
private Logger logger = LogManager.getLogger(EmailJobsController.class);
@Value("${spring.kafka.producer.topic_email}")
private String gitlinkEmailTopic;
@Autowired
private KafkaUtil kafkaUtil;
@ApiOperation("发送邮件任务")
@RequestMapping(path = "/{platform}", method = RequestMethod.POST)
@ResponseBody
public ResponseData sendNotification(@ApiParam(value = "平台编码", required = true)
public ResponseData sendEmail(@ApiParam(value = "平台编码", required = true)
@PathVariable(name = "platform") String platform,
@Validated @RequestBody NewEmailJobParamsVo newEmailJobParamsVo,
@ -69,9 +64,7 @@ public class EmailJobsController {
newEmailJobVo.setPlatform(platform);
try {
//todo creatTopic
kafkaUtil.sendMessage(GITLINK_EMAIL_TOPIC, JSONObject.toJSONString(newEmailJobVo));
kafkaUtil.sendMessage(gitlinkEmailTopic, JSONObject.toJSONString(newEmailJobVo));
return DataPacketUtil.jsonSuccessResult();
} catch (Exception e) {
logger.error(e);

View File

@ -13,7 +13,6 @@ import cn.org.gitlink.notification.model.service.notification.SysNotificationSer
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.BeanUtils;
@ -25,7 +24,6 @@ import org.springframework.validation.BindingResult;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.Map;
@RestController
@ -36,12 +34,6 @@ public class NotificationController {
@Value("${spring.kafka.producer.topic}")
private String gitlinkNotificationTopic;
@Value("${spring.kafka.producer.partitions}")
private Integer partitions;
@Value("${spring.kafka.producer.replication_factor}")
private Short replicationFactor;
@Autowired
private KafkaUtil kafkaUtil;
@ -79,7 +71,6 @@ public class NotificationController {
newSysNotificationVo.setPlatform(platform);
try {
kafkaUtil.createTopic(Arrays.asList(new NewTopic(gitlinkNotificationTopic, partitions, replicationFactor)));
kafkaUtil.sendMessage(gitlinkNotificationTopic, JSONObject.toJSONString(newSysNotificationVo));
return DataPacketUtil.jsonSuccessResult();
} catch (Exception e) {

View File

@ -8,12 +8,12 @@ spring:
kafka:
producer:
bootstrap_servers: kafka1:9092,kafka2:9092
client_id: gitlink_producer_01
retries: 5
batch_size: 16384
replication_factor: 1
partitions: 3
topic: topic-gitlink-notification
topic_email: topic-gitlink-email
redis:
database: 0
@ -27,12 +27,17 @@ spring:
min-idle: 0
timeout: 1000
datasource:
# 配置数据源类型
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://mysql:3306/gitlink_notification?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowMultiQueries=true&useSSL=false
username: gitlink
password: giTlinK0^827
#ip白名单列表多个ip用逗号隔开允许所有用*号
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
serialization:
indent_output: true
fail_on_empty_beans: false
white-list: '*'

View File

@ -0,0 +1,10 @@
spring.mail.host=smtp.exmail.qq.com
spring.mail.default-encoding=utf-8
spring.mail.port=465
spring.mail.username=gitlink@barats.cn
spring.mail.password=
spring.mail.properties[mail.smtp.auth]=true
spring.mail.properties[mail.smtp.connectiontimeout]=10000
spring.mail.properties[mail.smtp.timeout]=10000
spring.mail.properties[mail.smtp.writetimeout]=10000
spring.mail.properties[mail.smtp.starttls.enable]=true