p121 排除刷单用户

This commit is contained in:
yudan.chen 2021-01-23 10:00:38 +08:00
parent 724f66f258
commit 3a8bb2ce5c
5 changed files with 107 additions and 2 deletions

View File

@ -0,0 +1,40 @@
package com.blueegg.market_analysis.ad
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object AdClickAnalysisFilter {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 从文件中读取数据
val resource = getClass.getResource("/AdClickLog.csv")
val inputStream = env.readTextFile(resource.getPath)
// 转换成样例类并提取时间戳和watermark
val adLogStream = inputStream
.map(data => {
val arr = data.split(",")
AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
// 插入一步过滤操作并将有刷单行为的用户输出到侧输出流黑名单报警
val filterBlackListUserStream: DataStream[AdClickLog] = adLogStream
.keyBy(data => (data.userId, data.adId))
.process(new FilterBlackListUserResult(100))
// 开窗聚合统计
val adCountResultStream = filterBlackListUserStream
.keyBy(_.province)
.timeWindow(Time.hours(1), Time.seconds(5))
.aggregate(new AdCountAgg(), new AdCountWindowResult())
// adCountResultStream.print("count result")
filterBlackListUserStream.getSideOutput(new OutputTag[BlackListUserWarning]("warning")).print("warning")
env.execute("ad count statistics job")
}
}

View File

@ -0,0 +1,4 @@
package com.blueegg.market_analysis.ad
// 侧输出流黑名单报警信息样例类
case class BlackListUserWarning(userId: Long, adId: Long, msg: String)

View File

@ -0,0 +1,47 @@
package com.blueegg.market_analysis.ad
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// 自定义KeyedProcessFunction
class FilterBlackListUserResult(maxCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]{
// 定义状态保存用户对广告的点击量每天0点定时清空状态的时间戳标记当前用户是否已经进入黑名单
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
lazy val isBlackState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black", classOf[Boolean]))
override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
val curCount = countState.value()
// 判断只要是第一个数据来了直接注册0点的清空状态定时器
if (curCount == 0) {
// ctx.timerService().currentProcessingTime()/(1000 * 60 * 60 * 24) //1970至今的一个整天数再加1表示明天减8表示东8区
val ts = (ctx.timerService().currentProcessingTime()/(1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - 8 * 60 * 60 * 1000// 明天第一个时间点开始的时间戳
resetTimerTsState.update(ts)
ctx.timerService().registerProcessingTimeTimer(ts)
}
// 判断count值是否已经达到定义的阈值如果超过就输出到黑名单
if (curCount >= maxCount) {
// 判断是否已经在黑名单里没有的话才输出侧输出流
if (!isBlackState.value()){
isBlackState.update(true)
ctx.output((new OutputTag[BlackListUserWarning]("warning")), BlackListUserWarning(value.userId, value.adId, "Click ad over" + maxCount + "times today."))
}
return
}
// 正常情况count加1然后将数据原样输出
countState.update(curCount + 1)
out.collect(value)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
if (timestamp == resetTimerTsState.value()) {
isBlackState.clear()
countState.clear()
}
}
}

View File

@ -0,0 +1,9 @@
```text
Error:(31, 20) not enough arguments for constructor OutputTag: (implicit evidence$1: org.apache.flink.api.common.typeinfo.TypeInformation[com.blueegg.market_analysis.ad.BlackListUserWarning])org.apache.flink.streaming.api.scala.OutputTag[com.blueegg.market_analysis.ad.BlackListUserWarning].
Unspecified value parameter evidence$1.
ctx.output(new OutputTag[BlackListUserWarning]("warning"), BlackListUserWarning(value.userId, value.adId, "Click ad over" + maxCount + "times today."))
```
还是隐式转换的问题原import org.apache.flink.streaming.api.scala.OutputTag 改 import org.apache.flink.streaming.api.scala._

View File

@ -65,5 +65,10 @@ https://www.bilibili.com/video/BV1Qp4y1Y7YN?p=101
1、AppMarketByChannel
分渠道市场统计需要按两个字段分组的场景process代替aggregate的实现
2、AdClickAnalysis
统计一天页面广告点击量和PageView是一样的
## 页面广告分析
1、AdClickAnalysis
统计一天页面广告点击量和PageView是一样的
2、AdClickAnalysisFilter
把刷单用户排除1小时内下单超过100次报警