企业网站的基本内容和营销功能,用asp.net做的网站贴吧,设计工作室与网站建设工作室,wordpress 不显示文章归档文章目录 day16_推荐系统和总结一、推荐实现1、基于流行度推荐#xff08;掌握#xff09;1.1 近期热门商品推荐1.2 个人热门商品推荐 2、基于隐语义模型的协同过滤推荐#xff08;了解#xff09;2.1 ALS算法介绍2.2 推荐代码 3、基于物品的协同过滤推荐#xff08;了解掌握1.1 近期热门商品推荐1.2 个人热门商品推荐 2、基于隐语义模型的协同过滤推荐了解2.1 ALS算法介绍2.2 推荐代码 3、基于物品的协同过滤推荐了解4、基于用户的协同过滤推荐了解5、基于关联规则的推荐熟悉5.1 关联规则详解5.2 FP-growth算法理解5.3 SparkMLlib中的FP-growth算法5.4 完整代码 6、服务部署了解 day16_推荐系统和总结
一、推荐实现
推荐系统一般是由Java后端与前端人员进行开发的大数据开发人员比较少参与主要是提供数据。
为了实现推荐功能需要启动Hadoop、Hive、ES、Doris、SparkSubmit
启动Hadoop、启动Hivecd /./up01.sh start启动ES1- 切换用户su es2- 进入目录cd /home/es/elasticsearch-7.10.2/bin3- 启动elasticsearch -d4- 退出es用户exit启动Doris/export/server/doris/fe/bin/start_fe.sh --daemon/export/server/doris/be/bin/start_be.sh --daemon/export/server/doris/apache_hdfs_broker/bin/start_broker.sh --daemon启动SparkSubmitcd /export/server/spark/sbin./start-thriftserver.sh \--hiveconf hive.server2.thrift.port10001 \--hiveconf hive.server2.thrift.bind.hostup01 \--hiveconf spark.sql.warehouse.dirhdfs://up01:8020/user/hive/warehouse \--master local[*]1、基于流行度推荐掌握
基于流行度推荐也就是基于统计的推荐主要用来解决用户的冷启动问题对于新用户首次登录。基于流行度的推荐也可以用于单独的热门商品模块。
1.1 近期热门商品推荐
可以按商品销售的单量进行倒序排序然后存入Doirs中。表中可以多存入一些数据在使用时根据品类进行倒序查询取到相关商品即可。
计算的sql如下
-- 近期热门商品推荐
selectcurrent_date() as recommend_date, -- 推荐时间区别是什么时候推荐的third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量
from dwm.dwm_sold_goods_sold_dtl_i
where-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))40 and goods_no is not null
group by third_category_no,third_category_name,goods_no,goods_name
order by order_count desc
limit 300 -- 推荐比项目经理要求的推荐数目多一些Doris的建表语句如下
create database if not exists recommend_db;
CREATE TABLE IF NOT EXISTS recommend_db.popular_hot_goods (recommend_date DATE comment 计算日期,goods_no bigint comment 商品编码,third_category_no STRING comment 三级品类编码,third_category_name STRING comment 三级品类名称,goods_name STRING comment 商品名称,order_count INT comment 销售数量
)
UNIQUE KEY(recommend_date, goods_no)
comment 热门商品推荐
PARTITION BY RANGE(recommend_date) ()
DISTRIBUTED BY HASH(goods_no) BUCKETS 1
sql (dynamic_partition.create_history_partition true,dynamic_partition.enable true,dynamic_partition.time_unit DAY,dynamic_partition.start -365,dynamic_partition.end 3,dynamic_partition.prefix p,dynamic_partition.buckets 10,replication_allocation tag.location.default: 1
);注意: 1- 如果多个字段作为UNIQUE KEY那么String类型不能够使用。因此这里将goods_no的类型进行了强制转换。为了将不同日期的数据分开存放这里使用动态分区表。2- 建表中的字段顺序需要与UNIQUE KEY中字段顺序保持一致。并且UNIQUE KEY中的字段要放在最上面。如果不遵守会报如下的错推荐代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as Fos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(topn_goods_recommend)\.master(local[*]) \.config(spark.sql.warehouse.dir, hdfs://up01:8020/user/hive/warehouse) \.config(hive.metastore.uris, thrift://up01:9083) \.config(spark.sql.shuffle.partitions,2)\.enableHiveSupport() \.getOrCreate()# 2- 读取分析Hive中的数据获取TOPN的热门商品topn_goods_df spark.sql(selectcurrent_date() as recommend_date, -- 推荐时间区别是什么时候推荐的third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量from dwm.dwm_sold_goods_sold_dtl_iwhere-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))40 and goods_no is not nullgroup by third_category_no,third_category_name,goods_no,goods_nameorder by order_count desclimit 300 -- 推荐比项目经理要求的推荐数目多一些)# 3- 数据存储到Doris中topn_goods_df.write.jdbc(urljdbc:mysql://192.168.88.166:9030/recommend_db?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCuseSSLfalse,tablepopular_hot_goods,modeappend,sql{ user : root, password : 123456 })# 4- 释放资源spark.stop()1.2 个人热门商品推荐
也就是根据销售单量统计每个人喜欢购买的前N个商品
计算的sql如下
selectcurrent_date() as recommend_date, -- 推荐时间区别是什么时候推荐的zt_id as user_id,goods_no,goods_name,third_category_no,third_category_name,order_count
from (select*,row_number() over(partition by zt_id order by order_count desc) as rnfrom (selectzt_id,third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量from dwm.dwm_sold_goods_sold_dtl_iwhere-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))40 and goods_no is not nulland zt_id is not null and zt_id!0group by zt_id,third_category_no,third_category_name,goods_no,goods_name) tmp_1
) tmp_2 where rn20Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.popular_person_hot_goods (recommend_date DATE comment 计算日期,user_id INT comment 会员ID,goods_no STRING comment 商品编码,goods_name STRING comment 商品名称,third_category_no STRING comment 三级品类编码,third_category_name STRING comment 三级品类名称,order_count INT comment 订单数
)
UNIQUE KEY(recommend_date, user_id)
comment 个人热门商品推荐
PARTITION BY RANGE(recommend_date) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 1
sql (dynamic_partition.create_history_partition true,dynamic_partition.enable true,dynamic_partition.time_unit DAY,dynamic_partition.start -365,dynamic_partition.end 3,dynamic_partition.prefix p,dynamic_partition.buckets 10,replication_allocation tag.location.default: 1
);推荐代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as Fos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(topn_user_goods_recommend)\.master(local[*]) \.config(spark.sql.warehouse.dir, hdfs://up01:8020/user/hive/warehouse) \.config(hive.metastore.uris, thrift://up01:9083) \.config(spark.sql.shuffle.partitions,8)\.enableHiveSupport() \.getOrCreate()# 2- 读取分析Hive中的数据获取TOPN的热门商品topn_goods_df spark.sql(selectcurrent_date() as recommend_date, -- 推荐时间区别是什么时候推荐的zt_id as user_id,goods_no,goods_name,third_category_no,third_category_name,order_countfrom (select*,row_number() over(partition by zt_id order by order_count desc) as rnfrom (selectzt_id,third_category_no,third_category_name,goods_no,goods_name,count(order_no) as order_count -- 订单量from dwm.dwm_sold_goods_sold_dtl_iwhere-- 过滤最近一段时间内的销售数据datediff(current_date(),to_date(trade_date))40 and goods_no is not nulland zt_id is not null and zt_id!0group by zt_id,third_category_no,third_category_name,goods_no,goods_name) tmp_1) tmp_2 where rn20)# 3- 数据存储到Doris中topn_goods_df.write.jdbc(urljdbc:mysql://192.168.88.166:9030/recommend_db?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCuseSSLfalse,tablepopular_person_hot_goods,modeappend,sql{ user : root, password : 123456 })# 4- 释放资源spark.stop()2、基于隐语义模型的协同过滤推荐了解
基于隐语义模型的协同过滤方法结合了协同过滤的思想和隐语义模型的技术通过矩阵分解等方法将用户-项目交互矩阵分解为两个低维矩阵分别表示用户在隐空间中的向量和项目在隐空间中的向量。
2.1 ALS算法介绍
ALS算法是2008年以来用的比较多的协同过滤算法。它已经集成到Spark的Mllib库中使用起来比较方便。从协同过滤的分类来说ALS算法属于User-Item CF也叫做混合CF。它同时考虑了User和Item两个方面。
spark.ml目前支持基于模型的协同过滤使用交替最小二乘法ALS算法实现。
spark.ml的实现具有以下参数
numBlocks用户和物品将被分成的块数以便并行计算默认为10rank模型中的潜在因子数量默认为10maxIter运行的最大迭代次数默认为10regParam在ALS中指定的正则化参数默认为1.0implicitPrefs指定是否使用显式反馈ALS变体或适用于隐式反馈数据的变体默认为false表示使用显式反馈alpha适用于ALS隐式反馈变体的参数决定了对偏好观察的基本置信度默认为1.0nonnegative指定是否对最小二乘法使用非负约束默认为false
注意基于DataFrame的ALS API目前仅支持整数类型的用户和物品ID。
2.2 推荐代码
如果使用基于ALS的协同过滤模型进行推荐关键是要构造用户对商品的评分数据。评分主要来源于用户的行为包括浏览、加购、下单、购买、退单、评论、收藏等一般在企业中都会将这些因素考虑进去。
具体的评分方法是浏览 1分加购 2分下单 3分 支付 5分退单 -5分。
Doris建表语句
CREATE DATABASE IF NOT EXISTS recommend_db;
CREATE TABLE IF NOT EXISTS recommend_db.als_goods_for_user (user_id INT comment 用户id,goods_nos STRING comment 推荐的商品列表
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
sql(replication_num 1);推荐代码
import os
from datetime import datetime
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, DoubleTypeos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/pythondef get_best_parameter(df):# 切分数据集training, test df.randomSplit([0.8, 0.2], seed88)# 使用ALS构建推荐模型# 将冷启动策略设置为“drop”以确保不会获得NaN评估指标als ALS(userColuser_id, itemColgoods_no, ratingColscore, coldStartStrategydrop)# 创建参数网格param_grid ParamGridBuilder() \.addGrid(als.rank, [5, 10, 15]) \.addGrid(als.maxIter, [5, 10, 20]) \.addGrid(als.regParam, [0.01, 0.05, 0.1]) \.build()# 创建评估器evaluator RegressionEvaluator(metricNamermse, labelColscore, predictionColprediction)# 创建交叉验证器crossval CrossValidator(estimatorals,estimatorParamMapsparam_grid,evaluatorevaluator,numFolds3) # 3折交叉验证# 训练模型cv_model crossval.fit(training)# 选择最佳模型best_model cv_model.bestModel# 评估最佳模型predictions best_model.transform(test)rmse evaluator.evaluate(predictions)print(最优模型的均方根误差为 str(rmse))# 获取最佳参数rank best_model._java_obj.parent().getRank()maxIter best_model._java_obj.parent().getMaxIter()regParam best_model._java_obj.parent().getRegParam()print(最佳参数组合)print(rank: , rank)print(maxIter: , maxIter)print(regParam: , regParam)return rank, maxIter, regParamif __name__ __main__:# 1)创建整合Hive的SparkSession# 1- 创建SparkSession对象spark SparkSession.builder \.appName(recommend) \.master(local[*]) \.config(spark.sql.warehouse.dir, hdfs://up01:8020/user/hive/warehouse) \.config(hive.metastore.uris, thrift://up01:9083) \.config(spark.sql.shuffle.partitions, 5) \.enableHiveSupport() \.getOrCreate()# 2从业务库中计算历史评分select_sql select zt_id as user_id, goods_no, sum(if(trade_type in(2, 5), -1, 1)) * 5 as scorefrom dwm.dwm_sold_goods_sold_dtl_iwhere dt date_sub(current_date, 90) and dt date_sub(current_date, 1)and zt_id ! 0 and zt_id is not nullgroup by zt_id, goods_nohive_df spark.sql(select_sql)hive_df.show()# 3读取并解析日志数据file_path hdfs://up01:8020/xtzg/etl/dwd_user_event_etl_result/dt2025-02-14# 读取ORC格式的数据log_df spark.read.format(orc).load(file_path).select(user_id, F.split(goods_name, )[0].alias(goods_no), (F.col(is_browse)*1 F.col(is_cart)*2 F.col(is_order)*3 F.col(is_buy)*5 - F.col(is_back_order)*5).alias(score) )log_df.show()# 4) 数据合并并聚合# 因为als模型中需要userCol和itemCol都是整型所以需要将类型转成int又因为goods_no有0开头的所以需要再前边拼接一个数字# 因为频次过多会导致评分过大所以可以使用log将数据变平滑union_df hive_df.unionAll(log_df).groupby(user_id, goods_no).agg(F.sum(score).alias(score)).\select(F.col(user_id).astype(IntegerType()).alias(user_id), F.concat(F.lit(1), F.col(goods_no)).astype(IntegerType()).alias(goods_no), score)# union_df.printSchema()union_df.show()# 5) 训练模型并得到推荐结果# 获取最佳超参数# rank, maxIter, regParam get_best_parameter(union_df)rank, maxIter, regParam 15, 20, 0.1als ALS(rankrank, maxItermaxIter, regParamregParam, userColuser_id, itemColgoods_no, ratingColscore,coldStartStrategydrop)als_model: ALSModel als.fit(union_df)# 为每个用户生成十大商品推荐userRecs als_model.recommendForAllUsers(10)userRecs.printSchema()# userRecs.show(truncateFalse)# 处理 goods_no将int转为str,并去掉前缀1doris_df userRecs.withColumn(goods_nos,F.expr(TRANSFORM(recommendations, x - named_struct(goods_no, substr(CAST(x.goods_no AS STRING), 2),rating, x.rating)))).select(user_id, F.col(goods_nos).astype(StringType()).alias(goods_nos))doris_df.printSchema()doris_df.show(truncateFalse)# 保存到 Dorisdoris_df.write.jdbc(urljdbc:mysql://192.168.88.166:9030/recommend_db?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCuseSSLfalse,tableals_goods_for_user,modeappend,sql{user: root, password: 123456})# 释放资源spark.stop()3、基于物品的协同过滤推荐了解
基于物品的协同过滤就是计算出每个标的物最相似的标的物列表然后就可以为用户推荐用户喜欢的标的物相似的标的物。 这里可以借助ALS算法生成的矩阵来完成。
离线计算的ALS 算法算法最终会为用户、商品分别生成最终的特征矩阵分别是表示用户特征矩阵的U(K x U)矩阵每个用户由 K 个特征描述表示物品特征矩阵的V(I x K)矩阵每个物品也由 K 个特征描述。
Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.als_sim_goods_list (id INT comment id,goods_no STRING comment 商品编码,sim_goods_list STRING comment 相似的商品列表
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
sql(replication_num 1);推荐代码写到ALS的后面
# 6)为每个商品生成十大用户推荐
# goodsRecs als_model.recommendForAllItems(10)# 通过 itemFactors 获得商品的特征表达
# als_model.itemFactors.show(truncateFalse)
# 获取商品id及对应的特征表达
item_factors_df als_model.itemFactors.select(F.expr(substr(cast(id as string), 2) as goods_no), features)
item_factors_df.show(truncateFalse)# 定义计算余弦相似度的 UDF
def consin_sim(vec1, vec2):vec1 np.array(vec1)vec2 np.array(vec2)num np.dot(vec1, vec2)# np.linalg.norm()用于求范数默认是二范数denom np.linalg.norm(vec1) * np.linalg.norm(vec2)if denom 0:return 0.0return round(float(num / denom), 4)consin_sim_udf F.udf(consin_sim, DoubleType())# item_factors_df自关联计算相似度再将相似度小于0.75的过滤掉
cartesian_goods_df item_factors_df.alias(df1) \.crossJoin(item_factors_df.alias(df2)) \.filter(F.col(df1.goods_no) ! F.col(df2.goods_no)) \.withColumn(simScore, consin_sim_udf(F.col(df1.features), F.col(df2.features))) \.filter(simScore 0.75)# 按照 goods_no 进行分组并构建推荐结果
goods_recs_df cartesian_goods_df.groupBy(df1.goods_no) \.agg(F.collect_list(F.struct(F.col(df2.goods_no).alias(rec_goods_no), F.col(simScore).alias(score))).alias(rec_goods_nos))# 对相似的goods_no列表进行排序并选取前10个【因为个别商品相似的商品太多所以只保留10个即可】
# 使用 expr 和 array_sort 函数进行排序并使用 slice 函数只保留前10个元素
sorted_df goods_recs_df.withColumn(sim_goods_list,F.expr(slice(array_sort(rec_goods_nos, (x, y) - case when x.score y.score then -1 when x.score y.score then 1 else 0 end), 1, 10))
).withColumn(id,F.expr(cast(concat(1, goods_no) as int))
).select(id, goods_no, F.col(sim_goods_list).astype(StringType()).alias(sim_goods_list))# 显示结果
sorted_df.printSchema()
sorted_df.show(truncateFalse)# 保存到 Doris
write_to_doris(sorted_df, recommend_db.als_sim_goods_list)4、基于用户的协同过滤推荐了解
UserCF算法主要是考虑用户与用户之间的相似度给用户推荐和他兴趣相似的其他用户喜欢的物品。俗话说物以群分人以类聚人们总是倾向于跟自己志同道合的人交朋友。同理你朋友喜欢的东西你大概率也可能会喜欢UserCF算法正是利用了这个原理。
Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.user_cf_goods_for_user (user_id INT comment 用户id,goods_nos STRING comment 推荐的商品列表
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
sql(replication_num 1);推荐代码
import osfrom pyspark.ml.feature import CountVectorizer, Normalizer
from pyspark.sql import DataFrame, Window, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, ArrayType, StructType, StructField, StringType, FloatTypeos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/pythonif __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder \.appName(recommend) \.master(local[*]) \.config(spark.sql.warehouse.dir, hdfs://up01:8020/user/hive/warehouse) \.config(hive.metastore.uris, thrift://up01:9083) \.config(spark.sql.shuffle.partitions, 5) \.enableHiveSupport() \.getOrCreate()# 2)从es读取标签数据并将标签数据合并es_df spark.read.format(es) \.option(es.nodes, 192.168.88.166:9200) \.option(es.resource, user_profile_tags) \.option(es.read.field.include, user_id,tags_id_times,tags_id_once,tags_id_streaming) \.load()temp_df es_df.select(user_id, F.concat(F.coalesce(tags_id_times, F.lit()), F.lit(,),F.coalesce(tags_id_once, F.lit()), F.lit(,),F.coalesce(tags_id_streaming, F.lit())).alias(tags))temp_df.show()# 3)将标签数据转换成向量# 使用split函数将字符串切分成数组然后使用filter将的元素过滤掉tags_df temp_df.select(user_id, F.split(tags, ,).alias(tags)).select(user_id, F.expr(filter(tags, x - x ! )).alias(tags))# tags_df.show(truncateFalse)# 将标签数组转换为向量cv CountVectorizer(inputColtags, outputColfeatures)model cv.fit(tags_df)user_df model.transform(tags_df)# 因为数据量比较大容易运行不出来所以可以抽样取少量数据# user_df user_df.sample(fraction0.01, seed66)user_df.show(truncateFalse)# 4)计算用户相似度# 标准化向量normalizer Normalizer(inputColfeatures, outputColnorm_features)norm_df normalizer.transform(user_df).select(user_id, norm_features)norm_df.printSchema()# norm_df.show(truncateFalse)# 将稀疏向量列转换为稠密向量列def to_dense(vector):return vector.toArray().tolist()to_dense_udf F.udf(to_dense, ArrayType(DoubleType()))dense_df norm_df.withColumn(dense_features, to_dense_udf(norm_features))# dense_df.show(truncateFalse)# 计算用户之间的余弦相似度join_df dense_df.alias(u1).join(dense_df.alias(u2), F.col(u1.user_id) ! F.col(u2.user_id)) \.select(F.col(u1.user_id).alias(user1), F.col(u2.user_id).alias(user2),F.col(u1.dense_features).astype(ArrayType(FloatType())).alias(f1),F.col(u2.dense_features).astype(ArrayType(FloatType())).alias(f2))user_sim join_df.select(user1, user2, F.zip_with(f1, f2, lambda x, y: x * y).alias(f3)) \.withColumn(cosine_sim, F.round(F.aggregate(f3, F.lit(0.0), lambda acc, x: acc x), 4)) \.select(user1, user2, cosine_sim)# print(-----------------------,user_sim.count(),------------------------------)user_sim.printSchema()# user_sim.show(truncateFalse)# 5)获取每个用户最相似的10个用户# 定义窗口函数windowSpec Window.partitionBy(user1).orderBy(F.col(cosine_sim).desc())# 取rn前10的列rn_df user_sim.withColumn(rn, F.row_number().over(windowSpec)).filter(rn 10)# rn_df.show(truncateFalse)# 6)查询每个用户评分最高的商品# 计算评分时因为不同用户购买频次不同会导致评分差距过大在进行商品推荐时该评分对结果影响很大所以可以对score使用log函数将这种变化变平缓些select_sql select user_id, goods_no, round(log(score), 3) as scorefrom (selectuser_id, goods_no, score, rank() over(partition by user_id order by score desc) as rnfrom (select zt_id as user_id, goods_no, sum(if(trade_type in(2, 5), -1, 1)) * 5 as scorefrom dwm.dwm_sold_goods_sold_dtl_iwhere dt date_sub(current_date, 90) and dt date_sub(current_date, 1)and zt_id ! 0 and zt_id is not nullgroup by zt_id, goods_no) tmpwhere score 0 -- score为0或负的不推荐) twhere rn 10hive_df spark.sql(select_sql)# hive_df.show()# 按照 goods_no 进行分组并构建推荐结果prefer_goods_df hive_df.groupBy(user_id) \.agg(F.collect_list(F.struct(goods_no, score)).alias(prefer_goods_nos))# prefer_goods_df.show(truncateFalse)# 7)用户关联商品给用户进行推荐# 关联商品join_df rn_df.join(prefer_goods_df, rn_df[user1] prefer_goods_df[user_id], inner).select(user1, user2, cosine_sim, F.col(prefer_goods_nos).alias(user1_goods_no))join_df join_df.join(prefer_goods_df, join_df[user2] prefer_goods_df[user_id], inner).select(user1, user2, cosine_sim, user1_goods_no, F.col(prefer_goods_nos).alias(user2_goods_no))# join_df.show(truncateFalse)join_df.printSchema()# 定义一个udf,将cosine_sim,user1_goods_no和user2_goods_no都传进去去掉user2_goods_no中的user1_goods_no并计算user2_goods_no的分数def calculate_score(cosine_sim, user1_goods_no, user2_goods_no):user1_goods [item.goods_no for item in user1_goods_no]user2_goods []for item in user2_goods_no:if item[goods_no] not in user1_goods:user2_goods.append({goods_no: item[goods_no], score: round(item[score] * cosine_sim, 3)})return user2_goods# 返回值类型schema ArrayType(StructType([StructField(goods_no, StringType(), nullableFalse),StructField(score, DoubleType(), nullableFalse)]))calculate_score_udf F.udf(calculate_score, schema)# 获取用户及推荐的商品rec_df join_df.select(F.col(user1).alias(user_id), calculate_score_udf(cosine_sim, user1_goods_no, user2_goods_no).alias(rec_goods)).\filter(F.size(F.col(rec_goods)) 0)# rec_df.show(truncateFalse)# 展开 rec_goods 中的元素然后按照 user_id 进行分组并聚合成列表goods_recs_df rec_df.withColumn(goods_no_element, F.explode(F.col(rec_goods))).groupBy(user_id).agg(F.collect_list(goods_no_element).alias(rec_goods_nos))# 对推荐的goods_no列表进行排序并选取前10个【因为个别用户推荐的商品太多所以只保留10个即可】# 使用 expr 和 array_sort 函数进行排序并使用 slice 函数只保留前10个元素sorted_df goods_recs_df.withColumn(rec_goods_list,F.expr(slice(array_sort(rec_goods_nos, (x, y) - case when x.score y.score then -1 when x.score y.score then 1 else 0 end), 1, 10))).select(user_id, F.col(rec_goods_list).astype(StringType()).alias(goods_nos))# sorted_df.show(truncateFalse)# 8)结果保存# 保存到 Dorissorted_df.write.jdbc(urljdbc:mysql://192.168.88.166:9030/recommend_db?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCuseSSLfalse,tableuser_cf_goods_for_user,modeappend,sql{user: root, password: 123456})spark.stop()5、基于关联规则的推荐熟悉
5.1 关联规则详解
什么是关联规则(Association Rules)?
答关联规则是数据挖掘中的概念通过分析数据找到数据之间的关联。电商中经常用来分析购买物品之间的相关性例如“购买尿布的用户有大概率购买啤酒”这就是一个关联规则。
什么是关联规则推荐(Association Rule Based Recommendaion)?
答顾名思义利用关联规则来实施推荐。关联规则推荐的目标是希望达到“将尿布放入购物车之后再推荐啤酒”比“直接推荐啤酒”获取有更好的售卖效果。
关联规则推荐的典型应用
线下可以将尿布和啤酒放在一起;
线上可以在用户将尿布放入购物车后立刻推荐啤酒。
如何实施
假设某电商会售卖ABCD四种商品历史上共5笔订单分别卖出{A,B,C}, {B,C,D}, {A,B,C,D}, {A,C}, {C}
5.2 FP-growth算法理解
常用的算法有Aprior算法和FP-growth算法FP-growth算法比Apriori算法效率更高并且在PySpark中对FP-growth算法进行了实现所以这里重点讲一下FP-growth算法原理。
FP-growth(Frequent Pattern Tree, 频繁模式树),是韩家炜老师提出的挖掘频繁项集的方法是将数据集存储在一个特定的称作FP树的结构之后发现频繁项集或频繁项对即常在一块出现的元素项的集合FP树。
5.3 SparkMLlib中的FP-growth算法
spark.ml中提供了FPGrowth()方法来实现FP-growth算法。spark.ml的FP-growth实现接受以下超参数
minSupport将一个项目集识别为频繁项目集的最低支持度。例如如果一个项目在5个事务中出现了3次它的支持度就是3/50.6。minConfidence生成关联规则的最低置信度。置信度是表明一个关联规则被发现为真的频率。例如如果在事务中项目集X出现了4次而X和Y共同出现了2次则规则X Y的置信度为2/40.5。该参数不会影响频繁项目集的挖掘但会指定从频繁项目集中生成关联规则的最低置信度。numPartitions用于分配工作的分区数。默认情况下该参数未设置使用输入数据集的分区数。
模型训练完成后会生成 FPGrowthModel 对象。FPGrowthModel 提供以下方法或属性 freqItemsets频繁项目集以包含以下列的数据框格式提供 itemsarray一个给定的项目集。freqlong根据配置的模型参数该项目集出现的次数。 associationRules生成的置信度高于 minConfidence 的关联规则以包含以下列的数据框格式提供 antecedentarray作为关联规则假设的项目集。如果关联规则为A-B则 antecedent 为 A。consequentarray总是包含一个元素的项目集代表关联规则的结论。如果关联规则为A-B则 antecedent 为 B。confidencedouble置信度定义参见上文中的 minConfidence。liftdouble提升度计算方法为 support(antecedent ∪ consequent) / (support(antecedent) x support(consequent))。supportdouble频繁项目集的支持度定义参见上文中的 minSupport。 transform根据传入的items比对关联规则将符合规则的结果添加到预测结果中。transform 方法将汇总所有适用规则的结果作为预测。预测列的数据类型与 items 列相同并且不包含 items 列中的现有项目。
5.4 完整代码
Doris建表语句
CREATE TABLE IF NOT EXISTS recommend_db.fpgrowth_association_goods
( calculate_date DATETIME COMMENT 计算时间,antecedent ARRAYSTRING COMMENT 购买的商品,consequent ARRAYSTRING COMMENT 关联推荐商品,confidence DOUBLE COMMENT 置信度,lift DOUBLE COMMENT 提升度,support DOUBLE COMMENT 支持度
)
DUPLICATE KEY(calculate_date)
comment 关联规则推荐
PARTITION BY RANGE(calculate_date) ()
DISTRIBUTED BY HASH(calculate_date) BUCKETS 1
sql (dynamic_partition.create_history_partition true,dynamic_partition.enable true,dynamic_partition.time_unit DAY,dynamic_partition.start -365,dynamic_partition.end 3,dynamic_partition.prefix p,dynamic_partition.buckets 10,replication_allocation tag.location.default: 1
);因为在doris中只是为了备份所以存储成冗余模型即可。为了区分不同时间计算的结果在表中添加了calculate_date字段作为区分。然后为了分区存储使用了动态分区的方式。推荐代码
from pyspark.ml.fpm import FPGrowth, FPGrowthModel
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField, ArrayType
from tags.utils.hdfs_utils import HDFSUtilos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3# 实现商品推荐的功能
def get_recommend_goods(current_goods_no, spark, fpg_model):schema StructType([StructField(items,ArrayType(StringType()))])current_goods_no_df spark.createDataFrame(data[(current_goods_no,)],schemaschema)# 使用模型进行商品推荐result_df fpg_model.transform(current_goods_no_df)result_df.show()result_df.printSchema()print(result_df.collect())# 返回最终的推荐商品IDreturn result_df.collect()[0][1]# 基于关联规则的推荐
if __name__ __main__:# 1- 创建SparkSession对象spark SparkSession.builder\.appName(fp_growth)\.master(local[*]) \.config(spark.sql.warehouse.dir, hdfs://up01:8020/user/hive/warehouse) \.config(hive.metastore.uris, thrift://up01:9083) \.config(spark.sql.shuffle.partitions,5) \.enableHiveSupport() \.getOrCreate()# 2- 数据输入分析商品间的关联关系order_df spark.sql(selectorder_no,collect_set(goods_no) as items -- 将当前订单下的多个商品合到一个Set集合中from dwm.dwm_sold_goods_sold_dtl_iwhere datediff(current_date(),to_date(trade_date))40and goods_no is not nulland parent_order_no is not nulland order_no is not nullgroup by order_no)# 3- 通过FP-growth分析商品间的关联关系的频率path /xtzg/recommend/fpgif HDFSUtil().exists(path):# 如果之前已经训练好了模型那么直接加载出来使用即可fpg_model FPGrowthModel.load(hdfs://192.168.88.166:8020path)else:# 3.1- 创建算法模型实例对象fpGrowth FPGrowth(itemsColitems, minSupport0.001, minConfidence0.6)# 3.2- 对算法模型使用数据进行训练fpg_model fpGrowth.fit(order_df)# 3.3- 再将训练好的模型存储到HDFSfpg_model.save(hdfs://192.168.88.166:8020path)rule_result fpg_model.associationRulesrule_result.show(n100)rule_result.printSchema()# 4- 模型训练后的商品关联信息存放到Doris中# doris_df rule_result.withColumn(calculate_date,F.current_timestamp())doris_df rule_result.select(F.current_timestamp().alias(calculate_date),rule_result.antecedent.cast(StringType()).alias(antecedent),rule_result.consequent.cast(StringType()).alias(consequent),confidence,lift,support)doris_df.write.jdbc(urljdbc:mysql://192.168.88.166:9030/recommend_db?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCuseSSLfalse,tablefpgrowth_association_goods,modeappend,sql{ user : root, password : 123456 })# 5- 使用训练好的模型来进行商品的推荐这里是模拟后面顾客来购买东西的时候进行推荐的效果current_goods_no [3224064]recommend_goods get_recommend_goods(current_goods_no, spark, fpg_model)print(recommend_goods)# 6- 释放资源spark.stop()可能遇到的错误 原因: 将数据输入到Doris的array字段中的时候需要在输入前将对应的数据格式进行类型转换为字符串6、服务部署了解
配置好get_recommend_goods方法后每次调用都需要spark session和fpg_model如果每次调用都新建非常浪费资源时效也会非常差。所以这里需要将get_recommend_goods方法布置成接口服务。服务启动后则可以只实例化一个spark session和fgp_model并实时响应查询推荐商品的请求
这里借助Flask来实现。Flask是一个用Python编写的Web应用程序框架。Flask中文官网:https://dormousehole.readthedocs.io/en/latest/
Flask安装命令
pip install Flask -i https://mirrors.aliyun.com/pypi/simple/Flask代码
import ast
import os
from flask import Flask, request
from pyspark.ml.fpm import FPGrowthModel
from pyspark.sql import SparkSessionfrom tags.recommend.fpgrowth_association_goods import get_recommend_goodsos.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python# 初始化SparkSession
spark SparkSession.builder \.config(spark.sql.shuffle.partitions, 5) \.appName(recommend_api) \.getOrCreate()# 加载模型
hdfs_path /xtzg/recommend/fpg
fpg_model: FPGrowthModel FPGrowthModel.load(hdfs://192.168.88.166:8020hdfs_path)app Flask(__name__)app.route(/recommend)
def recommend():# 处理get请求获取?后边的参数data request.args.to_dict()# print(data)# 使用literal_eval将字符串转换为listgoods_list ast.literal_eval(data[goods_list])print(-----数据来了, goods_list)# 方法调用recommended_goods get_recommend_goods(goods_list, spark, fpg_model)return recommended_goodsapp.route(/)
def hello_world():return 欢迎来到小兔智购商品推荐系统if __name__ __main__:app.run(host0.0.0.0, port5000)启动Flask然后可以看到生成的URL。 访问
这里模拟发送一个get请求在url后加?然后是keyvalue如下
http://192.168.88.166:5000/recommend?goods_list[‘3215330’]
则可以得到响应的结果 在工作中由后端程序调用接口得到响应结果后再发生给前端进行渲染生成推荐结果。 文章转载自: http://www.morning.xpmwt.cn.gov.cn.xpmwt.cn http://www.morning.gfnsh.cn.gov.cn.gfnsh.cn http://www.morning.wqsjx.cn.gov.cn.wqsjx.cn http://www.morning.rfzzw.com.gov.cn.rfzzw.com http://www.morning.wpxfk.cn.gov.cn.wpxfk.cn http://www.morning.nbmyg.cn.gov.cn.nbmyg.cn http://www.morning.darwallet.cn.gov.cn.darwallet.cn http://www.morning.ggqcg.cn.gov.cn.ggqcg.cn http://www.morning.xdjwh.cn.gov.cn.xdjwh.cn http://www.morning.nxbsq.cn.gov.cn.nxbsq.cn http://www.morning.xknmn.cn.gov.cn.xknmn.cn http://www.morning.hhxkl.cn.gov.cn.hhxkl.cn http://www.morning.ctqlq.cn.gov.cn.ctqlq.cn http://www.morning.dwmtk.cn.gov.cn.dwmtk.cn http://www.morning.skbkq.cn.gov.cn.skbkq.cn http://www.morning.nsjpz.cn.gov.cn.nsjpz.cn http://www.morning.lsgsn.cn.gov.cn.lsgsn.cn http://www.morning.tsxg.cn.gov.cn.tsxg.cn http://www.morning.nynyj.cn.gov.cn.nynyj.cn http://www.morning.fwrr.cn.gov.cn.fwrr.cn http://www.morning.ghxkm.cn.gov.cn.ghxkm.cn http://www.morning.tjmfz.cn.gov.cn.tjmfz.cn http://www.morning.pyxtn.cn.gov.cn.pyxtn.cn http://www.morning.fnxzk.cn.gov.cn.fnxzk.cn http://www.morning.bhdyr.cn.gov.cn.bhdyr.cn http://www.morning.gjwkl.cn.gov.cn.gjwkl.cn http://www.morning.qiyelm.com.gov.cn.qiyelm.com http://www.morning.fslxc.cn.gov.cn.fslxc.cn http://www.morning.ktlfb.cn.gov.cn.ktlfb.cn http://www.morning.gcrlb.cn.gov.cn.gcrlb.cn http://www.morning.dmrjx.cn.gov.cn.dmrjx.cn http://www.morning.tpnxr.cn.gov.cn.tpnxr.cn http://www.morning.xfxlr.cn.gov.cn.xfxlr.cn http://www.morning.psxcr.cn.gov.cn.psxcr.cn http://www.morning.rttp.cn.gov.cn.rttp.cn http://www.morning.wbqt.cn.gov.cn.wbqt.cn http://www.morning.yzktr.cn.gov.cn.yzktr.cn http://www.morning.ypqwm.cn.gov.cn.ypqwm.cn http://www.morning.rhjhy.cn.gov.cn.rhjhy.cn http://www.morning.dywgl.cn.gov.cn.dywgl.cn http://www.morning.grbp.cn.gov.cn.grbp.cn http://www.morning.jpydf.cn.gov.cn.jpydf.cn http://www.morning.fcpjq.cn.gov.cn.fcpjq.cn http://www.morning.rsbqq.cn.gov.cn.rsbqq.cn http://www.morning.dnpft.cn.gov.cn.dnpft.cn http://www.morning.prhfc.cn.gov.cn.prhfc.cn http://www.morning.qzpkr.cn.gov.cn.qzpkr.cn http://www.morning.deupp.com.gov.cn.deupp.com http://www.morning.wkrkb.cn.gov.cn.wkrkb.cn http://www.morning.dcmnl.cn.gov.cn.dcmnl.cn http://www.morning.nspzy.cn.gov.cn.nspzy.cn http://www.morning.ns3nt8.cn.gov.cn.ns3nt8.cn http://www.morning.hnpkr.cn.gov.cn.hnpkr.cn http://www.morning.xxiobql.cn.gov.cn.xxiobql.cn http://www.morning.qbgff.cn.gov.cn.qbgff.cn http://www.morning.mwmtk.cn.gov.cn.mwmtk.cn http://www.morning.eshixi.com.gov.cn.eshixi.com http://www.morning.sjftk.cn.gov.cn.sjftk.cn http://www.morning.bhqlj.cn.gov.cn.bhqlj.cn http://www.morning.yrnll.cn.gov.cn.yrnll.cn http://www.morning.ydgzj.cn.gov.cn.ydgzj.cn http://www.morning.fmkjx.cn.gov.cn.fmkjx.cn http://www.morning.hsjrk.cn.gov.cn.hsjrk.cn http://www.morning.sjjq.cn.gov.cn.sjjq.cn http://www.morning.tfcwj.cn.gov.cn.tfcwj.cn http://www.morning.gcxfh.cn.gov.cn.gcxfh.cn http://www.morning.kmlmf.cn.gov.cn.kmlmf.cn http://www.morning.srxhd.cn.gov.cn.srxhd.cn http://www.morning.lwnwl.cn.gov.cn.lwnwl.cn http://www.morning.hwlmy.cn.gov.cn.hwlmy.cn http://www.morning.rnrfs.cn.gov.cn.rnrfs.cn http://www.morning.jhwqp.cn.gov.cn.jhwqp.cn http://www.morning.bxbnf.cn.gov.cn.bxbnf.cn http://www.morning.drjll.cn.gov.cn.drjll.cn http://www.morning.xrwsg.cn.gov.cn.xrwsg.cn http://www.morning.qttft.cn.gov.cn.qttft.cn http://www.morning.nldsd.cn.gov.cn.nldsd.cn http://www.morning.rqnzh.cn.gov.cn.rqnzh.cn http://www.morning.stcds.cn.gov.cn.stcds.cn http://www.morning.gxtfk.cn.gov.cn.gxtfk.cn