Series
Spark Chapter 1: Environment Installation
Spark Chapter 2: SparkCore Examples
Spark Chapter 3: Structuring Project Code

Preface
In the last post we finished practicing some example cases. This time I want to tidy up the structure. In those examples the code was very tightly coupled: changing anything was painful, and a lot of code was written over and over. We want to pull the repeated code out, decouple the layers, and improve reusability.

Part One: The Three-Layer Architecture
The big-data three-layer architecture consists of:

controller (control layer): schedules the other modules
service (service layer): holds the business logic
dao (persistence layer): interacts with the files/storage

Now we create one package per layer. A few of the other packages deserve a note:

application: the project's entry point
bean: entity classes
common: code shared across this project
util: generic utilities, reusable by any project
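The original post shows the package tree in a screenshot; assuming names that match the imports used below, it looks roughly like this:

com.atguigu.bigdata.spark.core.rdd.framework
├── application   // entry point
├── bean          // entity classes
├── common        // shared traits for this project
├── controller    // control layer
├── dao           // persistence layer
├── service       // service layer
└── util          // cross-project utilities (EnvUtil)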
Part Two: Splitting WordCount
Everything can be turned into a WordCount, so we'll work with the WordCount from last time. Here is the original code:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // Create the Spark context (the connection object)
    val sc: SparkContext = new SparkContext(sparkConf)
    // Read the file, one line at a time
    val lines: RDD[String] = sc.textFile("datas/word.txt")
    // Split each line into words
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair every word with a 1 so it can be grouped for counting
    val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
    // Sum the counts per word
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    // Collect and print the result
    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)
    sc.stop()
  }
}

1. The three-layer split
Before extracting anything, let's do the plain three-layer split first. Remember to change the package path to your own. WordCountDao.scala is in charge of file interaction, i.e. the first step of reading the file:
package com.atguigu.bigdata.spark.core.rdd.framework1.dao

import com.atguigu.bigdata.spark.core.rdd.framework1.application.WordCountApplication.sc
import org.apache.spark.rdd.RDD

class WordCountDao {
  // Reads the file and hands the RDD back to the service layer
  def readFile(path: String): RDD[String] = {
    sc.textFile(path)
  }
}

WordCountService.scala handles the business logic:
package com.atguigu.bigdata.spark.core.rdd.framework1.service

import com.atguigu.bigdata.spark.core.rdd.framework1.dao.WordCountDao
import org.apache.spark.rdd.RDD

class WordCountService {
  private val wordCountDao = new WordCountDao()

  def dataAnalysis(): Array[(String, Int)] = {
    val lines: RDD[String] = wordCountDao.readFile("datas/word.txt")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    val array: Array[(String, Int)] = wordToSum.collect()
    array
  }
}

WordCountController.scala dispatches the job:
package com.atguigu.bigdata.spark.core.rdd.framework1.controller

import com.atguigu.bigdata.spark.core.rdd.framework1.service.WordCountService

class WordCountController {
  private val wordCountService = new WordCountService()

  def dispatch(): Unit = {
    val array = wordCountService.dataAnalysis()
    array.foreach(println)
  }
}

WordCountApplication.scala starts the project from its main method:
package com.atguigu.bigdata.spark.core.rdd.framework1.application

import com.atguigu.bigdata.spark.core.rdd.framework1.controller.WordCountController
import org.apache.spark.{SparkConf, SparkContext}

object WordCountApplication extends App {
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc: SparkContext = new SparkContext(sparkConf)

  val controller = new WordCountController()
  controller.dispatch()

  sc.stop()
}

2. Extracting the common code
Next we pull out the code that is shared or used repeatedly. We create four traits to abstract the four files. TApplication.scala — here the shared code is the environment setup:
package com.atguigu.bigdata.spark.core.rdd.framework.common

import com.atguigu.bigdata.spark.core.rdd.framework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {
  // op is a by-name block: it is evaluated inside the try, after the
  // environment has been created and published through EnvUtil
  def start(master: String = "local[*]", app: String = "Application")(op: => Unit): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster(master).setAppName(app)
    val sc: SparkContext = new SparkContext(sparkConf)
    EnvUtil.put(sc)

    try {
      op
    } catch {
      case ex: Exception => println(ex.getMessage)
    }

    sc.stop()
    EnvUtil.clear()
  }
}
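A note on two design points in start(). Because op is declared by name (op: => Unit), the caller's block is not run when start is called but only where op appears inside the try, i.e. after the SparkContext exists. And since sc.stop() sits after the try/catch, it still runs when op throws an Exception; a slightly stricter variant (my own adjustment, not from the original) moves the cleanup into finally, so it also runs for throwables the catch clause does not match:

  // Sketch of an alternative start() body with cleanup in finally;
  // on success the behaviour is identical to the version above
  def start(master: String = "local[*]", app: String = "Application")(op: => Unit): Unit = {
    val sc: SparkContext = new SparkContext(new SparkConf().setMaster(master).setAppName(app))
    EnvUtil.put(sc)
    try {
      op
    } catch {
      case ex: Exception => println(ex.getMessage)
    } finally {
      sc.stop()      // executed no matter how op terminates
      EnvUtil.clear()
    }
  }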
TController.scala defines the dispatch method as a trait; the concrete Controller will override it:
package com.atguigu.bigdata.spark.core.rdd.framework.common

trait TController {
  def dispatch(): Unit
}

TDao.scala generalizes the WordCount file read, taking the path as a parameter:
package com.atguigu.bigdata.spark.core.rdd.framework.common

import com.atguigu.bigdata.spark.core.rdd.framework.util.EnvUtil
import org.apache.spark.rdd.RDD

trait TDao {
  def readFile(path: String): RDD[String] = {
    EnvUtil.take().textFile(path)
  }
}
TService.scala is similar to the controller trait; the concrete Service overrides it:
package com.atguigu.bigdata.spark.core.rdd.framework.common

trait TService {
  def dataAnalysis(): Any
}

Finally, define an environment utility so that every class can reach the SparkContext through its own thread. EnvUtil.scala:
package com.atguigu.bigdata.spark.core.rdd.framework.util

import org.apache.spark.SparkContext

object EnvUtil {
  private val scLocal = new ThreadLocal[SparkContext]()

  def put(sc: SparkContext): Unit = {
    scLocal.set(sc)
  }

  def take(): SparkContext = {
    scLocal.get()
  }

  def clear(): Unit = {
    scLocal.remove()
  }
}
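ThreadLocal is what lets every class share one SparkContext without passing it around: each thread gets its own slot, put stores sc for the current thread, take reads it back, and clear removes the reference when the job is done. A minimal, self-contained sketch (purely illustrative, not part of the project) of that per-thread isolation:

object ThreadLocalDemo extends App {
  private val slot = new ThreadLocal[String]()

  private def worker(name: String): Thread = new Thread(() => {
    slot.set(name)                             // each thread stores its own value
    println(s"$name reads back: ${slot.get}")  // and sees only its own
    slot.remove()                              // like EnvUtil.clear()
  })

  val t1 = worker("thread-1")
  val t2 = worker("thread-2")
  t1.start(); t2.start()
  t1.join(); t2.join()
}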
With all the traits in place, we adapt the three layers. WordCountApplication.scala:

package com.atguigu.bigdata.spark.core.rdd.framework.application

import com.atguigu.bigdata.spark.core.rdd.framework.common.TApplication
import com.atguigu.bigdata.spark.core.rdd.framework.controller.WordCountController

object WordCountApplication extends App with TApplication {
  // start() falls back to the default master and app name;
  // the block becomes the by-name op parameter
  start() {
    val controller = new WordCountController()
    controller.dispatch()
  }
}
WordCountController.scala:
package com.atguigu.bigdata.spark.core.rdd.framework.controller

import com.atguigu.bigdata.spark.core.rdd.framework.common.TController
import com.atguigu.bigdata.spark.core.rdd.framework.service.WordCountService

class WordCountController extends TController {
  private val wordCountService = new WordCountService()

  override def dispatch(): Unit = {
    val array: Array[(String, Int)] = wordCountService.dataAnalysis()
    array.foreach(println)
  }
}

WordCountDao.scala:
package com.atguigu.bigdata.spark.core.rdd.framework.dao

import com.atguigu.bigdata.spark.core.rdd.framework.common.TDao

class WordCountDao extends TDao {}
WordCountService.scala:
package com.atguigu.bigdata.spark.core.rdd.framework.service

import com.atguigu.bigdata.spark.core.rdd.framework.common.TService
import com.atguigu.bigdata.spark.core.rdd.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD

class WordCountService extends TService {
  private val wordCountDao = new WordCountDao()

  override def dataAnalysis(): Array[(String, Int)] = {
    val lines: RDD[String] = wordCountDao.readFile("datas/word.txt")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    val array: Array[(String, Int)] = wordToSum.collect()
    array
  }
}
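This is where the traits pay off: a second job only has to supply its own logic, everything else is inherited. As a purely hypothetical illustration (the LineCount* names are mine, not from the original), a line-counting job could reuse TApplication, TController, TService and TDao unchanged:

package com.atguigu.bigdata.spark.core.rdd.framework.demo  // hypothetical package

import com.atguigu.bigdata.spark.core.rdd.framework.common._

// Hypothetical second job built on the same framework traits
class LineCountDao extends TDao {}

class LineCountService extends TService {
  private val lineCountDao = new LineCountDao()
  // Counts lines instead of words; the file reading comes from TDao
  override def dataAnalysis(): Long = lineCountDao.readFile("datas/word.txt").count()
}

class LineCountController extends TController {
  private val lineCountService = new LineCountService()
  override def dispatch(): Unit = println(s"lines = ${lineCountService.dataAnalysis()}")
}

object LineCountApplication extends App with TApplication {
  start(app = "LineCount") {
    new LineCountController().dispatch()
  }
}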
Run the project again to confirm the output is unchanged.

Summary
That wraps up structuring Spark project code. It did get a bit involved, and honestly I'm not sure how clearly I've explained it.