Problem background: after a grouping condition is set on the data sources of a CEP template, the data in the resulting alert does not match the grouping condition; events from other groups get mixed in and trigger the alert.

Policy conditions:
Two Kafka topics were selected as the data sources.
Data source A (test-topic1) is filtered with the condition login_type = 1.
Data source B (test-topic2) is filtered with the condition login_type = 2.
The grouping condition groups by the two fields src_ip and hostname.
The two sources are correlated with followedBy, with a maximum time interval of 60 seconds.
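For reference, here is a minimal sketch of what such a rule looks like when written directly against Flink's CEP Pattern API. In the real job the pattern is built by PatternGen from the rule configuration (shown later), so the step names "A" and "B" and the literal login_type checks below are only illustrative, and JSONObject is assumed to be the same JSON type used elsewhere in the project:

// Illustrative only: "A" (login_type = 1) at least twice, followedBy
// "B" (login_type = 2) at least once, all within 60 seconds.
Pattern<JSONObject, JSONObject> pattern = Pattern
        .<JSONObject>begin("A")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject e) {
                // assumes login_type is readable as a string
                return "1".equals(e.getString("login_type"));
            }
        })
        .timesOrMore(2)
        .followedBy("B")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject e) {
                return "2".equals(e.getString("login_type"));
            }
        })
        .timesOrMore(1)
        .within(Time.seconds(60));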
The job runs with parallelism 3. Raw data printed by SourceStream (the leading 1/2 is the printing subtask index):
2 {src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666859021060,create_time_desc:2022-10-27 16:23:41,event_type_value:single,id:67d32010-1f66-4850-b110-a7087e419c64_0,login_type:1}
2 {src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666859020192,create_time_desc:2022-10-27 16:23:40,event_type_value:single,id:67d32010-1f66-4850-b110-a7087e419c64_0,login_type:1}
1 {src_ip:172.11.11.1,hostname:hostname2,as:B,create_time:1666859021231,create_time_desc:2022-10-27 16:23:41,event_type_value:single,id:67d32010-1f66-4850-b110-a7087e419c64_0,login_type :2}
After CEP processing, an alert was produced.
Alert produced:
{A[{src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666859021060,create_time_desc:2022-10-27 16:23:41,event_type_value:single,id:67d32010-1f66-4850-b110-a7087e419c64_0,login_type:1}, {src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666859020192,create_time_desc:2022-10-27 16:23:40,event_type_value:single,id:67d32010-1f66-4850-b110-a7087e419c64_0,login_type:1}], B[{src_ip:172.11.11.1,hostname:hostname2,as:B,create_time:1666859021231,create_time_desc:2022-10-27 16:23:41,event_type_value:single,id:67d32010-1f66-4850-b110-a7087e419c64_0,login_type:2}]}
After grouping by src_ip and hostname, in theory only events that share the same src_ip and hostname should be correlated into one alert.
Instead, data from other groups was also pulled into the correlation and alerted. The expected behaviour is: login_type = 1 occurs at least twice, followed by login_type = 2 occurring at least once, all with the same src_ip and hostname.
And as it turned out, the following data also produced an alert together:
{src_ip:172.11.11.1,hostname:hostname1,login_type:1} {src_ip:172.11.11.1,hostname:hostname1,login_type:1} {src_ip:172.11.11.1,hostname:hostname1,login_type:2}
My first suspicion was that the grouping was not taking effect, so I debugged kafkaStreamSource, the data-source method that performs the grouping, and found that keyBy was indeed being applied.
After that I could not find any other problem, and after some back and forth I began to suspect that the result of KeyedStream.union(KeyedStream) is simply no longer a KeyedStream.
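That suspicion can be confirmed from the types alone: union() is declared on DataStream and returns a DataStream, and KeyedStream does not override it, so unioning two KeyedStreams yields a plain, unkeyed DataStream. A minimal sketch (streamA, streamB and the key field are made up for illustration):

// streamA / streamB are hypothetical DataStream<JSONObject> inputs.
KeyedStream<JSONObject, String> keyedA = streamA.keyBy(e -> e.getString("src_ip"));
KeyedStream<JSONObject, String> keyedB = streamB.keyBy(e -> e.getString("src_ip"));

// union() is inherited from DataStream and returns DataStream<T>,
// so the keying is silently lost here.
DataStream<JSONObject> unioned = keyedA.union(keyedB);

// Would not compile: union() never returns a KeyedStream.
// KeyedStream<JSONObject, String> stillKeyed = keyedA.union(keyedB);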
The original data-source code where the problem appeared:
// overall execution flow of the job
DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);
DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);
SinkProcess.sink(resultStream, rule);

public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {
    DataStream<JSONObject> inputStream = null;
    List<Event> events = rule.getEvents();
    if (events.size() > SharingConstant.NUMBER_ZERO) {
        for (Event event : events) {
            FlinkKafkaConsumer<JSONObject> kafkaConsumer = new KafkaSourceFunction(rule, event).init();
            if (inputStream != null) {
                // merge multiple streams into one stream
                inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
            } else {
                // only one stream
                inputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);
            }
        }
    }
    return inputStream;
}

private static DataStream<JSONObject> kafkaStreamSource(StreamExecutionEnvironment env,
                                                        Event event,
                                                        Rule rule,
                                                        FlinkKafkaConsumer<JSONObject> kafkaConsumer) {
    DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);
    // loop over the multiple black/white-list lookups
    String conditions = event.getConditions();
    while (conditions.contains(SharingConstant.ARGS_NAME)) {
        // use the new redis data structure for the s.include filtering
        inputStream = AsyncDataStream.orderedWait(
                inputStream,
                new RedisNameListFilterSourceFunction(s, rule.getSettings().getRedis()),
                30,
                TimeUnit.SECONDS,
                2000);
        conditions = conditions.replace(s, "");
    }
    // generic filtering
    inputStream = AsyncDataStream.orderedWait(
            inputStream,
            new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);
    // keyBy the kafka source
    return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
}

public static DataStream<JSONObject> keyedBy(DataStream<JSONObject> input, Map<String, String> groupBy) {
    if (null == groupBy || groupBy.isEmpty()
            || "".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])) {
        return input;
    }
    return input.keyBy(new TwoEventKeySelector(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));
}

public class TwoEventKeySelector implements KeySelector<JSONObject, String> {

    private static final long serialVersionUID = 8534968406068735616L;

    private final String groupBy;

    public TwoEventKeySelector(String groupBy) {
        this.groupBy = groupBy;
    }

    @Override
    public String getKey(JSONObject event) {
        StringBuilder keys = new StringBuilder();
        for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {
            keys.append(event.getString(key));
        }
        return keys.toString();
    }
}

The problem appears here:
// merge multiple streams into one stream
inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));

The kafkaStreamSource() method returns a KeyedStream. After the two KeyedStreams are unioned, I had assumed the result would still be a KeyedStream, but it is actually of type DataStream. As a consequence, the CEP grouping did not take effect and data from other groups showed up in a single alert.

The fix is to keyBy once more, right before building the CEP pattern, whenever a grouping condition is configured:

private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {
    PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());
    Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();
    if (!rule.getGroupBy().isEmpty()) {
        inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
    }
    PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);
    return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));
}

Input data:
{src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666860300012,create_time_desc:2022-10-27 16:45:00,event_type_value:single,id:1288a709-d2b3-41c9-b7b7-e45149084514_0,login_type:1}
{src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666860299272,create_time_desc:2022-10-27 16:44:59,event_type_value:single,id:1288a709-d2b3-41c9-b7b7-e45149084514_0,login_type:1}
{src_ip:172.11.11.1,hostname:hostname2,as:B,create_time:1666860300196,create_time_desc:2022-10-27 16:45:00,event_type_value:single,id:1288a709-d2b3-41c9-b7b7-e45149084514_0,login_type:2}
No alert is produced, which matches expectations (the A and B events above have different hostnames and therefore fall into different groups). Next, events that do belong to the same group are fed in:
2 {src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666860369307,create_time_desc:2022-10-27 16:46:09,event_type_value:single,id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,app_id:1}
2 {src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666860368471,create_time_desc:2022-10-27 16:46:08,event_type_value:single,id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,app_id:1}
2 {src_ip:172.11.11.1,hostname:hostname1,as:B,create_time:1666860369478,create_time_desc:2022-10-27 16:46:09,event_type_value:single,id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,app_id:2}
Alert produced:
{A[{src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666860368471,create_time_desc:2022-10-27 16:46:08,event_type_value:single,id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,app_id:1}, {src_ip:172.11.11.1,hostname:hostname1,as:A,create_time:1666860369307,create_time_desc:2022-10-27 16:46:09,event_type_value:single,id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,app_id:1}], B[{src_ip:172.11.11.1,hostname:hostname1,as:B,create_time:1666860369478,create_time_desc:2022-10-27 16:46:09,event_type_value:single,id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,app_id:2}]}
Alert output:
{org_log_id:61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,event_category_id:1,event_technique_type:无,event_description:1,alarm_first_time:1666860368471,src_ip:172.11.11.1,hostname:hostname1,intelligence_id:,strategy_category_id:422596451785379862,intelligence_type:,id:cc1cd8cd-a626-4916-bdd3-539ea57e898f,event_nums:3,event_category_label:资源开发,severity:info,create_time:1666860369647,strategy_category_name:网络威胁分析,rule_name:ceptest,risk_score:1,data_center:guo-sen,baseline:[],sop_id:,event_device_type:无,rule_id:214,policy_type:pattern,strategy_category:/NetThreatAnalysis,internal_event:1,event_name:ceptest,event_model_source:/RuleEngine/OnLine,alarm_last_time:1666860369478}
An alert is produced, which matches expectations.
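A closing note on why the unkeyed stream caused cross-group matches: when CEP.pattern() is applied to a non-keyed DataStream, Flink evaluates the pattern over all events as a single sequence (the CEP operator runs with parallelism 1), so events with different src_ip/hostname combinations can legally complete the same match. Besides re-keying just before the pattern, an alternative wiring is to union the raw streams first and keyBy exactly once on the merged stream; this is only my suggestion, not part of the original fix, with hypothetical variable names and assuming SharingConstant.DELIMITER_COMMA is ",":

// Hypothetical alternative: merge first, key once, then hand the
// KeyedStream straight to CEP so the keying cannot be dropped by union().
DataStream<JSONObject> merged = streamA.union(streamB);
KeyedStream<JSONObject, String> keyed =
        merged.keyBy(new TwoEventKeySelector("src_ip,hostname"));
PatternStream<JSONObject> patternStream = CEP.pattern(keyed, pattern);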