iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >PySpark中RDD的数据输出详解
  • 317
分享到

PySpark中RDD的数据输出详解

PySpark RDD数据输出RDD数据输出PySpark RDD 2023-01-15 12:01:34 317人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录collect算子演示reduce算子 演示 take算子 count算子小结savaAsTextFile算子配置hadoop依赖 修改r

一. 回顾

数据输入:

  • sc.parallelize
  • sc.textFile

数据计算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.输出为python对象

数据输出可用的方法是很多的,这里简单介绍常会用到的4个

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
返回值是一个list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_Python"]="C:/Users/hawa/AppData/Local/Programs/Python/python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

结果是

 单独输出rdd,输出的是rdd的类名而非内容

reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

代码

 返回值等于计算函数的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

结果是

 take算子

功能:取RDD的前N个元素,组合成list返回给你
用法:

 

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)

结果是

 count算子

功能:计算RDD有多少条数据,返回值是一个数字
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,统计rdd中有多少条数据,返回值为数字
num_count=rdd.count()
print(num_count)
#关闭链接
sc.stop()

结果是

小结

1.Spark的编程流程就是:

  • 将数据加载为RDD(数据输入)对RDD进行计算(数据计算)
  • 将RDD转换为Python对象(数据输出)

 2.数据输出的方法

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

数据输出可用的方法是很多的,这里只是简单介绍4个

三.输出到文件中

savaAsTextFile算子

功能:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
代码:

 演示

 这是因为这个方法本质上依赖大数据的Hadoop框架,需要配置Hadoop 依赖.

配置Hadoop依赖

调用保存文件的算子,需要配置Hadoop依赖。

  • 下载Hadoop安装包解压到电脑任意位置
  • 在Python代码中使用os模块配置: os.environ['HADOOP_HOME']='HADOOP解压文件夹路径′。
  • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
  • 下载hadoop.dll,并放入:C:/windows/System32文件夹内

配置完成之后,执行下面的代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 输出的文件夹中有这么8文件,是因为RDD被默认为分成8个分区
SaveAsTextFile算子输出文件的个数是根据RDD的分区来决定的,有多少分区就会输出多少个文件,RDD在本电脑中默认是8(该电脑CPU核心数是8核)

 打开设备管理器就可以查看处理器个数,这里是有8个逻辑CPU
或者打开任务管理器就可以看到是4核8个逻辑CPU

 修改rdd分区为1个

方式1, SparkConf对象设置属性全局并行度为1:

 方式2,创建RDD的时候设置( parallelize方法传入numSlices参数为1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 小结

1.RDD输出到文件的方法

  • rdd.saveAsTextFile(路径)
  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件

2.如何修改RDD分区

  • SparkConf对象设置conf.set("spark.default.parallelism", "7")
  • 创建RDD的时候,sc.parallelize方法传入numSlices参数为1

四.练习案例

需求: 

读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度)Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件

