iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >pyspark dataframe列的合并与拆分实例
  • 956
分享到

pyspark dataframe列的合并与拆分实例

pyspark dataframedataframe列的合并dataframe列的拆分 2023-03-23 08:03:31 956人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

目录pyspark dataframe列的合并与拆分dataframe列数据的分割dataframe列数据的拆分dataframe将一行分成多行dataframe列数据的合并data

pyspark dataframe列的合并与拆分

使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。

这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("dataframe_split") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
 
sc = spark.sparkContext
df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True)
df.show(3)

原始数据如下所示

dataframe列数据的分割

from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))
df_split.show()

 

dataframe列数据的拆分

zipWithIndex:给每个元素生成一个索引 

排序首先基于分区索引,然后是每个分区内的项目顺序.因此,第一个分区中的第一个item索引为0,最后一个分区中的最后一个item的索引最大.当RDD包含多个分区时此方法需要触发spark作业.

first_row = df.first()
numAttrs = len(first_row['score'].split(" "))
print("新增列的个数", numAttrs)
attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
print("列名:", attrs)
for name, index in attrs:
    df_split = df_split.withColumn(name, df_split['s'].getItem(index))
df_split.show()

 

dataframe将一行分成多行

df_explode = df.withColumn("e", explode(split(df['score'], " ")))
df_explode.show()

dataframe列数据的合并

列的合并有两个函数:一个不添加分隔符concat(),一个添加分隔符concat_ws()

concat

df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_concat.show()

 

caoncat_ws

df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \
                                                       df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_ws.show()

dataframe多行转多列

pivot: 旋转当前[[dataframe]]列并执行指定的聚合 

#DataFrame 数据格式:每个用户对每部电影的评分 userID 用户ID,movieID 电影ID,rating评分
df=spark.sparkContext.parallelize([[15,399,2], \
                                   [15,1401,5], \
                                   [15,1608,4], \
                                   [15,20,4], \
                                   [18,100,3], \
                                   [18,1401,3], \
                                   [18,399,1]])\
                    .toDF(["userID","movieID","rating"])
#pivot 多行转多列
resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
#结果
resultDF.show()

pyspark dataframe常用操作

总体原则

pyspark中,dataframe与sql的耗时会经引擎优化,效率高于rdd,因此尽可能使用dataframe或者sql。执行效率之外,dataframe优于rdd的另一个好处是:dataframe的各个量有语义信息,便于后期维护。比如rdd[0][1][1]这种很难维护,但是,df.info.school.grade就容易理解。

在使用dataframe过程中,应尽量避免使用udf,因为序列化数据原本在JVM中,现在spark在worker上启动一个python进程,需要将全体数据序列化成Python可解释的格式,计算昂贵。

列相关

根据已有列生成新列

from pyspark.sql.functions import length, col, lit, size
df.withColumn("length_col", length(col("existing_str_col"))) # 将existing_str_col的长度生成新列
df.withColumn("constant_col", lit("hello")) # 生成一列常量
df.withColumn("size_col", size(col("existing_array_col"))) # 将existing_array_col的元素个数生成新列

从已有列选择部分列

from pyspark.sql.functions import col
df = df.select(col("col_1").cast("string"), col("col_2").alias("col_2_")) # 选择col_1列和col_2列,并将col_1列转换为string格式,将col_2列重命名为col_2_,此时不再存在col_2

将几列连接起来形成新列

from pyspark.sql.functions import concat_ws
 
df = df.withColumn("concat_col", concat_ws("_", df.col_1, df.col_2)) # 使用_将col_1和col_2连接起来,构成新列concat_col

将string列分割成list

from pyspark.sql.functions import split
 
df = df.withColumn("split_col", split(df.col, "-")) #按照-将df中的col列分割,此时split_col时一个list,后续或者配合filter(length(...))使用

统计列均值

from pyspark.sql.functions import mean
 
col_mean = df.select(mean(col)).collect()[0][0]

行相关

从全体行中选择部分行(一般调试时使用)

print(df.take(5)) #交互式的pyspark shell中,等价于df.show(5)

统计行数量

print(df.count()) #统计行数量

从全体行中筛选出部分行

from pyspark.sql.functions import col
df = df.filter(col("col_1")==col("col_2")) #保留col_1等于col_2的行

删除带null的行

df.na.drop("all") # 只有当所有列都为空时,删除该行
df.na.drop("any") # 任意列为空时,删除该行
df.na.drop("all", colsubset=["col_1","col_2"]) # 当col_1和col_2都为空时,删除该行

去除重复行

df = df.distinct() # 删除所有列值相同的重复行
df = df.dropDuplicates(["date", "count"]) # 删除date, count两列值相同的行

一行拆分成多行

from pyspark.sql.functions import explode, split
 
