当前位置: 首页 > news >正文

网站底部备案代码做简历的软件免费

网站底部备案代码,做简历的软件免费,php企业中英文网站源码,深圳网站优化咨询文章目录 day09_实时类标签/指标一、日志数据实时采集2、Flume简介2.3 项目日志数据采集Flume配置2.3.1 涉及的Flume组件和参数2.3.2 Nginx日志采集2.3.3 用户行为日志采集 二、Nginx日志数据统计1、日志格式说明2、数据ETL2.1 日志抽取2.1.1 正则表达式2.1.2 基于Spark实现Ngi… 文章目录 day09_实时类标签/指标一、日志数据实时采集2、Flume简介2.3 项目日志数据采集Flume配置2.3.1 涉及的Flume组件和参数2.3.2 Nginx日志采集2.3.3 用户行为日志采集 二、Nginx日志数据统计1、日志格式说明2、数据ETL2.1 日志抽取2.1.1 正则表达式2.1.2 基于Spark实现Nginx数据匹配 2.2 字段解析2.2.1 日期格式转换2.2.2 IP解析地理位置了解2.2.3 UA解析 2.3 完整代码2.4 使用Hive读取HDFS数据 3、指标统计1、尝试进行用户行为日志的数据ETL、指标统计 day09_实时类标签/指标 一、日志数据实时采集 2、Flume简介 2.3 项目日志数据采集Flume配置 zookeeper、Kafka的启动命令 启动zookeeper没有启动的才需要执行 /export/server/zookeeper/bin/zkServer.sh start启动Kafka cd /export/server/kafka/bin nohup ./kafka-server-start.sh ../config/server.sql 21 Kafka其他的相关命令 cd /export/server/kafka/bin 查看当前集群有哪些Topic ./kafka-topics.sh --list --bootstrap-server up01:9092 新建Topic分区数没要求副本数broker节点个数 ./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log 参看Topic的详细信息 ./kafka-topics.sh --describe --bootstrap-server up01:9092 --topic xtzg_nginx_log注意: 要提前创建好Kafka的Topic2.3.1 涉及的Flume组件和参数 source type: 类型固定值TAILDIR。能同时监控一个目录或者多个文件也能动态监控每个文件的变化还支持断点续传不会出现重复消费问题。 fiilegroups: 以空格分隔的文件组列表。每个文件组表示一组要跟踪的文件。 filegroups.filegroupName: 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名。 positionFile: JSON格式的文件记录每个文件的inode、绝对路径和最后位置。注意: type的TAILDIR大小写不能随便写channel type: 类型固定值 org.apache.flume.channel.kafka.KafkaChannel kafka.bootstrap.servers: Kafka集群中的broker列表。格式hostname:port多个用逗号隔开。 kafka.topic: channel要用的topic parseAsFlumeEvent: 是否需要对采集到的数据解析为Event对象然后在内容前面增加topic前缀会导致后续的内容会有部分缺失的情况。一般是false补充 如果采集到的数据最终想要输出到Kafka中可以直接选择使用Kafka Channel。 注意: Kafka Channel和Kafka Sink虽然都是将数据输出到Kafka中但是两者的配置参数有区别2.3.2 Nginx日志采集 创建nginx_to_kafka.conf文件 nginx_to_kafka.conf配置文件内容如下 #定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /export/data/workspace/user_profile/log_generate/datacollection/source_data/access-nginx.* a1.sources.r1.positionFile /export/data/flume/nginx_position.json#配置channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers up01:9092 a1.channels.c1.kafka.topic xtzg_nginx_log a1.channels.c1.parseAsFlumeEvent false#组装 a1.sources.r1.channels c1注意: 1- a1.sources.r1.filegroups.f1该参数值要改成你自己的路径2- 文件的模糊匹配的正则表达式中写的是.*表示匹配任意内容将上面的配置文件复制到/export/server/flume/conf cp /export/data/workspace/user_profile/scripts/flume/nginx_to_kafka.conf /export/server/flume/conf在Kafka上创建topic前提开启zk,kafka cd /export/server/kafka/bin./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log启动Flume cd /export/server/flumebin/flume-ng agent -n a1 -c conf/ -f conf/nginx_to_kafka.conf查看Kafka中的数据 cd /export/server/kafka/bin./kafka-console-consumer.sh --bootstrap-server up01:9092 --topic xtzg_nginx_log启动 运行python中的NginxLogSimulationData.py。查看kafka中数据变化如果看到新增数据则配置成功。确认无误后关停Flume采集任务。 2.3.3 用户行为日志采集 创建user_event_to_kafka.conf文件 user_event_to_kafka.conf配置文件内容如下 #定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /export/data/workspace/user_profile/log_generate/datacollection/source_data/user-event.* a1.sources.r1.positionFile /export/data/flume/user_event_position.json#配置channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers up01:9092 a1.channels.c1.kafka.topic xtzg_user_event a1.channels.c1.parseAsFlumeEvent false#组装 a1.sources.r1.channels c1在Kafka上创建topic前提开启zk,kafka cd /export/server/kafkabin/kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_user_event --partitions 1 --replication-factor 1启动Flume cd /export/server/flumebin/flume-ng agent -n a1 -c conf/ -f conf/user_event_to_kafka.conf查看Kafka中的数据 cd /export/server/kafkabin/kafka-console-consumer.sh --bootstrap-server up01:9092 --from-beginning --topic xtzg_user_event启动 运行python中的EventSimulationJsonData.py。查看kafka中数据变化如果看到新增数据则配置成功。确认无误后关停Flume采集任务。 二、Nginx日志数据统计 1、日志格式说明 ​ Nginx发音 恩几可使是异步框架的网页服务器也可以用作反向代理、负载平衡器和HTTP缓存。该软件由俄罗斯程序员伊戈尔·赛索耶夫Игорь Сысоев开发并于2004年首次公开发布 Nginx日志包含access_log和error_log两种类型日志数据。项目中分析的数据为access_logNginx开源官网https://nginx.org/项目采集Nginx数据格式。以下为一条Nginx日志: 116.85.48.25 - - [12/Nov/2024:11:36:46 0800] GET /login.html HTTP/1.1 404 729 https://xtx.itcast.cn/referAFriend.html Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.18(0x17001233) NetType/WIFI Language/zh_CN -Nginx日志格式说明:116.85.48.25: 用户访问IP地址- - : 用户标识(cookie信息)[14/Jul/2022:17:40:41 0800]: 访问时间 时区GET : 请求方式/css/40.30d6d2b.css: 请求资源HTTP/1.1 : 请求的协议500 : 请求的状态码 (500 服务器错误, 200 成功 302 重定向 404 访问到未知资源)951 : 响应返回的字节大小https://www.htv.com/official/component?WT.mc_id3 : 来源的URL(从那个地方跳转到此页面)Mozilla/5......: 浏览器标识2、数据ETL 2.1 日志抽取 2.1.1 正则表达式 Java版本: (?ip\d\.\d\.\d\.\d) (- - \[)(?datetime[\s\S])(?t1\][\s])(?request[A-Z]) (?url[\S]*) (?protocol[\S])[] (?code\d) (?sendbytes\d) [](?refferer[\S]*) [](?useragent[\S\s])[] [](?proxyaddr[\S\s])[]Python版本: (?Pip.*?) - - \[(?Ptime.*?)\] (?Prequest.*?) (?Pstatus.*?) (?Pbytes.*?) (?Preferer.*?) (?Pua.*?) (?Pproxy_address.*)2.1.2 基于Spark实现Nginx数据匹配 代码实现: from pyspark.sql import SparkSession import os import pyspark.sql.functions as F from pyspark.sql.types import StringTypeos.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(nginx_etl)\.master(local[*])\.config(spark.sql.shuffle.partitions,2)\.getOrCreate()# 2- 数据输入读取Kafka中的数据startingOffsets,earliest该配置在实际工作中一般不需要配置。这里是为了开发代码方便init_df spark.readStream.format(kafka)\.option(kafka.bootstrap.servers,192.168.88.166:9092)\.option(subscribe,xtzg_nginx_log)\.option(startingOffsets,earliest)\.load()# 结构化流中不能以show()方式打印数据数据内容# init_df.show()# 3- 数据ETL处理# 3.1- value字段解码的操作cast(StringType())将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)下面两种方式都可以推荐使用第一种因为性能更好# type_cast_df init_df.select(init_df.value.cast(StringType()).alias(value))type_cast_df init_df.selectExpr(cast(value as string) as value)# 3.2- 通过正则表达式提取Nginx的字段pattern (?ip\d\.\d\.\d\.\d) (- - \[)(?datetime[\s\S])(?t1\][\s])(?request[A-Z]) (?url[\S]*) (?protocol[\S])[] (?code\d) (?sendbytes\d) [](?refferer[\S]*) [](?useragent[\S\s])[] [](?proxyaddr[\S\s])[]regexp_df type_cast_df.select(F.regexp_extract(value,pattern,1).alias(ip),F.regexp_extract(value,pattern,3).alias(datetime),F.regexp_extract(value,pattern,4).alias(t1),F.regexp_extract(value,pattern,5).alias(request),F.regexp_extract(value,pattern,6).alias(url),F.regexp_extract(value,pattern,7).alias(protocol),F.regexp_extract(value,pattern,8).alias(code),F.regexp_extract(value,pattern,9).alias(sendbytes),F.regexp_extract(value,pattern,10).alias(refferer),F.regexp_extract(value,pattern,11).alias(useragent),F.regexp_extract(value,pattern,12).alias(proxyaddr))# 4- 数据输出启动流式任务regexp_df.writeStream.format(console).outputMode(append).start().awaitTermination()运行结果截图 可能遇到的错误 原因: regexp_extract函数只能传递Java版的正则表达式不能用Python的2.2 字段解析 需求根据nginx日志ip标识唯一的用户需要ip分组统计得到用户访问的pv、uv、区域、状态码、终端设备的操作系统、设备品牌、浏览器、访问时间(年-月-日 时:分:秒) 2.2.1 日期格式转换 Python的datetime函数库 相关函数: strftime(): 把日期对象转成指定的时间格式的字符串strptime(): 把指定格式的日期字符串转换为日期对象 参考文档: https://docs.python.org/zh-cn/3/library/datetime.html#strftime-strptime-behavior解析格式: %d/%b/%Y:%H:%M:%S %z %Y-%m-%d %H:%M:%S 28/Jul/2022:16:22:07 0800 日期对象 2022-07-28 16:22:07 测试代码 Python方式 from datetime import datetimeif __name__ __main__:date_str 11/Feb/2025:14:34:49 0800print(datetime.strptime(date_str, %d/%b/%Y:%H:%M:%S %z).strftime(%Y-%m-%d %H:%M:%S))SparkSQL方式重点掌握 regexp_df.withColumn(datetime,F.from_unixtime(F.unix_timestamp(datetime,dd/MMM/yyyy:HH:mm:ss Z),yyyy-MM-dd HH:mm:ss))2.2.2 IP解析地理位置了解 根据IP解析地理位置 方式一: 使用ip解析地理位置API ip地址http://opendata.baidu.com/api.php?query117.136.12.79coresource_id6006oeutf8像百度地图开发平台 / 高德地图开放平台 … 都会提供IP解析的服务接口百度地图https://lbs.baidu.com/faq/api?titlewebapi/ip-api-base高德地图https://lbs.amap.com/api/webservice/guide/api/ipconfig其他平台https://www.nowapi.com/ 方式二: 了解使用geo_ip依赖包和GeoLite2-City.mmdb库 依赖包geoip2~4.5.0下载地址https://gitcode.com/crownp/geolite2_demo/blob/master/src/main/resources/GeoLite2-City.mmdb IP在线解析测试代码 Python的Requests库的介绍https://requests.readthedocs.io/en/latest/ #!/usr/bin/env python # desc : __coding__ utf-8 __author__ bytedanceimport requestsdef parse_ip(ip_str):params {query: ip_str,co: ,resource_id: 6006,oe: utf8,}# 发送请求response requests.get(urlhttps://opendata.baidu.com/api.php, paramsparams)# 解析响应内容result response.json()status result[status]if status 0:# 正常try:return result[data][0][location].split( )[0]except:return 未知区域else:return 未知区域if __name__ __main__:ip_str 127.0.0.1ip_str 10.254.1.97ip_str 157.148.69.76area parse_ip(ip_str)print(area)2.2.3 UA解析 UA说明 UA为useragent简称特指用户访问系统使用的客户端信息一般包含操作系统浏览器设备品牌信息等UA字符串信息http://useragentstring.com/使用需导入UA解析依赖包from user_agents import parseUA的作用 1.客户端识别通过User-Agent服务器能够识别客户端的类型和版本从而提供相应的内容和服务。比如在移动设备上展示适合屏幕大小的网页布局或在不同浏览器上提供兼容性优化。2.统计分析网站和应用开发者可以利用User-Agent来收集客户端的信息进行用户行为分析和统计。这有助于了解用户使用的设备和偏好以便进行产品和服务的改进。3.安全性User-Agent也可以用于安全验证和防止恶意行为。通过分析User-Agent服务器可以检测到异常或伪造的请求并采取相应的安全措施。 测试代码: from user_agents import parseif __name__ __main__:ua_str Mozilla/5.0 (iPhone; CPU iPhone OS 13_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0 MQQBrowser/11.0.7 Mobile/15E148 Safari/604.1 QBWebViewUA/2 QBWebViewType/1 WKType/1result parse(ua_str)# os操作系统信息print(os----------)print(result.os.family)print(result.os.version)print(result.os.version_string)# brower浏览器信息print(browser----------)print(result.browser.family)print(result.browser.version)print(result.browser.version_string)# device设备信息print(device----------)print(result.device.family)print(result.device.model)2.3 完整代码 需要将结果数据同时写入到Kafka和HDFS。清洗后的日志可以用于其他业务分析具有一定的价值。因为Kafka不能永久保存数据所以需要把数据存储到HDFS一份。 因为每天都有很多日志所以需要对日志进行分区。可以通过partitionBy()方法进行分区写入到HDFS。分区的字段需要进行计算。 另外为了减少小文件生成可以使用trigger来指定写入的时间间隔。 先创建Kafka的Topic cd /export/server/kafka/bin ./kafka-topics.sh --create --bootstrap-server up01:9092 --topic dwd_nginx_etl_result完整代码 from pyspark.sql import SparkSession import os import pyspark.sql.functions as F from pyspark.sql.types import StringType, MapType import requests from user_agents import parseos.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(nginx_etl)\.master(local[*])\.config(spark.sql.shuffle.partitions,2)\.getOrCreate()# 配置checkpointLocation路径推荐使用HDFS路径spark.conf.set(spark.sql.streaming.checkpointLocation, hdfs://192.168.88.166:8020/xtzg/chk)# 2- 数据输入读取Kafka中的数据startingOffsets,earliest该配置在实际工作中一般不需要配置。这里是为了开发代码方便init_df spark.readStream.format(kafka)\.option(kafka.bootstrap.servers,192.168.88.166:9092)\.option(subscribe,xtzg_nginx_log)\.option(startingOffsets,earliest)\.load()# 结构化流中不能以show()方式打印数据数据内容# init_df.show()# 3- 数据ETL处理# 3.1- value字段解码的操作cast(StringType())将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)下面两种方式都可以推荐使用第一种因为性能更好# type_cast_df init_df.select(init_df.value.cast(StringType()).alias(value))type_cast_df init_df.selectExpr(cast(value as string) as value)# 3.2- 通过正则表达式提取Nginx的字段pattern (?ip\d\.\d\.\d\.\d) (- - \[)(?datetime[\s\S])(?t1\][\s])(?request[A-Z]) (?url[\S]*) (?protocol[\S])[] (?code\d) (?sendbytes\d) [](?refferer[\S]*) [](?useragent[\S\s])[] [](?proxyaddr[\S\s])[]# 这里不允许使用Python正则表达式只能使用Java正则表达式# pattern (?Pip.*?) - - \[(?Ptime.*?)\] (?Prequest.*?) (?Pstatus.*?) (?Pbytes.*?) (?Preferer.*?) (?Pua.*?) (?Pproxy_address.*)regexp_df type_cast_df.select(F.regexp_extract(value,pattern,1).alias(ip),F.regexp_extract(value,pattern,3).alias(datetime),F.regexp_extract(value,pattern,4).alias(t1),F.regexp_extract(value,pattern,5).alias(request),F.regexp_extract(value,pattern,6).alias(url),F.regexp_extract(value,pattern,7).alias(protocol),F.regexp_extract(value,pattern,8).alias(code),F.regexp_extract(value,pattern,9).alias(sendbytes),F.regexp_extract(value,pattern,10).alias(refferer),F.regexp_extract(value,pattern,11).alias(useragent),F.regexp_extract(value,pattern,12).alias(proxyaddr))# 3.3- 日期时间格式转换datetime_df regexp_df.withColumn(datetime,F.from_unixtime(F.unix_timestamp(datetime,dd/MMM/yyyy:HH:mm:ss Z),yyyy-MM-dd HH:mm:ss))# 3.4- IP地理位置解析F.udf(returnTypeStringType())def parse_ip(ip_str):params {query: ip_str,co: ,resource_id: 6006,oe: utf8,}# 发送请求response requests.get(urlhttps://opendata.baidu.com/api.php, paramsparams)# 解析响应内容result response.json()status result[status]if status 0:# 正常try:return result[data][0][location].split( )[0]except:return 未知区域else:return 未知区域area_df datetime_df.withColumn(area,parse_ip(ip))# 3.5- UA解析为什么这里用户自定义函数推荐返回字典方便后续取值F.udf(returnTypeMapType(keyTypeStringType(),valueTypeStringType()))def parse_ua(ua_str):result parse(ua_str)os result.os.familybrowser result.browser.familydevice result.device.modelreturn {os:os,browser:browser,device:device}ua_df area_df.withColumn(os,parse_ua(useragent)[os])\.withColumn(browser, parse_ua(useragent)[browser])\.withColumn(device, parse_ua(useragent)[device])# 4- 数据输出启动流式任务# 4.1- 输出到HDFS# 新增一个分区字段dt_df ua_df.withColumn(dt,F.split(datetime, )[0])# partitionBy表示按照哪个字段进行分区dt_df.writeStream.format(orc).partitionBy(dt)\.option(path,hdfs://192.168.88.166:8020/xtzg/etl/dwd_nginx_etl_result)\.start()# 4.2- 输出到Kafka# 注意一般将数据内容转换为JSON格式输出到Kafka中为了后续使用方便# 注意输出到Kafka中的字段名称只能叫valuekafka_df ua_df.select(F.to_json(F.struct(ip,datetime,t1,request,url,protocol,code,sendbytes,refferer,useragent,proxyaddr,area,os,browser,device)).alias(value))kafka_df.writeStream.format(kafka)\.option(kafka.bootstrap.servers,192.168.88.166:9092)\.option(topic,dwd_nginx_etl_result)\.start()# 4.3- 输出到控制台为了测试# awaitTermination()只能加在最后一个start()的后面dt_df.writeStream.format(console).outputMode(append).start().awaitTermination()可能遇到的错误一 原因: 结构化流中将数据输出到文件系统中需要配置checkpointLocation可能遇到的错误二 原因: 输出到Kafka中的字段名称只能叫value2.4 使用Hive读取HDFS数据 创建表 CREATE external TABLE dwd.dwd_nginx_etl_result (ip string,datetime string,t1 string,request string,url string,protocol string,code string,sendbytes string,refferer string,useragent string,proxyaddr string,area string,os string,browser string,device string )COMMENT nginx日志PARTITIONED BY (dt string)STORED AS ORCLOCATION /xtzg/etl/dwd_nginx_etl_resultTBLsql (orc.compress SNAPPY) ;同步分区 MSCK REPAIR TABLE dwd.dwd_nginx_etl_result;3、指标统计 需求 统计实时请求总数(pv) 统计用户数(uv) 统计用户访问所在区域省(类似抖音的位置显示) 统计用户响应状态码 统计用户使用设备终端信息 统计用户操作系统信息 统计用户首次访问系统的时间 统计用户末次访问系统的时间ip 用户访问系统的唯一地址 pv访问系统的页面次数 uv访问系统的用户数 area访问系统用户来自的区域根据ip解析出地址位置 status_code访问系统用户请求http协议响应状态码 device_os设备终端从ua中提取手机或电脑的系统 device_brand设备品牌名称从ua中提取手机或电脑的品牌 browser_name访问系统用户使用的浏览器名称 first_access_time用户首次访问系统的时间 last_access_time用户首次访问系统的时间Doris建表语句 使用unique模型。 CREATE DATABASE IF NOT EXISTS log_analysis_db; CREATE TABLE IF NOT EXISTS log_analysis_db.nginx_log_result (ip varchar(15) comment ip地址,pv int comment pv数,uv int comment uv数,area varchar(50) comment 用户所在区域根据ip解析,status_code varchar(10) comment 请求响应状态码,device_os varchar(50) comment 设备系统从ua中提取手机或电脑使用的系统,device_brand varchar(50) comment 从ua中提取手机或电脑的品牌,browser_name varchar(50) comment 电脑和手机使用浏览器记录浏览器简称,first_access_time datetime comment nginx日志记录首次访问时间,last_access_time datetime comment nginx日志记录末次访问时间 ) UNIQUE KEY(ip) DISTRIBUTED BY HASH(ip) BUCKETS 10 sql(replication_num 1);完整代码 from pyspark.sql import SparkSession, DataFrame import os import pyspark.sql.functions as F from pyspark.sql.types import StringTypeos.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder \.appName(nginx_analysis) \.master(local[*]) \.config(spark.sql.shuffle.partitions, 2) \.getOrCreate()# 2- 数据输入读取Kafka中的数据init_df spark.readStream.format(kafka) \.option(kafka.bootstrap.servers, 192.168.88.166:9092) \.option(subscribe, dwd_nginx_etl_result) \.option(startingOffsets, earliest) \.load()# 3- 数据处理# 3.1- value字段类型转换type_cast_df init_df.select(init_df.value.cast(StringType()).alias(value))# 3.2- 从JSON中提取一个个字段json_tuple与get_json_object的区别get_json_object优点同时能够解析嵌套的JSON缺点一次只能得到一个字段json_tuple优点一次能得到多个字段缺点针对嵌套JSON只能一层层解析parse_json_df type_cast_df.select(F.json_tuple(value,ip,datetime,code,area,os,browser,device)\.alias(ip,datetime,status_code,area,device_os,browser_name,device_brand))# 3.3- 指标统计# F.lit(1)生成一列每行的数据内容一样全都是1。与F.col函数作用类似# 因为类似area的这些字段的数据类型是字符串聚合函数没有太适合的因此使用firstresult_df parse_json_df.groupBy(ip).agg(F.count(ip).alias(pv),F.lit(1).alias(uv),F.first(area).alias(area),F.first(status_code).alias(status_code),F.first(device_os).alias(device_os),F.first(device_brand).alias(device_brand),F.first(browser_name).alias(browser_name),F.min(datetime).alias(first_access_time),F.max(datetime).alias(last_access_time))# 4- 数据输出# 4.1- 输出到Dorisdef write_2_doris(batch_df:DataFrame, batch_id):将DataFrame输出到Doris中:param batch_df: 有界的DataFrame:param batch_id: 批次ID:return:# 注意一般先用append。如果明确知道要怎么做那可以再使用overwritebatch_df.write.jdbc(urljdbc:mysql://192.168.88.166:9030/log_analysis_db?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCuseSSLfalse,tablenginx_log_result,modeappend,sql{ user : root, password : 123456 })result_df.writeStream.foreachBatch(write_2_doris).outputMode(update).start()# 4.2- 输出到控制台result_df.writeStream.format(console).outputMode(update).start().awaitTermination()结果数据核对 ./kafka-console-producer.sh --broker-list up01:9092 --topic dwd_nginx_etl_result{ip:210.27.147.62,cookie:- - [,datetime:2024-11-14 11:11:11,t1:] \,request:GET,url:/search.html,protocol:HTTP/1.1,code:401,sendbytes:58840,refferer:https://www.douyin.com/goods-recommend/search.html?keyword美味\,useragent:Mozilla/5.0 (Linux; U; Android 9; zh-CN; MI 9 Build/PKQ1.181121.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 UCBrowser/13.1.6.1096 Mobile Safari/537.36,proxyaddr:-,area:广东省广州市,os:Android,browser:UC Browser,device:XiaoMi MI 9,dt:2024-11-12}1、尝试进行用户行为日志的数据ETL、指标统计 提示核心是如何解析JSON格式得到一个个独立的字段
文章转载自:
http://www.morning.bpttm.cn.gov.cn.bpttm.cn
http://www.morning.rfycj.cn.gov.cn.rfycj.cn
http://www.morning.qkqzm.cn.gov.cn.qkqzm.cn
http://www.morning.zglrl.cn.gov.cn.zglrl.cn
http://www.morning.xkyst.cn.gov.cn.xkyst.cn
http://www.morning.rdsst.cn.gov.cn.rdsst.cn
http://www.morning.blqsr.cn.gov.cn.blqsr.cn
http://www.morning.rlhjg.cn.gov.cn.rlhjg.cn
http://www.morning.darwallet.cn.gov.cn.darwallet.cn
http://www.morning.rdbj.cn.gov.cn.rdbj.cn
http://www.morning.nlrp.cn.gov.cn.nlrp.cn
http://www.morning.gtjkh.cn.gov.cn.gtjkh.cn
http://www.morning.ybhrb.cn.gov.cn.ybhrb.cn
http://www.morning.whnps.cn.gov.cn.whnps.cn
http://www.morning.fthcq.cn.gov.cn.fthcq.cn
http://www.morning.clkyw.cn.gov.cn.clkyw.cn
http://www.morning.mfqmk.cn.gov.cn.mfqmk.cn
http://www.morning.bnzjx.cn.gov.cn.bnzjx.cn
http://www.morning.pqktp.cn.gov.cn.pqktp.cn
http://www.morning.kstgt.cn.gov.cn.kstgt.cn
http://www.morning.rnmmh.cn.gov.cn.rnmmh.cn
http://www.morning.sbqrm.cn.gov.cn.sbqrm.cn
http://www.morning.bklhx.cn.gov.cn.bklhx.cn
http://www.morning.mgkb.cn.gov.cn.mgkb.cn
http://www.morning.zqbrw.cn.gov.cn.zqbrw.cn
http://www.morning.xyrss.cn.gov.cn.xyrss.cn
http://www.morning.nqxdg.cn.gov.cn.nqxdg.cn
http://www.morning.fnnkl.cn.gov.cn.fnnkl.cn
http://www.morning.rtlth.cn.gov.cn.rtlth.cn
http://www.morning.tnyanzou.com.gov.cn.tnyanzou.com
http://www.morning.ryyjw.cn.gov.cn.ryyjw.cn
http://www.morning.hqpyt.cn.gov.cn.hqpyt.cn
http://www.morning.nshhf.cn.gov.cn.nshhf.cn
http://www.morning.clpdm.cn.gov.cn.clpdm.cn
http://www.morning.ntgjm.cn.gov.cn.ntgjm.cn
http://www.morning.ie-comm.com.gov.cn.ie-comm.com
http://www.morning.ygqjn.cn.gov.cn.ygqjn.cn
http://www.morning.phwmj.cn.gov.cn.phwmj.cn
http://www.morning.mslsn.cn.gov.cn.mslsn.cn
http://www.morning.cfhwn.cn.gov.cn.cfhwn.cn
http://www.morning.mfct.cn.gov.cn.mfct.cn
http://www.morning.tfbpz.cn.gov.cn.tfbpz.cn
http://www.morning.tqsmc.cn.gov.cn.tqsmc.cn
http://www.morning.ykmg.cn.gov.cn.ykmg.cn
http://www.morning.fddfn.cn.gov.cn.fddfn.cn
http://www.morning.tfpbm.cn.gov.cn.tfpbm.cn
http://www.morning.rwzqn.cn.gov.cn.rwzqn.cn
http://www.morning.jbhhj.cn.gov.cn.jbhhj.cn
http://www.morning.dtcsp.cn.gov.cn.dtcsp.cn
http://www.morning.zwpzy.cn.gov.cn.zwpzy.cn
http://www.morning.fdhwh.cn.gov.cn.fdhwh.cn
http://www.morning.prlgn.cn.gov.cn.prlgn.cn
http://www.morning.hhskr.cn.gov.cn.hhskr.cn
http://www.morning.xjwtq.cn.gov.cn.xjwtq.cn
http://www.morning.gnghp.cn.gov.cn.gnghp.cn
http://www.morning.tqjwx.cn.gov.cn.tqjwx.cn
http://www.morning.mbrbg.cn.gov.cn.mbrbg.cn
http://www.morning.pfkrw.cn.gov.cn.pfkrw.cn
http://www.morning.mrpqg.cn.gov.cn.mrpqg.cn
http://www.morning.gtnyq.cn.gov.cn.gtnyq.cn
http://www.morning.lwqst.cn.gov.cn.lwqst.cn
http://www.morning.hqmfn.cn.gov.cn.hqmfn.cn
http://www.morning.tthmg.cn.gov.cn.tthmg.cn
http://www.morning.nchlk.cn.gov.cn.nchlk.cn
http://www.morning.dkbsq.cn.gov.cn.dkbsq.cn
http://www.morning.smhtg.cn.gov.cn.smhtg.cn
http://www.morning.drcnf.cn.gov.cn.drcnf.cn
http://www.morning.fcftj.cn.gov.cn.fcftj.cn
http://www.morning.dbrpl.cn.gov.cn.dbrpl.cn
http://www.morning.fglxh.cn.gov.cn.fglxh.cn
http://www.morning.lqlc.cn.gov.cn.lqlc.cn
http://www.morning.wpsfc.cn.gov.cn.wpsfc.cn
http://www.morning.gqdsm.cn.gov.cn.gqdsm.cn
http://www.morning.abgy8.com.gov.cn.abgy8.com
http://www.morning.rwjh.cn.gov.cn.rwjh.cn
http://www.morning.wjfzp.cn.gov.cn.wjfzp.cn
http://www.morning.wnqbf.cn.gov.cn.wnqbf.cn
http://www.morning.zsgbt.cn.gov.cn.zsgbt.cn
http://www.morning.lzqdl.cn.gov.cn.lzqdl.cn
http://www.morning.fbxdp.cn.gov.cn.fbxdp.cn
http://www.tj-hxxt.cn/news/271163.html

