Up front: I'm Octopus; the name comes from my Chinese name, 章鱼 (octopus). I love programming, algorithms, and open source. All of my source code is on my personal GitHub. This blog records my learning bit by bit; if you're interested in Python, Java, AI, or algorithms, you can follow me and we can learn and improve together.

Related articles:
- PySpark Overview
- Quickstart: Spark Connect
- Quickstart: Pandas API on Spark

Create the PySpark session

```python
import warnings

warnings.filterwarnings("ignore")

# import pandas as pd
# import numpy as np
from datetime import timedelta, date, datetime
import time
import gc
import os
import argparse
import sys

from pyspark.sql import SparkSession, functions as fn
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import *
from pyspark import StorageLevel

spark = SparkSession \
    .builder \
    .appName("stockout_test") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .config("spark.driver.memory", "20g") \
    .config("spark.executor.memory", "40g") \
    .config("spark.yarn.executor.memoryOverhead", "1g") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "8") \
    .config("spark.kryoserializer.buffer.max", "128m") \
    .config("spark.yarn.queue", "root.algo") \
    .config("spark.executorEnv.OMP_NUM_THREADS", "12") \
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1") \
    .config("spark.default.parallelism", "800") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
```

Create a DataFrame

```python
employee_salary = [
    ("zhangsan", "IT", 8000),
    ("lisi", "IT", 7000),
    ("wangwu", "IT", 7500),
    ("zhaoliu", "ALGO", 10000),
    ("qisan", "IT", 8000),
    ("bajiu", "ALGO", 12000),
    ("james", "ALGO", 11000),
    ("wangzai", "INCREASE", 7000),
    ("carter", "INCREASE", 8000),
    ("kobe", "IT", 9000),
]

columns = ["name", "department", "salary"]
```
```python
df = spark.createDataFrame(data=employee_salary, schema=columns)
df.show()
```

```
+--------+----------+------+
|    name|department|salary|
+--------+----------+------+
|zhangsan|        IT|  8000|
|    lisi|        IT|  7000|
|  wangwu|        IT|  7500|
| zhaoliu|      ALGO| 10000|
|   qisan|        IT|  8000|
|   bajiu|      ALGO| 12000|
|   james|      ALGO| 11000|
| wangzai|  INCREASE|  7000|
|  carter|  INCREASE|  8000|
|    kobe|        IT|  9000|
+--------+----------+------+
```
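A quick way to confirm what createDataFrame inferred is printSchema(); integer salaries come through as long:

```python
# Inspect the inferred schema: name/department as string, salary as long
df.printSchema()
```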
row_number()

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Number rows within each department, highest salary first
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("row_number", F.row_number().over(windowSpec)).show(truncate=False)
```

```
+--------+----------+------+----------+
|name    |department|salary|row_number|
+--------+----------+------+----------+
|carter  |INCREASE  |8000  |1         |
|wangzai |INCREASE  |7000  |2         |
|kobe    |IT        |9000  |1         |
|zhangsan|IT        |8000  |2         |
|qisan   |IT        |8000  |3         |
|wangwu  |IT        |7500  |4         |
|lisi    |IT        |7000  |5         |
|bajiu   |ALGO      |12000 |1         |
|james   |ALGO      |11000 |2         |
|zhaoliu |ALGO      |10000 |3         |
+--------+----------+------+----------+
```
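A common follow-up is to use row_number to pick the top-paid employee per department; a minimal sketch reusing the windowSpec defined above (top_earners is just an illustrative name):

```python
# Keep only the first row of each department partition, i.e. the highest salary
top_earners = (
    df.withColumn("row_number", F.row_number().over(windowSpec))
      .where(F.col("row_number") == 1)
      .drop("row_number")
)
top_earners.show()
```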
rank()

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("rank", F.rank().over(windowSpec)).show(truncate=False)
```

```
+--------+----------+------+----+
|name    |department|salary|rank|
+--------+----------+------+----+
|carter  |INCREASE  |8000  |1   |
|wangzai |INCREASE  |7000  |2   |
|kobe    |IT        |9000  |1   |
|qisan   |IT        |8000  |2   |
|zhangsan|IT        |8000  |2   |
|wangwu  |IT        |7500  |4   |
|lisi    |IT        |7000  |5   |
|bajiu   |ALGO      |12000 |1   |
|james   |ALGO      |11000 |2   |
|zhaoliu |ALGO      |10000 |3   |
+--------+----------+------+----+
```

Note that rank() assigns the same rank to ties and then skips: qisan and zhangsan tie at rank 2, so wangwu gets rank 4.
dense_rank()

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("dense_rank", F.dense_rank().over(windowSpec)).show()
```

```
+--------+----------+------+----------+
|    name|department|salary|dense_rank|
+--------+----------+------+----------+
|  carter|  INCREASE|  8000|         1|
| wangzai|  INCREASE|  7000|         2|
|    kobe|        IT|  9000|         1|
|   qisan|        IT|  8000|         2|
|zhangsan|        IT|  8000|         2|
|  wangwu|        IT|  7500|         3|
|    lisi|        IT|  7000|         4|
|   bajiu|      ALGO| 12000|         1|
|   james|      ALGO| 11000|         2|
| zhaoliu|      ALGO| 10000|         3|
+--------+----------+------+----------+
```

Unlike rank(), dense_rank() leaves no gaps after a tie: wangwu gets 3 here rather than 4.
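Seeing all three ranking functions over the same window makes the tie-handling differences obvious; a minimal sketch reusing windowSpec:

```python
# row_number breaks ties arbitrarily, rank leaves gaps, dense_rank does not
df.withColumn("row_number", F.row_number().over(windowSpec)) \
  .withColumn("rank", F.rank().over(windowSpec)) \
  .withColumn("dense_rank", F.dense_rank().over(windowSpec)) \
  .show()
```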
lag()

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
# lag("salary", 1): the salary from the previous row in the window (null for the first row)
df.withColumn("lag", F.lag("salary", 1).over(windowSpec)).show()
```

```
+--------+----------+------+-----+
|    name|department|salary|  lag|
+--------+----------+------+-----+
|  carter|  INCREASE|  8000| null|
| wangzai|  INCREASE|  7000| 8000|
|    kobe|        IT|  9000| null|
|zhangsan|        IT|  8000| 9000|
|   qisan|        IT|  8000| 8000|
|  wangwu|        IT|  7500| 8000|
|    lisi|        IT|  7000| 7500|
|   bajiu|      ALGO| 12000| null|
|   james|      ALGO| 11000|12000|
| zhaoliu|      ALGO| 10000|11000|
+--------+----------+------+-----+
```
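lag() is typically used for row-over-row deltas; a minimal sketch computing each salary's gap to the previous (higher) salary in the department (prev_salary and gap are illustrative column names):

```python
# Gap between each salary and the one ranked just above it in the department
df.withColumn("prev_salary", F.lag("salary", 1).over(windowSpec)) \
  .withColumn("gap", F.col("salary") - F.col("prev_salary")) \
  .show()
```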
lead()

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
# lead("salary", 1): the salary from the next row in the window (null for the last row)
df.withColumn("lead", F.lead("salary", 1).over(windowSpec)).show()
```

```
+--------+----------+------+-----+
|    name|department|salary| lead|
+--------+----------+------+-----+
|  carter|  INCREASE|  8000| 7000|
| wangzai|  INCREASE|  7000| null|
|    kobe|        IT|  9000| 8000|
|zhangsan|        IT|  8000| 8000|
|   qisan|        IT|  8000| 7500|
|  wangwu|        IT|  7500| 7000|
|    lisi|        IT|  7000| null|
|   bajiu|      ALGO| 12000|11000|
|   james|      ALGO| 11000|10000|
| zhaoliu|      ALGO| 10000| null|
+--------+----------+------+-----+
```
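Both lag() and lead() accept a third argument, a default returned instead of null when the offset runs past the partition; a minimal sketch using 0 as the default:

```python
# Trailing rows get 0 instead of null
df.withColumn("lead", F.lead("salary", 1, 0).over(windowSpec)).show()
```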
Aggregate Functions

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg = Window.partitionBy("department")

df.withColumn("row", F.row_number().over(windowSpec)) \
  .withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
  .withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
  .withColumn("min", F.min("salary").over(windowSpecAgg)) \
  .withColumn("max", F.max("salary").over(windowSpecAgg)) \
  .withColumn("count", F.count("salary").over(windowSpecAgg)) \
  .withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \
  .show()
```

```
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|    name|department|salary|row|    avg|  sum|  min|  max|count|distinct_count|
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|  carter|  INCREASE|  8000|  1| 7500.0|15000| 7000| 8000|    2|             2|
| wangzai|  INCREASE|  7000|  2| 7500.0|15000| 7000| 8000|    2|             2|
|    kobe|        IT|  9000|  1| 7900.0|39500| 7000| 9000|    5|             4|
|zhangsan|        IT|  8000|  2| 7900.0|39500| 7000| 9000|    5|             4|
|   qisan|        IT|  8000|  3| 7900.0|39500| 7000| 9000|    5|             4|
|  wangwu|        IT|  7500|  4| 7900.0|39500| 7000| 9000|    5|             4|
|    lisi|        IT|  7000|  5| 7900.0|39500| 7000| 9000|    5|             4|
|   bajiu|      ALGO| 12000|  1|11000.0|33000|10000|12000|    3|             3|
|   james|      ALGO| 11000|  2|11000.0|33000|10000|12000|    3|             3|
| zhaoliu|      ALGO| 10000|  3|11000.0|33000|10000|12000|    3|             3|
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
```
To get one summary row per department, filter to row == 1 and select only the aggregate columns:

```python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Note: approx_count_distinct() is the distinct-count function to use in window
# aggregations; in a groupBy you would normally use countDistinct() instead to
# count the distinct values within each group.
# approx_count_distinct() returns an approximation and may be inaccurate, so use with care.
windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg = Window.partitionBy("department")

df.withColumn("row", F.row_number().over(windowSpec)) \
  .withColumn("avg", F.avg("salary").over(windowSpecAgg)) \
  .withColumn("sum", F.sum("salary").over(windowSpecAgg)) \
  .withColumn("min", F.min("salary").over(windowSpecAgg)) \
  .withColumn("max", F.max("salary").over(windowSpecAgg)) \
  .withColumn("count", F.count("salary").over(windowSpecAgg)) \
  .withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \
  .where(F.col("row") == 1) \
  .select("department", "avg", "sum", "min", "max", "count", "distinct_count") \
  .show()
```

```
+----------+-------+-----+-----+-----+-----+--------------+
|department|    avg|  sum|  min|  max|count|distinct_count|
+----------+-------+-----+-----+-----+-----+--------------+
|  INCREASE| 7500.0|15000| 7000| 8000|    2|             2|
|        IT| 7900.0|39500| 7000| 9000|    5|             4|
|      ALGO|11000.0|33000|10000|12000|    3|             3|
+----------+-------+-----+-----+-----+-----+--------------+
```
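For comparison, the same per-department summary can be computed with a plain groupBy, where the exact countDistinct() mentioned in the comments is available; a minimal sketch:

```python
# groupBy equivalent of the window aggregation above; countDistinct is exact
df.groupBy("department").agg(
    F.avg("salary").alias("avg"),
    F.sum("salary").alias("sum"),
    F.min("salary").alias("min"),
    F.max("salary").alias("max"),
    F.count("salary").alias("count"),
    F.countDistinct("salary").alias("distinct_count"),
).show()
```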