iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >如何使用FlinkSQL内置函数
  • 442
分享到

如何使用FlinkSQL内置函数

2024-04-02 19:04:59 442人浏览 独家记忆
摘要

本篇内容介绍了“如何使用flinksql内置函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!前言Flin

本篇内容介绍了“如何使用flinksql内置函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

前言

Flink Table 和 SQL 内置了很多 SQL 中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。

一、系统内置函数

Flink Table api 和 SQL 为用户提供了一组用于数据转换的内置函数。SQL 中支持的很多函数,Table API 和 SQL  都已经做了实现,其它还在快速开发扩展中。

以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。

类型TableApiSQLAPI
比较函数ANY1 === ANY2value1 = value2
比较函数NY1 > ANY2value1 > value2
逻辑函数BOOLEAN1 || BOOLEAN2boolean1 OR boolean2
逻辑函数BOOLEAN.isFalseboolean IS FALSE
逻辑函数!BOOLEANNOT boolean
算术函数NUMERIC1 + NUMERIC2numeric1 + numeric2
算术函数NUMERIC1.power(NUMERIC2)POWER(numeric1, numeric2)
字符串函数STRING1 + STRING2string1 || string2
字符串函数STRING.upperCase()UPPER(string)
字符串函数STRING.charLength()CHAR_LENGTH(string)
时间函数STRING.toDateDATE string
时间函数STRING.toTimestampTIMESTAMP string
时间函数currentTime()CURRENT_TIME
时间函数NUMERIC.daysINTERVAL string range
时间函数NUMERIC.minutes 
聚合函数FIELD.countCOUNT(*)
聚合函数FIELD.sum0SUM([ ALL | DISTINCT ] expression)
聚合函数 RANK()
聚合函数 ROW_NUMBER()

二、Flink UDF