相关文章:

  • 个人网站建设方案书模板免费做代理的网站
  • 永久免费国外域名注册广州网站建设推荐乐云seo
  • 建站软件免费试用公司有没有必要设计网页
  • 白水网站建设手机网站域名查询
  • 做网站的费用计入哪个科目网站开发gif图太多耗资源吗
  • 网易云网站开发网上做问卷报酬不错的网站是
  • 百度建站官网学校网站怎么做的好
  • 作图网站wordpress做大型网站
  • 网站建设相关语言怎样将建设银行网站加入可信站
  • 安徽长江建设集团有限公司网站wordpress 付费文章
  • 手机网站建设行业分析中国建设银行的网站
  • 昆明网站设计报价个人网站域名备案流程
  • 弄个网站需要多少钱哪个网站是专门做男人衣服的
  • 网站定制营销织梦网站地图模板修改
  • 如何在阿里巴巴建网站网上找装修设计师
  • 负责公司网站产品的开发及整理物流的网站模板免费下载
  • 受欢迎的丹阳网站建设朝阳网站建设 高碑店
  • 上海婚恋网站排名苍南最好的网站建设公司
  • wordpress icp涵数seo怎么优化软件
  • 宁波江北建设局网站wordpress 如何登录地址
  • 中国信誉建设网站广州网站开发十度网络最好
  • 燕莎网站建设个人如何做百度推广
  • 儿童网站建设个人博客大全
  • html5新手做的网页seo推广编辑招聘
  • 张掖网站建设清空回收站 wordpress
  • 怎样为企业设计网站网站建设 个人
  • 重庆响应式网站多少钱网站建设与设计毕业shej
  • 哪些网站可以做招商广告湖南省建筑信息网
  • 现在网站主怎么做淘宝客石岩做网站哪家好
  • 网站开发设nas wordpress外网