df = df.withColumn("sub_str", explode(split(df["str_col"], "_"))) # 将str_col按-拆分成list,list中的每一个元素成为sub_str,与原行中的其他列一起组成新的行

填补行中的空值

df.na.fill({"col_name":fill_content}) # 用fill_content填补col_name列的空值

行前加入递增(不一定连续)唯一序号

from pyspark.sql.functions import monotonically_increasing_id
 
df = df.withColumn("id", monotonically_increasing_id())

两个dataframe

两个dataframe根据某列拼接

df_3 = df_1.join(df_2, df_1.col_1==df_2.col_2) # inner join, 只有当df_1中的col_1列值等于df_2中的col_2时,才会拼接
df_4 = df_1.join(df_2, df_1.col_1==df_2.col_2, "left") # left join, 当df_1中的col_1值不存在于df_2中时,仍会拼接,凭借值填充null

两个dataframe合并

df3 = df1.uNIOn(df2)

聚合操作

groupBy
from pyspark.sql.functions import concat_ws, split, explode, collect_list, struct
 
concat_df = concat_df.groupBy("sample_id", "sample_date").agg(collect_list('feature').alias("feature_list")) # 将同sample_id, sample_date的行聚合成组,feature字段拼成一个list,构成新列feature_list。agg配合groupBy使用,效果等于select。此时concat_df只有两列:sample_id和feature_list。
concat_tuple_df = concat_df.groupBy("sample_id", "sample_date").agg(collect_list(struct("feature", "owner")).alias("tuple")) # 将同sample_id, sample_date的行聚合成组, (feature, owner)两个字段拼成一个单位,组内所有单位拼成一个list,构成新列tuple

窗口函数

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
 
windowspec = Window.partitionBy(df.id, df.date).orderBy(col("price").desc(), col("discount").asc()) # 相同id,date的行被聚成组,组内按照price降序,discount升序进行排列
df = df.withColumn("rank", row_number().over(windowSpec)) #为排序之后的组进行组内编号
df = df.filter(df.rank<=1) # 取组内top-1行

读写成csv

from pyspark.sql import SparkSession
from pyspark import SparkContext
 
sc = SparkContext(appName="test_rw")
sc_session = SparkSession(sc)
df.write.mode("overwrite").options(header="true").csv(output_path)
df = sc_session.csv.read(input_path, header=True)

dataframe转SQL

from pyspark import SparkContext
from pyspark.sql import SparkSession
 
sc = SparkContext(appName='get_sample')
sc_session = SparkSession(sc)
 
sample_df.createOrReplaceTempView("item_sample_df")
sample_df = sc_session.sql(
        '''
            select sample_id
                ,label
                ,type_ as type
                ,split(item_id, "_")[2] as owner
                ,ftime
            from item_sample_df
        ''')

自定义函数UDF(如非必要,勿用)

from pysprak.sql.functions import udf, col
from pyspark.sql.types import StringType, ArrayType, StructField, StructType
 
 
def simple_func(v1, v2):
    pass
    # return str
 
simple_udf = udf(my_func, StringType())
 
df = df.withColumn("new", simple_udf(df["col_1"], df["col_2"]))
 
 
 
# 复杂type
 
def get_entity_func():
    pass
    # return str_list_1, str_list_2
 
entity_schema = StructType([
                    StructField("location", ArrayType(StringType()), True),
                    StructField("nondigit", ArrayType(StringType()), True)
                ])
 
get_entity_udf = udf(get_entity_func, entity_schema)

dataframe与rdd互相转换

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
 
 
sc = SparkContext(appName="rdd2df")
sc_session = SparkSession(sc)
 
rdd = df.rdd # df转rdd, 注意每列仍带header,要map(lambda line: [line.id, line.price])才可以转换成不带header
 
schema = StructType([
                    StructField("id", StringType(), True),
                    StructField("price", FloatType(), True)
                    ])
df = sc_session.createDataFrame(rdd, schema) # rdd转df

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: pyspark dataframe列的合并与拆分实例

