cep优化

This commit is contained in:
yudan.chen 2021-01-25 17:14:18 +08:00
parent e6a9747e3d
commit cf59892269
4 changed files with 74 additions and 2 deletions

View File

@ -0,0 +1,19 @@
package com.blueegg.loginfail_detect.cep
import java.util
import com.blueegg.loginfail_detect.{LoginEvent, LoginFailWarning}
import org.apache.flink.cep.PatternSelectFunction
/**
 * Custom [[PatternSelectFunction]] that turns one matched sequence of login
 * events into a single [[LoginFailWarning]].
 */
class LoginFailEventMatch3() extends PatternSelectFunction[LoginEvent, LoginFailWarning] {

  /**
   * Builds a warning from the events captured under the pattern name "fail".
   *
   * @param pattern matched events, keyed by the name given to the pattern step
   *                that captured them
   * @return a warning carrying the user id and timestamp of the first matched
   *         event, the timestamp of the last matched event, and the message
   *         "login fail"
   */
  override def select(pattern: util.Map[String, util.List[LoginEvent]]): LoginFailWarning = {
    // The events of the current match are stored in the map under the pattern name.
    val failEvents = pattern.get("fail")
    // Only the boundaries of the match are reported, so index the list directly
    // instead of stepping an iterator through events that are never used.
    val firstFailEvent = failEvents.get(0)
    val lastFailEvent = failEvents.get(failEvents.size() - 1)
    LoginFailWarning(firstFailEvent.userId, firstFailEvent.timestamp, lastFailEvent.timestamp, "login fail")
  }
}

View File

@ -38,7 +38,7 @@ object LoginFailWithCep2 {
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
// 3.检出符合模式的数据流需要调用select
val loginFailWarningStream = patternStream.select(new LoginFailEventMatch())
val loginFailWarningStream = patternStream.select(new LoginFailEventMatch2())
loginFailWarningStream.print()
env.execute("login fail with cep job")

View File

@ -0,0 +1,44 @@
package com.blueegg.loginfail_detect.cep
import com.blueegg.loginfail_detect.LoginEvent
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Flink CEP job that raises a warning when a user produces three consecutive
 * failed logins within five seconds.
 */
object LoginFailWithCep3 {

  /**
   * Reads login events from the `/LoginLog.csv` classpath resource, applies the
   * login-failure pattern per user, and prints a warning for every match.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Fail fast with a descriptive message instead of a bare NullPointerException
    // when the input file is missing from the classpath.
    val resource = getClass.getResource("/LoginLog.csv")
    require(resource != null, "classpath resource /LoginLog.csv not found")
    val inputStream = env.readTextFile(resource.getPath)

    val loginEventStream = inputStream
      .map(data => {
        // Each line is split on commas into four fields; the first and the
        // fourth are parsed as Long.
        val arr = data.split(",")
        LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
      })
      // Watermarks tolerate events arriving up to 3 seconds out of order.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
        // NOTE(review): presumably `timestamp` is in seconds and is scaled to the
        // milliseconds Flink expects -- confirm against the input data.
        override def extractTimestamp(element: LoginEvent): Long = element.timestamp * 1000
      })

    // 1. Define the pattern: three consecutive "fail" events, all within 5 seconds.
    val loginFailPattern = Pattern
      .begin[LoginEvent]("fail").where(_.eventType == "fail").times(3).consecutive()
      .within(Time.seconds(5))

    // 2. Apply the pattern to the stream keyed by user id to obtain a PatternStream.
    val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    // 3. Call select to turn every match into a warning.
    val loginFailWarningStream = patternStream.select(new LoginFailEventMatch3())
    loginFailWarningStream.print()

    env.execute("login fail with cep job")
  }
}

View File

@ -79,4 +79,13 @@ https://www.bilibili.com/video/BV1Qp4y1Y7YN?p=101
2秒内连续两次登录失败告警
2、LoginFailAdvance
时效性做了改进1秒出现两次失败了就报警
时效性做了改进1秒出现两次失败了就报警
3、LoginFailWithCep
cep实现
4、LoginFailWithCep2
5秒内有3次失败告警
5、LoginFailWithCep3
优化后的CEP实现：使用 times(3).consecutive() 配合 within(5秒)，检测5秒内连续3次登录失败