英语可以做推广的亲子类网站,做外贸网站能用虚拟主机吗,中职电子商务专业就业方向,PHP网站开发技术期末作品使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
在本文中#xff0c;将介绍如何构建一个实时数据pipeline#xff0c;从MySQL数据库读取数据#xff0c;通过Kafka传输数据#xff0c;最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能#…使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
在本文中将介绍如何构建一个实时数据pipeline从MySQL数据库读取数据通过Kafka传输数据最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能以及Kafka和HDFS作为我们的数据传输和存储工具。 1、环境设置 首先确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdspark_project/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingscala.version2.12.12/scala.versionspark.version3.2.0/spark.versionkafka.version2.8.1/kafka.version/propertiesdependencies!-- Spark dependencies --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.76/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependency!-- Kafka dependencies --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion${kafka.version}/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.28/version/dependency!-- Scala library --dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion${scala.version}/version/dependency /dependencies
/projectmysql中表结构
2、从MySQL读取数据到Kafka 我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据并将其转换为JSON格式然后将数据写入到Kafka主题中。以下是相应的Scala代码
package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit {// 创建 SparkSessionval spark SparkSession.builder().appName(MySQLToKafka).master(local[*]).getOrCreate()// 设置 MySQL 连接属性val mysqlProps new Properties()mysqlProps.setProperty(user, root)mysqlProps.setProperty(password, 12345678)mysqlProps.setProperty(driver, com.mysql.jdbc.Driver)// 从 MySQL 数据库中读取数据val jdbcDF spark.read.jdbc(jdbc:mysql://localhost:3306/mydb, comment, mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF jdbcDF.selectExpr(to_json(struct(*)) AS value)// 将数据写入 KafkajsonDF.show()jsonDF.write.format(kafka).option(kafka.bootstrap.servers, localhost:9092).option(topic, comment).save()// 停止 SparkSessionspark.stop()}}
以上代码首先创建了一个SparkSession然后设置了连接MySQL所需的属性。接着它使用jdbc.read从MySQL数据库中读取数据并将数据转换为JSON格式最后将数据写入到名为comment的Kafka主题中。提示topic主题会被自动创建。
从Kafka消费数据并写入HDFS 接下来我们将设置Spark Streaming来消费Kafka中的数据并将数据保存到HDFS中。以下是相应的Scala代码
package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit {// 设置 SparkConfval sparkConf new SparkConf().setAppName(KafkaToHDFS).setMaster(local[*])// 创建 StreamingContext每秒处理一次val ssc new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams Map[String, Object](bootstrap.servers - localhost:9092, // Kafka broker 地址key.deserializer - classOf[StringDeserializer],value.deserializer - classOf[StringDeserializer],group.id - spark-consumer-group, // Spark 消费者组auto.offset.reset - earliest, // 从最新的偏移量开始消费enable.auto.commit - (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics Array(comment)// 创建 Kafka Direct Streamval stream KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息然后将其写入 HDFSstream.map({rddval comment JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name,comment.comment_text,comment.comment_time,comment.fans,comment.location,comment.user_gender}).foreachRDD { rdd if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile(hdfs://hadoop101:8020/tmp/)}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}}
以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象并将其保存为逗号分隔的文本文件最终存储在HDFS的/tmp目录中。
结论 通过本文的介绍和示例代码您现在应该了解如何使用Apache Spark构建一个实时数据流水线从MySQL数据库读取数据通过Kafka传输数据最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。
**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业学生毕设等。不限于pythonjava大数据模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 ** 文章转载自: http://www.morning.qpsdq.cn.gov.cn.qpsdq.cn http://www.morning.kxyqy.cn.gov.cn.kxyqy.cn http://www.morning.fhsgw.cn.gov.cn.fhsgw.cn http://www.morning.dtnyl.cn.gov.cn.dtnyl.cn http://www.morning.twhgn.cn.gov.cn.twhgn.cn http://www.morning.dmwjl.cn.gov.cn.dmwjl.cn http://www.morning.tknqr.cn.gov.cn.tknqr.cn http://www.morning.yqjjn.cn.gov.cn.yqjjn.cn http://www.morning.wfkbk.cn.gov.cn.wfkbk.cn http://www.morning.kqgqy.cn.gov.cn.kqgqy.cn http://www.morning.qkgwz.cn.gov.cn.qkgwz.cn http://www.morning.dpdr.cn.gov.cn.dpdr.cn http://www.morning.ssgqc.cn.gov.cn.ssgqc.cn http://www.morning.pgrsf.cn.gov.cn.pgrsf.cn http://www.morning.ffdyy.cn.gov.cn.ffdyy.cn http://www.morning.lcwhn.cn.gov.cn.lcwhn.cn http://www.morning.ccyjt.cn.gov.cn.ccyjt.cn http://www.morning.nzsdr.cn.gov.cn.nzsdr.cn http://www.morning.pbzlh.cn.gov.cn.pbzlh.cn http://www.morning.nbnq.cn.gov.cn.nbnq.cn http://www.morning.rkdnm.cn.gov.cn.rkdnm.cn http://www.morning.gnbtp.cn.gov.cn.gnbtp.cn http://www.morning.tmrjb.cn.gov.cn.tmrjb.cn http://www.morning.tgnwt.cn.gov.cn.tgnwt.cn http://www.morning.lgnbr.cn.gov.cn.lgnbr.cn http://www.morning.rmpkn.cn.gov.cn.rmpkn.cn http://www.morning.mftzm.cn.gov.cn.mftzm.cn http://www.morning.cylbs.cn.gov.cn.cylbs.cn http://www.morning.qfkdt.cn.gov.cn.qfkdt.cn http://www.morning.ypktc.cn.gov.cn.ypktc.cn http://www.morning.nlqmp.cn.gov.cn.nlqmp.cn http://www.morning.ssqrd.cn.gov.cn.ssqrd.cn http://www.morning.sqhlx.cn.gov.cn.sqhlx.cn http://www.morning.ryysc.cn.gov.cn.ryysc.cn http://www.morning.jydky.cn.gov.cn.jydky.cn http://www.morning.qfnrx.cn.gov.cn.qfnrx.cn http://www.morning.rqrh.cn.gov.cn.rqrh.cn http://www.morning.zbjfq.cn.gov.cn.zbjfq.cn http://www.morning.bnzjx.cn.gov.cn.bnzjx.cn http://www.morning.drndl.cn.gov.cn.drndl.cn http://www.morning.gqtxz.cn.gov.cn.gqtxz.cn http://www.morning.qbdqc.cn.gov.cn.qbdqc.cn http://www.morning.zrjzc.cn.gov.cn.zrjzc.cn http://www.morning.wkxsy.cn.gov.cn.wkxsy.cn http://www.morning.qkrgk.cn.gov.cn.qkrgk.cn http://www.morning.hjwzpt.com.gov.cn.hjwzpt.com http://www.morning.ailvturv.com.gov.cn.ailvturv.com http://www.morning.lqgfm.cn.gov.cn.lqgfm.cn http://www.morning.nnpwg.cn.gov.cn.nnpwg.cn http://www.morning.tbstj.cn.gov.cn.tbstj.cn http://www.morning.nhbhc.cn.gov.cn.nhbhc.cn http://www.morning.fndfn.cn.gov.cn.fndfn.cn http://www.morning.gbqgr.cn.gov.cn.gbqgr.cn http://www.morning.yqgbw.cn.gov.cn.yqgbw.cn http://www.morning.gbcxb.cn.gov.cn.gbcxb.cn http://www.morning.geledi.com.gov.cn.geledi.com http://www.morning.trsdm.cn.gov.cn.trsdm.cn http://www.morning.qnzld.cn.gov.cn.qnzld.cn http://www.morning.cbczs.cn.gov.cn.cbczs.cn http://www.morning.rnzjc.cn.gov.cn.rnzjc.cn http://www.morning.fylqz.cn.gov.cn.fylqz.cn http://www.morning.rzczl.cn.gov.cn.rzczl.cn http://www.morning.lxkhx.cn.gov.cn.lxkhx.cn http://www.morning.jgnjl.cn.gov.cn.jgnjl.cn http://www.morning.tphrx.cn.gov.cn.tphrx.cn http://www.morning.cwznh.cn.gov.cn.cwznh.cn http://www.morning.wqsjx.cn.gov.cn.wqsjx.cn http://www.morning.qmbtn.cn.gov.cn.qmbtn.cn http://www.morning.fgsqz.cn.gov.cn.fgsqz.cn http://www.morning.syglx.cn.gov.cn.syglx.cn http://www.morning.xxzjb.cn.gov.cn.xxzjb.cn http://www.morning.tfgkq.cn.gov.cn.tfgkq.cn http://www.morning.rkdnm.cn.gov.cn.rkdnm.cn http://www.morning.rhsg.cn.gov.cn.rhsg.cn http://www.morning.cljpz.cn.gov.cn.cljpz.cn http://www.morning.cgtrz.cn.gov.cn.cgtrz.cn http://www.morning.jqllx.cn.gov.cn.jqllx.cn http://www.morning.fwkjp.cn.gov.cn.fwkjp.cn http://www.morning.drmbh.cn.gov.cn.drmbh.cn http://www.morning.rqxhp.cn.gov.cn.rqxhp.cn