本文链接: https://www.lsjlt.com/news/200961.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • pyspark dataframe列的合并与拆分实例
    目录pyspark dataframe列的合并与拆分dataframe列数据的分割dataframe列数据的拆分dataframe将一行分成多行dataframe列数据的合并data...
    99+
    2023-03-23
    pyspark dataframe dataframe列的合并 dataframe列的拆分
  • pyspark dataframe列的合并与拆分方法是什么
    这篇文章主要介绍了pyspark dataframe列的合并与拆分方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇pyspark dataframe列的合并与拆分方法是什么文章都会有...
    99+
    2023-07-05
  • SQL字符串的合并与拆分实例代码分析
    本篇内容主要讲解“SQL字符串的合并与拆分实例代码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SQL字符串的合并与拆分实例代码分析”吧!字符串的合并在Or...
    99+
    2024-04-02
  • Pandas实现批量拆分与合并Excel的示例代码
    目录前言一、拆分成小表格二、合并excel1.介绍2.代码前言 提示:这里可以添加本文要记录的大概内容: 将一个EXCEL等份拆成多个EXCEL 将多个小EXCEL合并成一个大EXC...
    99+
    2024-04-02
  • pandas中DataFrame数据合并连接的实例分析
    这篇文章主要介绍了pandas中DataFrame数据合并连接的实例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。pandas作者Wes McKinney 在【PYTHO...
    99+
    2023-06-15
  • linux中如何拆分与合并
    这篇文章主要介绍了linux中如何拆分与合并,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。操作实战场景一:使用 split 拆分文件难度★演示用发行版Fedora 32涉及命...
    99+
    2023-06-15
  • Pandas实现Dataframe的合并
    目录简介使用concat使用append使用merge使用join覆盖数据简介 Pandas提供了很多合并Series和Dataframe的强大的功能,通过这些功能可以方便的进行数据...
    99+
    2024-04-02
  • PHP中怎么实现数组合并与拆分操作
    这篇文章给大家介绍PHP中怎么实现数组合并与拆分操作,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。PHP数组合并与拆分之array_merge_recursive()用法array_merge_recursive()可...
    99+
    2023-06-17
  • pandas将DataFrame的几列数据合并成为一列
    目录1.1 方法归纳1.2 .str.cat函数详解1.2.1 语法格式:1.2.2 参数说明:1.2.3 核心功能:1.2.4 常见范例:1.1 方法归纳 使用 + 直接将多列合并...
    99+
    2024-04-02
  • 怎么在python拆分与合并字符串
    这篇文章将为大家详细讲解有关怎么在python拆分与合并字符串,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。python有哪些常用库python常用的库:1.requesuts;2.scra...
    99+
    2023-06-14
  • 怎么在python中拆分与合并文件
    本篇文章给大家分享的是有关怎么在python中拆分与合并文件,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。python是什么意思Python是一种跨平台的、具有解释性、编译性、...
    99+
    2023-06-15
  • 怎么对FrontPage框架进行拆分与合并
    怎么对FrontPage框架进行拆分与合并?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。一、拆分框架选中要拆分的框架,可以看到被选中的框架有一个蓝色的边框围绕。...
    99+
    2023-06-08
  • Python 实现多表和工作簿合并及一表按列拆分
    目录一、相关知识点讲解1.1 需要使用的相关库1.2 os.walk(pwd)1.3 os.path.join(path1,path2…)1.4 案例解析1.5 如何在...
    99+
    2024-04-02
  • 队列技术在PHP与MySQL中的消息拆分和消息合并的应用
    引言:队列技术是一种非常重要的数据结构,它在分布式系统中起到了至关重要的作用。在PHP与MySQL中,队列技术可以被广泛应用于消息拆分和消息合并的场景中。本文将介绍队列技术在PHP与MySQL中的应用,并提供具体的代码示例。一、队列技术的概...
    99+
    2023-10-21
    MySQL PHP 队列 消息合并 消息拆分
  • PythonPandas实现DataFrame合并的图文教程
    目录一、merge(合并)的语法:二、以关键列来合并两个dataframe三、理解merge时数量的对齐关系1、one-to-one 一对一关系的merge2、one-to-many...
    99+
    2024-04-02
  • C#实现拆分合并Word表格中的单元格
    目录程序环境在Word表格中合并单元格完整代码效果图在Word表格中拆分单元格完整代码效果图我们在使用Word制作表格时,由于表格较为复杂,只是简单的插入行、列并不能满足我们的需要。...
    99+
    2022-12-22
    C#拆分合并Word表格单元格 C#拆分单元格 C#合并单元格
  • Python怎么实现多表和工作簿合并及一表按列拆分
    今天小编给大家分享一下Python怎么实现多表和工作簿合并及一表按列拆分的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。一、相...
    99+
    2023-06-30
  • Java请求流量合并和拆分提高系统的并发量示例
    目录序言理论基础应用实践(一)编码与使用实现细节1、ConcurrentLinkedQueue2、CompletableFuture其它应用场景1、服务间接口调用小结序言 在并发场景...
    99+
    2024-04-02
  • Linux命令行中怎么进行文件的拆分与合并
    Linux命令行中怎么进行文件的拆分与合并,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。操作概述备份文件时常常涉及到大文件传输的问题,遇到网络质量不佳或者其他问题常常会导致传输...
    99+
    2023-06-28
  • C++实例分析组合数的计算与排列组合的产生
    目录组合数的计算使用加法递推—O(n^2)使用乘法递推—O(n)排列和组合的产生(无重集元素)全排列一般组合全组合由上一排列产生下一排列由上一组合产生下一组合...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作