订单支付实时监控
This commit is contained in:
parent
cf59892269
commit
7a893b6e5b
|
@ -0,0 +1,2 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="JAVA_MODULE" version="4" />
|
|
@ -0,0 +1,21 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>retailer-ueba</artifactId>
|
||||
<groupId>com.blueegg</groupId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>OrderPayDetect</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,78 @@
|
|||
34729,create,,1558430842
|
||||
34730,create,,1558430843
|
||||
34729,pay,sd76f87d6,1558430844
|
||||
34730,pay,3hu3k2432,1558430845
|
||||
34731,create,,1558430846
|
||||
34731,pay,35jue34we,1558430849
|
||||
34732,create,,1558430852
|
||||
34733,create,,1558430855
|
||||
34734,create,,1558430859
|
||||
34732,pay,32h3h4b4t,1558430861
|
||||
34735,create,,1558430862
|
||||
34734,pay,435kjb45d,1558430863
|
||||
34733,pay,766lk5nk4,1558430864
|
||||
34736,create,,1558430866
|
||||
34737,create,,1558430868
|
||||
34735,pay,5k432k4n,1558430869
|
||||
34738,create,,1558430871
|
||||
34739,create,,1558430874
|
||||
34736,pay,435kjb45s,1558430875
|
||||
34740,create,,1558430877
|
||||
34741,create,,1558430882
|
||||
34742,create,,1558430884
|
||||
34743,create,,1558430885
|
||||
34744,create,,1558430886
|
||||
34745,create,,1558430889
|
||||
34746,create,,1558430892
|
||||
34747,create,,1558430893
|
||||
34748,create,,1558430895
|
||||
34746,pay,3243hr9h9,1558430895
|
||||
34738,pay,43jhin3k4,1558430896
|
||||
34745,pay,8xz09ddsaf,1558430896
|
||||
34741,pay,88df0wn92,1558430896
|
||||
34749,create,,1558430899
|
||||
34747,pay,329d09f9f,1558430893
|
||||
34743,pay,3hefw8jf,1558430900
|
||||
34750,create,,1558430901
|
||||
34737,pay,324jnd45s,1558430902
|
||||
34751,create,,1558430902
|
||||
34744,pay,499dfano2,1558430903
|
||||
34752,create,,1558430905
|
||||
34742,pay,435kjb4432,1558430906
|
||||
34753,create,,1558430906
|
||||
34739,pay,98x0f8asd,1558430907
|
||||
34754,create,,1558430908
|
||||
34755,create,,1558430911
|
||||
34740,pay,392094j32,1558430913
|
||||
34756,create,,1558430913
|
||||
34753,pay,8c6vs8dd,1558430913
|
||||
34757,create,,1558430915
|
||||
34749,pay,324n0239,1558430916
|
||||
34755,pay,8x0zvy8w3,1558430918
|
||||
34758,create,,1558430921
|
||||
34759,create,,1558430922
|
||||
34752,pay,rnp435rk,1558430925
|
||||
34760,create,,1558430926
|
||||
34761,create,,1558430927
|
||||
34762,create,,1558430933
|
||||
34748,pay,809saf0ff,1558430934
|
||||
34763,create,,1558430936
|
||||
34764,create,,1558430938
|
||||
34765,create,,1558430940
|
||||
34751,pay,24309dsf,1558430941
|
||||
34750,pay,sad90df3,1558430941
|
||||
34761,pay,902dsqw45,1558430943
|
||||
34766,create,,1558430944
|
||||
34767,create,,1558430949
|
||||
34768,pay,88snrn932,1558430950
|
||||
34759,pay,9203kmfn,1558430950
|
||||
34754,pay,3245nbo7,1558430950
|
||||
34758,pay,32499fd9w,1558430950
|
||||
34760,pay,390mf2398,1558430960
|
||||
34757,pay,d8938034,1558430962
|
||||
34762,pay,84309dw31r,1558430983
|
||||
34763,pay,sddf9809ew,1558431068
|
||||
34764,pay,832jksmd9,1558431079
|
||||
34765,pay,m23sare32e,1558431082
|
||||
34766,pay,92nr903msa,1558431095
|
||||
34756,pay,9032n4fd2,1558431951
|
|
|
@ -0,0 +1,40 @@
|
|||
ewr342as4,wechat,1558430845
|
||||
sd76f87d6,wechat,1558430847
|
||||
3hu3k2432,alipay,1558430848
|
||||
8fdsfae83,alipay,1558430850
|
||||
32h3h4b4t,wechat,1558430852
|
||||
766lk5nk4,wechat,1558430855
|
||||
435kjb45d,alipay,1558430859
|
||||
5k432k4n,wechat,1558430862
|
||||
435kjb45s,wechat,1558430866
|
||||
324jnd45s,wechat,1558430868
|
||||
43jhin3k4,wechat,1558430871
|
||||
98x0f8asd,alipay,1558430874
|
||||
392094j32,wechat,1558430877
|
||||
88df0wn92,alipay,1558430882
|
||||
435kjb4432,alipay,1558430884
|
||||
3hefw8jf,alipay,1558430885
|
||||
499dfano2,wechat,1558430886
|
||||
8xz09ddsaf,wechat,1558430889
|
||||
3243hr9h9,wechat,1558430892
|
||||
329d09f9f,alipay,1558430893
|
||||
809saf0ff,wechat,1558430895
|
||||
324n0239,wechat,1558430899
|
||||
sad90df3,alipay,1558430901
|
||||
24309dsf,alipay,1558430902
|
||||
rnp435rk,wechat,1558430905
|
||||
8c6vs8dd,wechat,1558430906
|
||||
3245nbo7,alipay,1558430908
|
||||
8x0zvy8w3,alipay,1558430911
|
||||
9032n4fd2,wechat,1558430913
|
||||
d8938034,wechat,1558430915
|
||||
32499fd9w,alipay,1558430921
|
||||
9203kmfn,alipay,1558430922
|
||||
390mf2398,alipay,1558430926
|
||||
902dsqw45,wechat,1558430927
|
||||
84309dw31r,alipay,1558430933
|
||||
sddf9809ew,alipay,1558430936
|
||||
832jksmd9,wechat,1558430938
|
||||
m23sare32e,wechat,1558430940
|
||||
92nr903msa,wechat,1558430944
|
||||
sdafen9932,alipay,1558430949
|
|
|
@ -0,0 +1,4 @@
|
|||
package com.blueegg.orderpay_detect
|
||||
|
||||
// 定义输入输出样例类类型
|
||||
case class OrderEvent(orderId: Long, eventType: String, txId: String, timestamp: Long)
|
|
@ -0,0 +1,12 @@
|
|||
package com.blueegg.orderpay_detect
|
||||
|
||||
import java.util
|
||||
|
||||
import org.apache.flink.cep.PatternSelectFunction
|
||||
|
||||
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{
|
||||
override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult = {
|
||||
val payedOrderId = pattern.get("pay").iterator().next().orderId
|
||||
OrderResult(payedOrderId, "payed successfully")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
package com.blueegg.orderpay_detect
|
||||
|
||||
case class OrderResult(orderId: Long, resultMsg: String)
|
|
@ -0,0 +1,46 @@
|
|||
package com.blueegg.orderpay_detect
|
||||
|
||||
import org.apache.flink.cep.scala.CEP
|
||||
import org.apache.flink.cep.scala.pattern.Pattern
|
||||
import org.apache.flink.streaming.api.TimeCharacteristic
|
||||
import org.apache.flink.streaming.api.scala._
|
||||
import org.apache.flink.streaming.api.windowing.time.Time
|
||||
|
||||
object OrderTimeOut {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val env = StreamExecutionEnvironment.getExecutionEnvironment
|
||||
env.setParallelism(1)
|
||||
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
|
||||
|
||||
// 0. 从文件中读取数据
|
||||
val resource = getClass.getResource("/OrderLog.csv")
|
||||
val orderEventStream = env.readTextFile(resource.getPath)
|
||||
.map(data => {
|
||||
val arr = data.split(",")
|
||||
OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
|
||||
})
|
||||
.assignAscendingTimestamps(_.timestamp * 1000)
|
||||
.keyBy(_.orderId)
|
||||
|
||||
// 1. 定义一个pattern
|
||||
val orderPayPattern = Pattern
|
||||
.begin[OrderEvent]("create").where(_.eventType == "create")
|
||||
.followedBy("pay").where(_.eventType == "pay")
|
||||
.within(Time.minutes(15))
|
||||
|
||||
// 2. 将pattern应用到数据流上,进行模式检测
|
||||
val patternStream = CEP.pattern(orderEventStream, orderPayPattern)
|
||||
|
||||
// 3. 定义侧输出流标签,用于处理超时事件
|
||||
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
|
||||
|
||||
// 4. 调用select方法,提取并处理匹配的成功支付事件以及超时事件
|
||||
val resultStream = patternStream.select(orderTimeoutOutputTag,
|
||||
new OrderTimeoutSelect(),
|
||||
new OrderPaySelect())
|
||||
|
||||
resultStream.print("payed")
|
||||
resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")
|
||||
env.execute("order timeout job")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.blueegg.orderpay_detect
|
||||
|
||||
import java.util
|
||||
|
||||
import org.apache.flink.cep.PatternTimeoutFunction
|
||||
|
||||
// 实现自定义的PatternTimeoutFunction以及PatternSelectFunction
|
||||
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{
|
||||
override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long): OrderResult = {
|
||||
val timeoutOrderId = pattern.get("create").iterator().next().orderId
|
||||
OrderResult(timeoutOrderId, "timeout " + ": +" + timeoutTimestamp)
|
||||
}
|
||||
}
|
|
@ -88,4 +88,9 @@ cep实现
|
|||
5秒内有3次失败告警
|
||||
|
||||
5、LoginFailWithCep3
|
||||
优化后cep实现
|
||||
优化后cep实现
|
||||
|
||||
## 订单支付实时监控
|
||||
|
||||
1、OrderTimeOut
|
||||
cep实现订单成功支付的和15分钟内未支付超时的
|
Loading…
Reference in New Issue