免费手机h5模板网站模板,seo谷歌外贸推广,网站建设学什么书,互联网平台怎么建立概览
技术方案#xff1a;
日志采集服务#xff1a;通过利用Flume-ng对业务平台中用户对于电影的一次评分行为进行采集#xff0c;实时发送到Kafka集群。消息缓冲服务#xff1a;项目采用Kafka作为流式数据的缓存组件#xff0c;接受来自Flume的数据采集请求。并将数据推…概览
技术方案
日志采集服务通过利用Flume-ng对业务平台中用户对于电影的一次评分行为进行采集实时发送到Kafka集群。消息缓冲服务项目采用Kafka作为流式数据的缓存组件接受来自Flume的数据采集请求。并将数据推送到项目的实时推荐系统部分。实时推荐服务项目采用Spark Streaming作为实时推荐系统通过接收Kafka中缓存的数据通过设计的推荐算法实现对实时推荐的数据处理并将结构合并更新到MongoDB数据库。
1. 实现思路
我们应该如何实现
首先应该redis安装这里存储用户的第K次评分用户评分存入redis中安装zookeeper安装kafka都是standlone模式测试Kafka与Spark Streaming 联调。Kafka生产一条数据Spark Streaming 可以消费成功,并根据redis中的数据和MongoDB数据进行推荐存入MongoDB中在业务系统写埋点信息测试时写入本地文件之后再远程测试写入云服务器log文件中flume配置文件书写kafka创建两个topic对整个过程进行测试
2 环境准备
1.1 redis 安装
redis安装redis安装密码123456存入redis一些数据 lpush uid:1 mid:scoreredis 教程教程
1.2 zookeeper单机版安装
zookeeper安装zookeeper安装版本3.7.1遇到的坑8080端口连接占用我们需要在zoo.cpg文件中加上 admin.serverPort8001重新启动即可。
1.3 kafka单机安装
kafka安装官网下载地址安装使用的为127.0.0.1启动kafkakafka教程
bin/kafka-server-start.sh config/server.properties创建一个topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic recommender生产一个消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic recommender消费一个消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic recommender --from-beginning3 测试kafka与spark streaming联调
kafka版本2.2.0spark版本2.3.0因此使用spark-streaming-kafka-0-10 启动kafka生产一条信息书写程序
// 定义kafka连接参数val kafkaParam Map(bootstrap.servers - 服务器IP:9092,key.deserializer - classOf[StringDeserializer],value.deserializer - classOf[StringDeserializer],group.id - recommender,auto.offset.reset - latest)// 通过kafka创建一个DStreamval kafkaStream KafkaUtils.createDirectStream[String, String]( ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String]( Array(config(kafka.topic)), kafkaParam ))// 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流// 1|31|4.5|val ratingStream kafkaStream.map{msg val attr msg.value().split(\\|)( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )}若是kafka报错如果你同样也是云服务器请注意kafka的配置信息很重要
1解决方法修改kafka配置文件设置为设置listeners为内网ip设置外网ip
解决方案修改内网ip
2重新启动成功
内网外网分流内网外网分流kafka入门教程入门教程
redis报错开启保护模式了需要修改conf文件
效果
在kafka生产一个数据可以在MongoDB中得到推荐的电影结果
4 后端埋点
前端进行评分后触发click事件后端进行测试埋点利用log4j写入本地文件中。
4.1 本地测试
log4j配置文件
log4j.rootLoggerINFO, file, stdout# write to stdout
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n# write to file
log4j.appender.fileorg.apache.log4j.RollingFileAppender
log4j.appender.FILE.Appendtrue
log4j.appender.FILE.ThresholdINFO
log4j.appender.file.FileF:/demoparent/business/src/main/log/agent.txt
log4j.appender.file.MaxFileSize1024KB
log4j.appender.file.MaxBackupIndex1
log4j.appender.file.layoutorg.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n埋点实现
//埋点日志
import org.apache.log4j.Logger;// 关键代码
Logger log Logger.getLogger(MovieController.class.getName());
log.info(MOVIE_RATING_PREFIX : uid | mid | score | System.currentTimeMillis()/1000)4.2 写入远程测试
Linux安装syslog服务进行测试主机log4j配置文件设置服务器ip
log4j配置写入远程服务器
log4j.appender.syslogorg.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost 服务器IP
log4j.appender.syslog.ThresholdINFO
log4j.appender.syslog.layoutorg.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%20t] %-130c:(line:%4L) : %m%n5 flume配置
flume对接kafkaflume对接文件flume设置source和sinksource为文件地址sink为kafka的log
# log-kafka.properties
agent.sources exectail
agent.channels memoryChannel
agent.sinks kafkasink
agent.sources.exectail.type exec
agent.sources.exectail.command tail -f /project/logs/agent.log agent.sources.exectail.interceptorsi1 agent.sources.exectail.interceptors.i1.typeregex_filter agent.sources.exectail.interceptors.i1.regex.MOVIE_RATING_PREFIX. agent.sources.exectail.channels memoryChannelagent.sinks.kafkasink.type org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic log agent.sinks.kafkasink.kafka.bootstrap.servers 服务器地址:9092 agent.sinks.kafkasink.kafka.producer.acks 1 agent.sinks.kafkasink.kafka.flumeBatchSize 20 agent.sinks.kafkasink.channel memoryChannel
agent.channels.memoryChannel.type memory
agent.channels.memoryChannel.capacity 100006 实时推荐
ratingStream.foreachRDD{rdds rdds.foreach{case (uid, mid, score, timestamp) {println(rating data coming! )println(uid,mid:mid)// 1. 从redis里获取当前用户最近的K次评分保存成Array[(mid, score)]val userRecentlyRatings getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )println(用户最近的K次评分:userRecentlyRatings)// 2. 从相似度矩阵中取出当前电影最相似的N个电影作为备选列表Array[mid]val candidateMovies getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )println(电影最相似的N个电影:candidateMovies)// 3. 对每个备选电影计算推荐优先级得到当前用户的实时推荐列表Array[(mid, score)]val streamRecs computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )println(当前用户的实时推荐列表:streamRecs)// 4. 把推荐数据保存到mongodbsaveDataToMongoDB( uid, streamRecs )}}
}def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] {// 定义一个ArrayBuffer用于保存每一个备选电影的基础得分val scores scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定义一个HashMap保存每一个备选电影的增强减弱因子val increMap scala.collection.mutable.HashMap[Int, Int]()val decreMap scala.collection.mutable.HashMap[Int, Int]()for( candidateMovie - candidateMovies; userRecentlyRating - userRecentlyRatings){// 拿到备选电影和最近评分电影的相似度val simScore getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )if(simScore 0.7){// 计算备选电影的基础推荐得分scores ( (candidateMovie, simScore * userRecentlyRating._2) )if( userRecentlyRating._2 3 ){increMap(candidateMovie) increMap.getOrDefault(candidateMovie, 0) 1} else{decreMap(candidateMovie) decreMap.getOrDefault(candidateMovie, 0) 1}}}// 根据备选电影的mid做groupby根据公式去求最后的推荐评分scores.groupBy(_._1).map{// groupBy之后得到的数据 Map( mid - ArrayBuffer[(mid, score)] )case (mid, scoreList) ( mid, scoreList.map(_._2).sum / scoreList.length log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )}.toArray.sortWith(_._2_._2)
}7 启动顺序
启动hadoop、spark的容器
cd /dockerdocker-compose up -ddocker-compose ps
启动mongodb和redis服务
netstat -lanp | grep 27017bin/redis-server etc/redis.conf
启动zookeeper、kafka服务
./zkServer.sh startbin/kafka-server-start.sh config/server.properties
启动flume服务
bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent
实现效果
前端评分成功后写入日志文件flume对接log日志文件无问题kafka对接flume无问题spark streaming处理收到的一条数据进行推荐存入MongoDB中。 总结
由于时间匆忙写的有些匆忙如果有需要前端设计代码和后端的代码可以评论我我整理整理发到github上。
前端设计部分没有时间去详细做后续再对前端页面进行美化。本科当时整合了一个管理系统现在也没有时间做总之一周多时间把当时的系统快速复现了下算是一个复习。
在进行开发时遇到许多问题版本问题、服务器内网外网问题、docker容器相关问题、协同过滤算法设计问题但帮着自己复习了下Vue和SpringBoot。
遇到问题时
遇到问题不应该盲目解决应该静下心看看报错原因想想为何报错版本尤其重要因此最好在一个project的pom设定版本使用服务器搭建docker-compose利用该方法来搭建集群快速简单但涉及的端口转发等一些网络知识需要耐下心来看Vue-CliElement-ui搭配起来开发简单写程序时我们应该提前约定好接口否则后续会很混乱…
后续
后续将优化下前端页面设计更多功能改进推荐算法增加冷启动方案 文章转载自: http://www.morning.wnnlr.cn.gov.cn.wnnlr.cn http://www.morning.lcmhq.cn.gov.cn.lcmhq.cn http://www.morning.kfrhh.cn.gov.cn.kfrhh.cn http://www.morning.fkyrk.cn.gov.cn.fkyrk.cn http://www.morning.rccpl.cn.gov.cn.rccpl.cn http://www.morning.lptjt.cn.gov.cn.lptjt.cn http://www.morning.bkslb.cn.gov.cn.bkslb.cn http://www.morning.lltdf.cn.gov.cn.lltdf.cn http://www.morning.kzxlc.cn.gov.cn.kzxlc.cn http://www.morning.nrmyj.cn.gov.cn.nrmyj.cn http://www.morning.vattx.cn.gov.cn.vattx.cn http://www.morning.bfgpn.cn.gov.cn.bfgpn.cn http://www.morning.rtjhw.cn.gov.cn.rtjhw.cn http://www.morning.jxcwn.cn.gov.cn.jxcwn.cn http://www.morning.trrpb.cn.gov.cn.trrpb.cn http://www.morning.hongjp.com.gov.cn.hongjp.com http://www.morning.ztnmc.cn.gov.cn.ztnmc.cn http://www.morning.ljsxg.cn.gov.cn.ljsxg.cn http://www.morning.qsmch.cn.gov.cn.qsmch.cn http://www.morning.qbzfp.cn.gov.cn.qbzfp.cn http://www.morning.fbfnk.cn.gov.cn.fbfnk.cn http://www.morning.rdgb.cn.gov.cn.rdgb.cn http://www.morning.tclqf.cn.gov.cn.tclqf.cn http://www.morning.trzzm.cn.gov.cn.trzzm.cn http://www.morning.bwjgb.cn.gov.cn.bwjgb.cn http://www.morning.dxsyp.cn.gov.cn.dxsyp.cn http://www.morning.rhdqz.cn.gov.cn.rhdqz.cn http://www.morning.tsnwf.cn.gov.cn.tsnwf.cn http://www.morning.rftk.cn.gov.cn.rftk.cn http://www.morning.ktqtf.cn.gov.cn.ktqtf.cn http://www.morning.dschz.cn.gov.cn.dschz.cn http://www.morning.pgfkl.cn.gov.cn.pgfkl.cn http://www.morning.kyjyt.cn.gov.cn.kyjyt.cn http://www.morning.flhnd.cn.gov.cn.flhnd.cn http://www.morning.smtrp.cn.gov.cn.smtrp.cn http://www.morning.zzfjh.cn.gov.cn.zzfjh.cn http://www.morning.qbgdy.cn.gov.cn.qbgdy.cn http://www.morning.fhrgk.cn.gov.cn.fhrgk.cn http://www.morning.xdhcr.cn.gov.cn.xdhcr.cn http://www.morning.dnydy.cn.gov.cn.dnydy.cn http://www.morning.rdwm.cn.gov.cn.rdwm.cn http://www.morning.jlxld.cn.gov.cn.jlxld.cn http://www.morning.xnbd.cn.gov.cn.xnbd.cn http://www.morning.saletj.com.gov.cn.saletj.com http://www.morning.fnjrh.cn.gov.cn.fnjrh.cn http://www.morning.bpmtz.cn.gov.cn.bpmtz.cn http://www.morning.jkcpl.cn.gov.cn.jkcpl.cn http://www.morning.nqgds.cn.gov.cn.nqgds.cn http://www.morning.shawls.com.cn.gov.cn.shawls.com.cn http://www.morning.rkxk.cn.gov.cn.rkxk.cn http://www.morning.zylrk.cn.gov.cn.zylrk.cn http://www.morning.bntfy.cn.gov.cn.bntfy.cn http://www.morning.qhmgq.cn.gov.cn.qhmgq.cn http://www.morning.xfcjs.cn.gov.cn.xfcjs.cn http://www.morning.kfqzd.cn.gov.cn.kfqzd.cn http://www.morning.nj-ruike.cn.gov.cn.nj-ruike.cn http://www.morning.ytfr.cn.gov.cn.ytfr.cn http://www.morning.xrqkm.cn.gov.cn.xrqkm.cn http://www.morning.fkffr.cn.gov.cn.fkffr.cn http://www.morning.xnwjt.cn.gov.cn.xnwjt.cn http://www.morning.xqbgm.cn.gov.cn.xqbgm.cn http://www.morning.rlcqx.cn.gov.cn.rlcqx.cn http://www.morning.rkdnm.cn.gov.cn.rkdnm.cn http://www.morning.rqfzp.cn.gov.cn.rqfzp.cn http://www.morning.dnjwm.cn.gov.cn.dnjwm.cn http://www.morning.rscrj.cn.gov.cn.rscrj.cn http://www.morning.hkpn.cn.gov.cn.hkpn.cn http://www.morning.jsdntd.com.gov.cn.jsdntd.com http://www.morning.pxsn.cn.gov.cn.pxsn.cn http://www.morning.hnkkm.cn.gov.cn.hnkkm.cn http://www.morning.kcyxs.cn.gov.cn.kcyxs.cn http://www.morning.ldynr.cn.gov.cn.ldynr.cn http://www.morning.pqryw.cn.gov.cn.pqryw.cn http://www.morning.wbns.cn.gov.cn.wbns.cn http://www.morning.nyqb.cn.gov.cn.nyqb.cn http://www.morning.nqxdg.cn.gov.cn.nqxdg.cn http://www.morning.syssdz.cn.gov.cn.syssdz.cn http://www.morning.yrwqz.cn.gov.cn.yrwqz.cn http://www.morning.hkgcx.cn.gov.cn.hkgcx.cn http://www.morning.clndl.cn.gov.cn.clndl.cn