用户定义函数(User-defined  Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用 UDF  来自定义实现。

2.1 注册用户自定义函数 UDF

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的 Table API 注册函数。

函数通过调用 reGISterFunction()方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到  TableEnvironment 的函数目录中,这样 Table API 或 SQL 解析器就可以识别并正确地解释它。

2.2 标量函数(Scalar Functions)

用户定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值。

为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar  Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为 eval(直接  def 声明,没有 override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。

在下面的代码中,我们定义自己的 HashCode 函数,在 TableEnvironment 中注册它,并在查询中调用它。

准备数据

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

代码如下

package udf  import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row   object FlinkSqlUdfHashCode {  def main(args: Array[String]): Unit = {    //1.构建运行环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1) // 设置并行度为1    //2.构建TableEnv    val tableEnv = StreamTableEnvironment.create(env)    //3.构建数据源    tableEnv.connect(new FileSystem().path("./data/sensor.txt"))      .withFORMat(new Csv())      .withSchema(new Schema()        .field("id", DataTypes.STRING())        .field("timestamp", DataTypes.INT())        .field("temperature", DataTypes.DOUBLE())      ).createTemporaryTable("sensor")    // 转为表    val tableSensor = tableEnv.from("sensor")    // 床架转换对象    val code = new HashCode()    //使用tableAPI 进行测试    val tableRes = tableSensor.select('id, code('id))    tableEnv.registerFunction("code",code) // 注册udf    val tableSql = tableEnv.sqlQuery(      """        |select        |id,        |code(id)        |from        |sensor        |""".stripMargin)    // 输出    tableRes.toAppendStream[Row].print("tableAPI")    tableSql.toAppendStream[Row].print("tableSql")     env.execute("FlinkSqlUdfHashCode")  }   class HashCode() extends ScalarFunction {    def eval(s: String): String = {      s.hashCode.toString    }  } } 运行结果

2.3 表函数(Table Functions)

与用户定义的标量函数类似,用户定义的表函数,可以将 0、1 或多个标量值作为输入参数;

与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。为了定义一个表函数,必须扩展  org.apache.flink.table.functions 中的基类  TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是 public的,并命名为  eval。求值方法的参数类型,决定表函数的所有有效参数。

返回表的类型由 TableFunction 的泛型类型确定。求值方法使用 protected collect(T)方法发出输出行。

在 Table API 中,Table 函数需要与.joinLateral 或.leftOuterJoinLateral 一起使用。

joinLateral 算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。

而 leftOuterJoinLateral  算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。

在 SQL 中,则需要使用 Lateral Table(),或者带有 ON TRUE 条件的左连接。

下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。

数据准备

hello|Word,hello|spark hello|Flink,hello|java,hello|大数据老哥

编写代码

package udf  import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.Row   object FlinkSqlUDFTableFunction {   def main(args: Array[String]): Unit = {     //1.构建运行环境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 设置并行度为1     //2.构建TableEnv     val tableEnv = StreamTableEnvironment.create(env)     //3.构建数据源     val data = env.readTextFile("./data/words.txt")     // 解析数据     val wordData: DataStream[String] = data.flatMap(_.split(","))     // 类型转换     val tableWord = tableEnv.fromDataStream(wordData,'id)     // 调用TableFunction     val split = new Split()     // Table API 方式一     val resTable1 = tableWord.       joinLateral(split('id) as('word,'length))       .select('id,'word,'length )     //  Table API  方式二     val resTable2 = tableWord.       leftOuterJoinLateral(split('id) as('word,'length))       .select('id,'word,'length )     // 将数据注册成表      tableEnv.createTemporaryView("sensor",tableWord)      tableEnv.registerFunction("split",split)      // SQL 方式一     val tableSQL1 = tableEnv.sqlQuery(       """         |select         |id,         |word,         |length         |from         |sensor ,LATERAL TABLE(split(id)) AS newsensor(word, length)         |""".stripMargin)     //  SQL 方式二     val TableSQL2 = tableEnv.sqlQuery(       """         |select         |id,         |word,         |length         |from         |sensor         | LEFT JOIN LATERAL TABLE(split(id)) AS newsensor(word, length) ON TRUE         |""".stripMargin)     // 调用数据     resTable1.toAppendStream[Row].print("resTable1")     resTable2.toAppendStream[Row].print("resTable2")     tableSQL1.toAppendStream[Row].print("tableSQL1")     TableSQL2.toAppendStream[Row].print("TableSQL2")       env.execute("FlinkSqlUDFTableFunction")   }    class Split() extends TableFunction[(String,Int)] {     def eval(str: String): Unit = {       str.split("\\|").foreach(         word => collect((word, word.length))       )     }   } }

2.4 聚合函数(Aggregate Functions)

用户自定义聚合函数(User-Defined Aggregate  Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承 AggregateFunction  抽象类实现的。

如何使用FlinkSQL内置函数

上图中显示了一个聚合的例子。

假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name 和 price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行  max()聚合,结果将是一个数值。AggregateFunction 的工作原理如下:

  • 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用 AggregateFunction 的  createAccumulator()方法创建空累加器。

  • 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。

  • 处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果。AggregationFunction 要求必须实现的方法:

除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session  group window)上下文中,则 merge()方法是必需的。

  • retract()

  • merge()

  • resetAccumulator()

接下来我们写一个自定义AggregateFunction,计算一个每个price的平均值。

数据准备

1,Latte,6 2,Milk,3 3,Breve,5 4,Mocha,8 5,Tea,4

代码如下

package udf  import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.types.Row  import java.util   object FlinkSQUDFAggregateFunction {   def main(args: Array[String]): Unit = {     //1.构建运行环境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 设置并行度为1     //2.构建TableEnv     val tableEnv = StreamTableEnvironment.create(env)     //3.构建数据源     tableEnv.connect(new FileSystem().path("./data/datas"))       .withFormat(new Csv)       .withSchema(new Schema()         .field("id", DataTypes.STRING())         .field("name", DataTypes.STRING())         .field("price", DataTypes.DOUBLE())       ).createTemporaryTable("datas")     val AvgTemp = new AvgTemp()     val table = tableEnv.from("datas")      val resTableApi = table.groupBy('id)       .aggregate(AvgTemp('price) as 'sumprice)       .select('id, 'sumprice)      tableEnv.registerFunction("avgTemp",AvgTemp)     val tablesql = tableEnv.sqlQuery(       """         |select         |id ,avgTemp(price)         |from datas group by id         |""".stripMargin)     resTableApi.toRetractStream[Row].print("resTableApi")     tablesql.toRetractStream[Row].print("tablesql")     env.execute("FlinkSQUDFAggregateFunction")   }    class AvgTempAcc {     var sum: Double = 0.0     var count: Int = 0   }    class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {     override def getValue(acc: AvgTempAcc): Double = {       acc.sum / acc.count     }      override def createAccumulator(): AvgTempAcc = new AvgTempAcc()   }    def accumulate(accumulator: AvgTempAcc, price: Double): Unit = {     accumulator.sum += price      accumulator.count += 1   }  }

2.5表聚合函数(Table Aggregate Functions)

户定义的表聚合函数(User-Defined Table Aggregate  Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟 AggregateFunction  非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。

如何使用FlinkSQL内置函数

比如现在我们需要找到表中所有饮料的前 2 个最高价格,即执行 top2()表聚合。我们需要检查  5 行中的每一行,得到的结果将是一个具有排序后前 2 个值的表。用户定义的表聚合函数,是通过继承 TableAggregateFunction  抽象类来实现的。TableAggregateFunction 的工作原理如下:

  • 为首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 TableAggregateFunction 的  createAccumulator()方法可以创建空累加器。

  • 为随后,对每个输入行调用函数的 accumulate()方法来更新累加器。

  • 为处理完所有行后,将调用函数的 emitValue()方法来计算并返回最终结果。除了上述方法之外,还有一些可选择实现的方法。

  • retract()

  • merge()

  • resetAccumulator()

  • emitValue()

  • emitUpdateWithRetract()

接下来我们写一个自定义 TableAggregateFunction,用来提取每个 price 最高的两个平均值。

数据准备

1,Latte,6 2,Milk,3 3,Breve,5 4,Mocha,8 5,Tea,4

代码如下

package udf  import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} import org.apache.flink.table.functions.TableAggregateFunction import org.apache.flink.types.Row import org.apache.flink.util.Collector import udf.FlinkSQUDFAggregateFunction.AvgTemp   object FlinkSqlUDFTableAggregateFunction {   def main(args: Array[String]): Unit = {     //1.构建运行环境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 设置并行度为1     //2.构建TableEnv     val tableEnv = StreamTableEnvironment.create(env)     //3.构建数据源     tableEnv.connect(new FileSystem().path("./data/datas"))       .withFormat(new Csv)       .withSchema(new Schema()         .field("id", DataTypes.STRING())         .field("name", DataTypes.STRING())         .field("price", DataTypes.DOUBLE())       ).createTemporaryTable("datas")     val table = tableEnv.from("datas")     val temp = new Top2Temp()     val tableApi = table.groupBy('id)       .flatAggregate(temp('price) as('tmpprice, 'rank))       .select('id, 'tmpprice, 'rank)     tableEnv.registerFunction("temp",temp)       tableApi.toRetractStream[Row].print()      env.execute("FlinkSqlUDFTableAggregateFunction")   }    class Top2TempAcc {     var highestPrice: Double = Int.MinValue     var secodeHighestPrice: Double = Int.MinValue   }    class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {     override def createAccumulator(): Top2TempAcc = new Top2TempAcc      def accumulate(acc: Top2TempAcc, temp: Double): Unit = {       if (temp > acc.highestPrice) {         acc.secodeHighestPrice = acc.highestPrice         acc.highestPrice = temp       } else if (temp > acc.secodeHighestPrice) {         acc.highestPrice = temp       }     }      def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {       out.collect(acc.highestPrice, 1)       out.collect(acc.secodeHighestPrice, 2)     }   }  }

“如何使用FlinkSQL内置函数”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

您可能感兴趣的文档:

--结束END--

本文标题: 如何使用FlinkSQL内置函数

本文链接: https://www.lsjlt.com/news/61913.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何使用FlinkSQL内置函数
    本篇内容介绍了“如何使用FlinkSQL内置函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!前言Flin...
    99+
    2024-04-02
  • 如何使用Python中的内置函数
    如何使用Python中的内置函数Python是一种简单易学的编程语言,拥有丰富的内置函数库,这些函数可以帮助我们更高效地编写代码。本文将介绍一些常见的Python内置函数,并提供具体的代码示例,帮助读者更好地理解和使用这些函数。print(...
    99+
    2023-10-22
    使用方法 关键词: Python内置函数
  • Python中的内置函数如何使用
    这篇文章主要介绍“Python中的内置函数如何使用”,在日常操作中,相信很多人在Python中的内置函数如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Python中的内置函数如何使用”的疑惑有所帮助!...
    99+
    2023-07-06
  • 如何指定使用python内置函数
    在python中使用指定内置函数的方法max()作用:返回可迭代对象中的元素中的最大值或者所有参数的最大值。用法:>>> max(1,2,3) # 传入3个参数 取3个中较大者3>>> max('1234') # 传入1个可迭代对象,取其最...
    99+
    2024-04-02
  • 如何使用 PHP 内置函数操作数组?
    如何使用 PHP 内置函数操作数组 PHP 语言内置了丰富的函数来轻松操作数组,使开发人员可以高效地处理和管理数据。本文将介绍使用这些函数的常见实战案例。 基本数组函数 array...
    99+
    2024-04-22
    php 数组处理
  • python如何调用内置函数
    在python中调用内置函数的方法python内置函数就是python提供给你直接可以拿来使用的所有函数,所以内置函数是可以直接使用的。示例:#调用type(obj)函数name = "zhangsan"print(type(name))#...
    99+
    2024-04-02
  • 如何使用 PHP 内置函数处理图像?
    php 内置函数提供了便捷的图像处理功能,可实现图像缩放、裁剪、添加水印等操作。使用 imagecopyresampled() 可缩放图像,imagecrop() 可裁剪图像,而 ima...
    99+
    2024-04-22
    图像处理 php内建函数
  • python容器的内置通用函数如何使用
    这篇文章主要介绍“python容器的内置通用函数如何使用”,在日常操作中,相信很多人在python容器的内置通用函数如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”python容器的内置通用函数如何使用...
    99+
    2023-06-22
  • 如何使用 PHP 内置函数进行数学运算?
    php 内置数学函数可执行各种运算,包括基本算术运算(+、-、*、/、%)、四舍五入、取整、求绝对值、求最大值、最小值、幂次、平方根。实战案例中,可计算圆的面积、两个数字的平均值,并确定...
    99+
    2024-04-22
    php 数学运算
  • 如何使用 PHP 内置函数连接到数据库?
    本文介绍了使用 php 内置 mysqli_* 函数连接到 mysql 数据库的步骤:加载 mysql 扩展。建立连接,需要主机名、用户名、密码、数据库名称和端口。检查连接是否成功。实战...
    99+
    2024-04-23
    数据库 php mysql
  • 如何使用 PHP 内置函数进行数据验证?
    如何使用 php 内置函数进行数据验证?php 内置了大量函数用于验证各种数据类型,包括:数字:is_numeric()、filter_var()字符串:strlen()、empty()...
    99+
    2024-04-23
    数据验证 php
  • 如何使用 PHP 内置函数处理表单数据?
    php 内置函数处理表单数据包括验证、清理和安全处理,具体步骤如下:验证数据是否为空或已设置(empty()、isset())去除字符串空格和过滤输入类型(trim()、fil...
    99+
    2024-04-23
    php 表单处理 mysql lsp
  • PHP 内置函数的使用
    php 内置函数提供了以下功能:1.类型转换:is_numeric()、floatval()、strval()、intval();2.字符串处理:strlen()、substr...
    99+
    2024-04-14
    php 字符串处理 字符串解析
  • 如何使用 PHP 内置函数操纵数据库数据?
    php 提供了内置函数,可用于与数据库交互和数据操作,如连接到数据库、创建表、插入、读取、更新和删除数据。这些函数简化了与数据库交互的过程,使其在应用程序中管理数据变得容易。 使用 P...
    99+
    2024-04-22
    数据库 php mysql
  • 如何使用 PHP 内联函数?
    php 内联函数是匿名函数,可通过 fn() 关键字创建,用于一次性操作或封装复杂逻辑。它们可以作为参数传递,充当闭包访问外部变量,并在日志记录等实战场景中便捷使用。 如何使用 PHP...
    99+
    2024-04-16
    php 内联函数
  • 用help如何看python内置函数
    使用help方法查看python内置函数的方法首先,打开python自带的集成开发环境IDLE;IDLE打开后,在IDLE中输入"dir(__builtins__)"命令,查看所有内置函数;查看到所有的内置函数后,即可使用help方法某个内...
    99+
    2024-04-02
  • 如何使用 PHP 内置函数处理字符串?
    如何使用 PHP 内置函数处理字符串 简介PHP 提供了丰富的内置函数来处理字符串,这些函数提供了强大的文本操作功能。在这篇文章中,我们将探讨使用这些函数处理字符串的各种方法。 字符串...
    99+
    2024-04-22
    php 字符串处理 排列 lsp
  • 如何使用 PHP 内置的函数调试工具?
    php 内置调试工具包括 print_r()、var_dump() 和 debug_backtrace() 等,它们分别以人可读方式打印变量值、提供更详细的变量信息,以及创建包含调用堆栈...
    99+
    2024-04-18
    php 调试工具
  • C++宏函数和内联函数如何使用
    今天小编给大家分享一下C++宏函数和内联函数如何使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。1. 宏常量&宏函...
    99+
    2023-07-02
  • 如何使用 PHP 内置函数执行数据库查询?
    php 内置函数可用于执行数据库查询,包括:mysqli_query():执行查询并返回结果。pdostatement:准备查询并绑定参数以防止 sql 注入。mysqli_affect...
    99+
    2024-04-23
    php 数据库查询 mysql
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作