广告
返回顶部
首页 > 资讯 > 后端开发 > Python >pyspark创建DataFrame的几种方法
  • 503
分享到

pyspark创建DataFrame的几种方法

2024-04-02 19:04:59 503人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录pyspark创建DataFrame RDD和DataFrame 使用二元组创建DataFrame 使用键值对创建DataFrame 使用rdd创建DataFrame 基于rdd

pyspark创建DataFrame

为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。

RDD和DataFrame

在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。

这里简单看一下RDD和DataFrame的类型。


print(type(rdd))  # <class 'pyspark.rdd.RDD'>
print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>

翻阅了一下源码的定义,可以看到他们之间并没有继承关系。


class RDD(object):

    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """


class DataFrame(object):
    """A distributed collection of data grouped into named columns.

    A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
    and can be created using various functions in :class:`SparkSession`::
 ...
    """

RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。

但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于mysql的表或pandas中的DataFrame。

实际工作中,我们用的更多的还是DataFrame。

使用二元组创建DataFrame

尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
于是我们尝试第二种,同时传入二元组和列名称。


a = [('Alice', 1)]
output = spark.createDataFrame(a).collect()
print(output)
# [Row(_1='Alice', _2=1)]

output = spark.createDataFrame(a, ['name', 'age']).collect()
print(output)
# [Row(name='Alice', age=1)]

这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。


spark.createDataFrame(a).show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# |Alice|  1|
# +-----+---+

spark.createDataFrame(a, ['name', 'age']).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Alice|  1|
# +-----+---+

使用键值对创建DataFrame


d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)

# [Row(age=1, name='Alice')]

使用rdd创建DataFrame


a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)

# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]

基于rdd和ROW创建DataFrame


from pyspark.sql import Row


a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)

# [Row(name='Alice', age=1)]

基于rdd和StructType创建DataFrame


from pyspark.sql.types import *

a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)

# [Row(name='Alice', age=1)]

基于pandas DataFrame创建pyspark DataFrame

df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。


df = spark.createDataFrame(rdd, ['name', 'age'])
print(df)  # DataFrame[name: string, age: bigint]

print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'>

# 传入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)

# [Row(name='Alice', age=1)]

创建有序的DataFrame


output = spark.range(1, 7, 2).collect()
print(output)
# [Row(id=1), Row(id=3), Row(id=5)]

output = spark.range(3).collect()
print(output)
# [Row(id=0), Row(id=1), Row(id=2)]

通过临时表得到DataFrame


spark.reGISterDataFrameAsTable(df, "table1")
df2 = spark.table("table1")
b = df.collect() == df2.collect()
print(b)
# True

配置DataFrame和临时表

创建DataFrame时指定列类型

在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。


a = [('Alice', 1)]
rdd = sc.parallelize(a)

# 指定类型于预期数据对应时,正常创建
output = spark.createDataFrame(rdd, "a: string, b: int").collect()
print(output)  # [Row(a='Alice', b=1)]
rdd = rdd.map(lambda row: row[1])
print(rdd)  # pythonRDD[7] at RDD at PythonRDD.Scala:53

# 只有int类型对应上,过滤掉其他列。
output = spark.createDataFrame(rdd, "int").collect()
print(output)   # [Row(value=1)]

# 没有列能对应上,会抛出错误。
output = spark.createDataFrame(rdd, "boolean").collect()
# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>

注册DataFrame为临时表


spark.registerDataFrameAsTable(df, "table1")
spark.dropTempTable("table1")

获取和修改配置


print(spark.getConf("spark.sql.shuffle.partitions"))  # 200
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10
print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # None
print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50

注册自定义函数


spark.registerFunction("stringLengthString", lambda x: len(x))
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)='4')]

spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthString('test')").collect()
print(output)
# [Row(stringLengthString(test)=4)]

spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
output = spark.sql("SELECT stringLengthInt('test')").collect()
print(output)
# [Row(stringLengthInt(test)=4)]

查看临时表列表

可以查看所有临时表名称和对象。


spark.registerDataFrameAsTable(df, "table1")
print(spark.tableNames())  # ['table1']
print(spark.tables())  # DataFrame[database: string, tableName: string, isTemporary: boolean]
print("table1" in spark.tableNames())  # True
print("table1" in spark.tableNames("default"))  # True

spark.registerDataFrameAsTable(df, "table1")
df2 = spark.tables()
df2.filter("tableName = 'table1'").first()
print(df2)  # DataFrame[database: string, tableName: string, isTemporary: boolean]

从其他数据源创建DataFrame

MySQL

前提是需要下载jar包。
Mysql-connector-java.jar


from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.fORMat("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata?user=root&passWord=mysql&"
        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
        "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()

参考

RDD和DataFrame的区别
spark官方文档 翻译 之pyspark.sql.SQLContext

到此这篇关于pyspark创建DataFrame的几种方法的文章就介绍到这了,更多相关pyspark创建DataFrame 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: pyspark创建DataFrame的几种方法

本文链接: https://www.lsjlt.com/news/126162.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • pyspark创建DataFrame的几种方法
    目录pyspark创建DataFrame RDD和DataFrame 使用二元组创建DataFrame 使用键值对创建DataFrame 使用rdd创建DataFrame 基于rdd...
    99+
    2022-11-12
  • PythonPandas创建Dataframe数据框的六种方法汇总
    目录一、字典类方法1:列表、数组或元组构成的字典构造Dataframe方法2:Series构成的字典构造Dataframe二、列表类方法1:二维数组构造Dataframe方法2:字典...
    99+
    2023-05-19
    Python Pandas创建Dataframe数据框 Python 创建Dataframe
  • pandas中DataFrame重置索引的几种方法
    在pandas中,经常对数据进行处理 而导致数据索引顺序混乱,从而影响数据读取、插入等。 小笔总结了以下几种重置索引的方法: import pandas as pd import...
    99+
    2022-11-12
  • Flutter-创建Flutter项目的几种方法
    方法一:终端命令行创建,打开终端,cd 打开创建项目文件存放位置,输入flutter create 项目名 flutter create 项目名 注意点⚠️:创建过程中可能会出现 "xxx项目名" is not a valid Dart p...
    99+
    2023-08-18
    flutter android android studio
  • php创建数组有几种方法
    PHP是一种十分流行的服务器端脚本语言,开发人员可以使用各种各样的方法来创建数组。本文将介绍PHP创建数组的几种常见方法。使用数组字面量在PHP中,数组字面量是最常见的创建数组的方法。使用这种方法时,我们可以直接在一对方括号中输入一个或多个...
    99+
    2023-05-19
  • 在 Python 中创建DataFrame的方法
    目录方法一:创建空的DataFrame​方法二:使用List创建DataFrame​​方法三:使用字典创建DataFrame​​方法四:使用数组创建带索引DataFrame​方法五:...
    99+
    2022-11-13
  • pandas DataFrame的创建方
    pandas DataFrame的增删查改总结系列文章: pandas DaFrame的创建方法 pandas DataFrame的查询方法 pandas DataFrame行或列的删除方法 pandas DataFrame的修改方法 ...
    99+
    2023-01-30
    pandas DataFrame
  • java中创建对象的方法有几种
    一、使用new关键字这是我们最常见的也是最简单的创建对象的方式,通过这种方式我们还可以调用任意的构造函数(无参的和有参的)。例如:User user = new User();二、使用反射机制运用反射手段,调用Java.lang.Class...
    99+
    2021-01-18
    java 创建 对象 方法
  • javascript创建对象的几种常见方法
    本篇内容介绍了“javascript创建对象的几种常见方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!既...
    99+
    2022-10-19
  • pyspark dataframe列的合并与拆分方法是什么
    这篇文章主要介绍了pyspark dataframe列的合并与拆分方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇pyspark dataframe列的合并与拆分方法是什么文章都会有...
    99+
    2023-07-05
  • javascript中有哪几种创建对象的方法
    本篇内容介绍了“javascript中有哪几种创建对象的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!...
    99+
    2022-10-19
  • 创建 KVM 虚机的几种方式
    1 使用 virt-install 命令2 使用 virt-manager 工具3 使用 qemu-img 和 qemu-kvm ...
    99+
    2023-06-06
  • MySQL几种创建索引的方式
    一、创建表时创建索引 key 索引名 (column); 二、表创建好后创建索引 通过Alter创建索引 ①PRIMARY  KEY(主键索引)         mysql > ALTER  TABLE  `table_name`  A...
    99+
    2023-09-01
    mysql
  • Java 中创建线程的几种方式
    Java 是一种面向对象的编程语言,它支持多线程编程。多线程编程是指在一个程序中同时运行多个线程,这些线程可以并行执行,以提高程序的效率和性能。Java 提供了多种创建线程的方法,本文将介绍这些方法以...
    99+
    2023-09-13
    java jvm servlet
  • spring创建连接池的几种方式
    spring使用连接池有很多种方式,jdbc(不使用连接池),c3p0,dbcp,jndi,下面将分别贴代码介绍这几种: 1.jdbc方式   使用的是DriverManage...
    99+
    2022-10-18
  • Python中创建字典的几种方法总结(推荐)
    1、传统的文字表达式: >>> d={'name':'Allen','age':21,'gender':'male'} >>> d {'age': 21, 'name...
    99+
    2022-06-04
    字典 几种方法 Python
  • pandas创建DataFrame的方式小结
    如果你是一个pandas初学者,那么不知道你会不会像我一样。在学用列表或者数组创建DataFrame时理不清怎样用数据生成以及想要形状的的Dataframe,那么,现在,你不用自己琢...
    99+
    2022-11-12
  • pandas创建DataFrame对象失败的解决方法
    目录报错代码报错翻译报错原因解决方法创建DataFrame对象的四种方法1. list列表构建DataFrame2. dict字典构建DataFrame3. ndarray创建Dat...
    99+
    2023-01-17
    pandas创建DataFrame对象失败 pandas创建DataFrame对象
  • java创建对象的方式有哪几种
    在Java中,可以通过以下几种方式创建对象:1. 使用new关键字:通过使用new关键字,可以调用类的构造方法实例化一个对象。例如:...
    99+
    2023-10-10
    java
  • java中创建线程有几种方式
    线程的创建方式继承Thread类实现多线程覆写Runnable()接口实现多线程,而后同样覆写run()。推荐此方式使用Callable和Future创建线程相关视频教程推荐:java学习视频实例如下:继承Thread类实现多线程 publ...
    99+
    2018-07-06
    java教程 java 线程 创建 方式
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作