代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
rdd=sc.textFile("D:/search_log.txt")
#需求1 打印输出:热门搜索时间段(小时精度)Top3
# 取出全部的时间并转换为小时
# 转换为(小时,1)的二元元组
# Key分组聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[0][:2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)#上面用的‘/'是换行的意思,当一行代码太长时就可以这样用
print(result1)
#需求2 打印输出:热门搜索词Top3
# 取出全部的搜索词
# (词,1)二元元组
# 分组聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[2])\
    .map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#需求3 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
    filter((lambda x:x[2]=="黑马程序员")).\
    map(lambda x:(x[0][:2],1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result3)
#需求4 将数据转换为jsON格式,写出为文件
rdd.map(lambda x:x.split("\t")).\
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
    .saveAsTextFile("D:/out_json")

结果是

到此这篇关于PySpark中RDD的数据输出详解的文章就介绍到这了,更多相关PySpark RDD数据输出内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: PySpark中RDD的数据输出详解

本文链接: https://www.lsjlt.com/news/177802.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • PySpark中RDD的数据输出详解
    目录collect算子演示reduce算子 演示 take算子 count算子小结savaAsTextFile算子配置Hadoop依赖 修改r...
    99+
    2023-01-15
    PySpark RDD数据输出 RDD数据输出 PySpark RDD
  • 【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )
    文章目录 一、RDD#sortBy 方法1、RDD#sortBy 语法简介2、RDD#sortBy 传入的函数参数分析 二、代码示例 - RDD#sortBy 示例1、需求分析2、代码示例...
    99+
    2023-08-30
    python 开发语言 PySpark Spark PyCharm 原力计划
  • 【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )
    文章目录 一、RDD 简介1、RDD 概念2、RDD 中的数据存储与计算 二、Python 容器数据转 RDD 对象1、RDD 转换2、转换 RDD 对象相关 API3、代码示例 - Py...
    99+
    2023-08-31
    python 开发语言 Spark PySpark PyCharm 原力计划
  • Java中常用数据类型的输入输出详解
    目录1、Char型1.1  输入格式:1.2  举例说明2、int型1.1  简单的int格式输入:1.2  举例说明2.1带空格的int格式输入 :2.2  举例说明3.1  复杂...
    99+
    2024-04-02
  • pandas进行数据输入和输出的方法详解
    目录1.文本格式数据的读写 1.1 分块读入文本文件 1.2 将数据写入文本格式总结1.文本格式数据的读写 read_csv():从文件、URL或文件型对象读取分...
    99+
    2024-04-02
  • Python数据的输出
    一.标准输出 1.使用表达式  2.使用print()函数 基本格式为: print([输出项1,输出项2,......,输出项n][,sep=分隔符][,end=结束符]) 没有输出项时输出一个空行: print() 这里输出...
    99+
    2023-09-04
    python 开发语言
  • C++的输入和输出流详解
    目录输入和输出流标准输入流小案例标准输出流文件读写总结输入和输出流 从键盘输入数据,输出到显示器屏幕。这种输入输出称为标准的输入输出,简称标准I/O。 从磁盘文件输入数据,数据输出到...
    99+
    2024-04-02
  • PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解
    目录 前言 一、PySpark基础功能  1.Spark SQL 和DataFrame 2.Pandas API on Spark 3.Streaming 4.MLBase/MLlib 5.Spark Core 二、PySpark依赖 De...
    99+
    2023-09-21
    数据分析 python pandas spark 大数据
  • PHP中的输出缓冲控制详解
    目录清除输出获得输出缓冲区的内容刷新(输出)缓冲区内容一些检测函数使用 ob_start() 的回调函数来进行输出缓冲区的内容替换添加 URL 重写器总结测试代码:在 PHP 中,我...
    99+
    2024-04-02
  • Python的输入和输出问题详解
    输出用print()在括号中加上字符串,就可以向屏幕上输出指定的文字。比如输出'hello, world',用代码实现如下: >>> print('hello, world') print()函数也可以接受多个字符...
    99+
    2023-01-30
    详解 Python
  • JavaScript中的输出数据多种方式
    前言 在 JavaScript 中,不像 Java 等语言,它没有任何打印或者输出方法的,通常使用如下 4 种方式来输出数据。 使用 window.alert() 用于弹出警告框使用...
    99+
    2024-04-02
  • Python的输入,输出和标识符详解
    目录一、标识符命名要规范:需要注意以下两点:二、input(输入)三、print(输出)总结一、标识符 何为标识符? 标识符是用来标识某个实体的一个符号。在编程语言中,标识符是计算机...
    99+
    2024-04-02
  • JAVA语言的输入输出流详解(c)
    详解b中的例子,详解[@more@]  1. BufferedReader是Reader的一个子类,它具有缓冲的作用,避免了频繁的从物理设备中读取信息。它有以下两个构造函数:BufferedReader(Reader in) Buffere...
    99+
    2023-06-03
  • Hive数据导出详解
    目录一、数据导出是什么?二、六大帮派1.insert2.Hadoop命令导出到本地3.Hive shell命令导出4.export导出到HDFS上5.Sqoop导出6.清除表中的数据(Truncate)—&m...
    99+
    2023-04-14
    Hive数据导出 Hive数据 数据导出
  • php数据库数据输出乱码如何解决
    今天小编给大家分享一下php数据库数据输出乱码如何解决的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。原因分析一般来说,数据输...
    99+
    2023-07-06
  • php中可以输出数据类型的是
    php 中输出数据类型的方式有三种:使用 gettype() 函数返回变量的数据类型。使用 var_dump() 函数提供更详细的信息,包括数据类型。使用 print_r() 函数打印数...
    99+
    2024-04-26
  • JavaScript中输出数据的方式有哪些
    这篇文章主要介绍“JavaScript中输出数据的方式有哪些”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“JavaScript中输出数据的方式有哪些”文章能帮助大家解决问题。前言在 JavaScri...
    99+
    2023-06-29
  • sql中怎么将输出的数据换行
    在SQL中,可以使用CONCAT函数将输出的数据换行。例如,可以使用以下语句将两个字段合并为一个字段,并在它们之间添加换行符: SE...
    99+
    2024-04-09
    sql
  • Nginx中怎么实现数据输出
    本篇文章为大家展示了Nginx中怎么实现数据输出,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Nginx配置分两种模式,一种是动态的,实时压缩输出。一种是静态的,找...
    99+
    2024-04-02
  • vue-resourse中怎么输出json数据
    这篇文章给大家介绍vue-resourse中怎么输出json数据,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1.demo目录,不要管index.html和index.js2.html...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作