广告
返回顶部
首页 > 资讯 > 精选 >Java Flink窗口触发器Trigger怎么使用
  • 869
分享到

Java Flink窗口触发器Trigger怎么使用

2023-07-02 16:07:02 869人浏览 泡泡鱼
摘要

这篇文章主要介绍“Java flink窗口触发器Trigger怎么使用”,在日常操作中,相信很多人在Java Flink窗口触发器Trigger怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对

这篇文章主要介绍“Java flink窗口触发器Trigger怎么使用”,在日常操作中,相信很多人在Java Flink窗口触发器Trigger怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java Flink窗口触发器Trigger怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

定义

Trigger确定窗口(由窗口分配器形成)何时准备好由窗口函数处理。每个WindowAssigner都带有一个默认值Trigger。如果默认触发器不符合您的需求,您可以使用trigger(…)。

Trigger 源码

public abstract class Trigger<T, W extends Window> implements Serializable {    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;     public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;     public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {        throw new UnsupportedOperationException("This trigger does not support merging.");    }    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;    }

TriggerResult 源码

public enum TriggerResult {// 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。    CONTINUE(false, false),    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。    FIRE_AND_PURGE(true, true),    // 触发窗口计算,但是保留窗口元素    FIRE(true, false),    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。    PURGE(false, true);    private final boolean fire;    private final boolean purge;    private TriggerResult(boolean fire, boolean purge) {        this.purge = purge;        this.fire = fire;    }    public boolean isFire() {        return this.fire;    }    public boolean isPurge() {        return this.purge;    }}

一旦触发器确定窗口已准备好进行处理,就会触发,返回状态可以是FIRE或FIRE_AND_PURGE。其中FIRE是触发窗口计算并保留窗口内容,而FIRE_AND_PURGE是触发窗口计算并删除窗口内容。默认情况下,预实现的触发器只是简单地FIRE不清除窗口状态。

Flink 预置的Trigger

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。

  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。

  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。

  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。

  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。

  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。

  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。

  • NeverTrigger:任何时候都不触发窗口计算

Java Flink窗口触发器Trigger怎么使用

主要看看EventTimeTrigger和ProcessingTimeTrigger的源码。

EventTimeTrigger源码

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {    private static final long serialVersionUID = 1L;    private EventTimeTrigger() {    }    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {            return TriggerResult.FIRE;        } else {            ctx.reGISterEventTimeTimer(window.maxTimestamp());            return TriggerResult.CONTINUE;        }    }    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;    }    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {        return TriggerResult.CONTINUE;    }    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {        ctx.deleteEventTimeTimer(window.maxTimestamp());    }    public boolean canMerge() {        return true;    }    public void onMerge(TimeWindow window, OnMergeContext ctx) {        long windowMaxTimestamp = window.maxTimestamp();        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {            ctx.registerEventTimeTimer(windowMaxTimestamp);        }    }    public String toString() {        return "EventTimeTrigger()";    }    public static EventTimeTrigger create() {        return new EventTimeTrigger();    }}

ProcessingTimeTrigger源码

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {    private static final long serialVersionUID = 1L;    private ProcessingTimeTrigger() {    }    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {        ctx.registerProcessingTimeTimer(window.maxTimestamp());        return TriggerResult.CONTINUE;    }    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {        return TriggerResult.CONTINUE;    }    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {        return TriggerResult.FIRE;    }    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {        ctx.deleteProcessingTimeTimer(window.maxTimestamp());    }    public boolean canMerge() {        return true;    }    public void onMerge(TimeWindow window, OnMergeContext ctx) {        long windowMaxTimestamp = window.maxTimestamp();        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {            ctx.registerProcessingTimeTimer(windowMaxTimestamp);        }    }    public String toString() {        return "ProcessingTimeTrigger()";    }    public static ProcessingTimeTrigger create() {        return new ProcessingTimeTrigger();    }}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。

常见窗口的Trigger

滚动窗口

TumblingEventTimewindows :EventTimeTriggerpublic class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {            return EventTimeTrigger.create();        }}

TumblingProcessingTimeWindows :ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {        return ProcessingTimeTrigger.create();    }}

滑动窗口

SlidingEventTimeWindows:EventTimeTriggerpublic class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {        return EventTimeTrigger.create();    }}

SlidingProcessingTimeWindows :ProcessingTimeTrigger

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {            return ProcessingTimeTrigger.create();        }}

会话窗口

EventTimeSessionWindows:EventTimeTriggerpublic class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {        return EventTimeTrigger.create();    }}

ProcessingTimeSessionWindows:ProcessingTimeTrigger

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {        return ProcessingTimeTrigger.create();    }}

全局窗口

GlobalWindows :NeverTriggerpublic class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {     public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {            return new GlobalWindows.NeverTrigger();        }}

到此,关于“Java Flink窗口触发器Trigger怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: Java Flink窗口触发器Trigger怎么使用

本文链接: https://www.lsjlt.com/news/343047.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Java Flink窗口触发器Trigger怎么使用
    这篇文章主要介绍“Java Flink窗口触发器Trigger怎么使用”,在日常操作中,相信很多人在Java Flink窗口触发器Trigger怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对...
    99+
    2023-07-02
  • JavaFlink窗口触发器Trigger的用法详解
    目录定义Trigger 源码TriggerResult 源码Flink 预置的TriggerEventTimeTrigger源码ProcessingTimeTrigger源码常见窗口...
    99+
    2022-11-13
  • Oracle触发器trigger怎么使用
    Oracle触发器(trigger)用于在指定的数据库操作发生时自动执行一段特定的代码,可以用于数据插入、更新或删除时执行特定的操作...
    99+
    2023-08-15
    Oracle trigger
  • MySQL的触发器trigger怎么使用
    MySQL的触发器(trigger)可以在特定的数据库操作发生时自动执行一系列的SQL语句。触发器可以在插入、更新或删除数据时触发执...
    99+
    2023-08-11
    MySQL trigger
  • MySQL数据库触发器trigger怎么使用
    这篇文章主要讲解了“MySQL数据库触发器trigger怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MySQL数据库触发器trigger怎么使用”吧!一、基本概念触发器是一种特殊类...
    99+
    2023-07-02
  • 数据库中触发器trigger怎么用
    这篇文章主要介绍了数据库中触发器trigger怎么用,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。 实验如下:-...
    99+
    2022-10-19
  • Java Quartz触发器CronTriggerBean怎么使用
    要使用Java Quartz触发器CronTriggerBean,您需要按照以下步骤进行操作:1. 创建一个CronTriggerB...
    99+
    2023-08-08
    Java CronTriggerBean
  • SQLServer触发器怎么调用JavaWeb接口
    这篇文章主要为大家展示了“SQLServer触发器怎么调用JavaWeb接口”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SQLServer触发器怎么调用JavaWeb接口”这篇文章吧。这几天接...
    99+
    2023-06-22
  • Oracle DML触发器和DDL触发器怎么使用
    今天小编给大家分享一下Oracle DML触发器和DDL触发器怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧...
    99+
    2023-07-05
  • java中怎么使用swing组件窗口
    要使用Swing组件创建窗口,你可以按照以下步骤进行操作:1. 导入Swing库中的相关类:```javaimport javax....
    99+
    2023-08-15
    java swing
  • 怎么使用sql触发器
    本篇文章给大家分享的是有关怎么使用sql触发器,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。  sql中的触发器是对某个表进行操作时...
    99+
    2022-10-18
  • 怎么使用MySQL触发器
    这篇文章主要讲解了怎么使用MySQL触发器,内容清晰明了,对此有兴趣的小伙伴可以学习一下,相信大家阅读完之后会有帮助。一、MySQL触发器创建:1、MySQL触发器的创建语法:CREATE [DEFINER...
    99+
    2022-10-18
  • SQL SERVER触发器怎么使用
    SQL Server触发器可以在数据库中的表上定义,当满足特定条件时,触发器会自动执行一些操作。以下是使用SQL Server触发器...
    99+
    2023-08-18
    SQL SERVER
  • 怎么创建和使用mysql触发器
    这篇文章主要讲解了怎么创建和使用mysql触发器,内容清晰明了,对此有兴趣的小伙伴可以学习一下,相信大家阅读完之后会有帮助。什么是触发器 触发器用来在某些操作之后/之前,“自动”执行一些操作。(比...
    99+
    2022-10-18
  • MySQL触发器怎么创建和使用
    这篇文章主要介绍“MySQL触发器怎么创建和使用”,在日常操作中,相信很多人在MySQL触发器怎么创建和使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”MySQL触发器怎么...
    99+
    2022-10-19
  • Mysql触发器怎么定义与使用
    这篇“Mysql触发器怎么定义与使用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Mysql触发器怎么定义与使用”文章吧。一...
    99+
    2023-07-04
  • 亚马逊s3接口触发调用服务器错误怎么办
    1. 检查 S3 触发器配置 首先,您需要检查 S3 触发器的配置是否正确。确保您已正确设置了 S3 触发器,包括正确的桶名称、前缀和后缀等。如果您的配置有误,可能会导致触发器无法正常工作,从而导致服务器错误。 2. 检查服务器日志 如果...
    99+
    2023-10-27
    亚马逊 接口 错误
  • 亚马逊s3接口触发调用服务器异常怎么办
    如果您的亚马逊S3接口触发调用服务器异常,可以尝试以下几个步骤来解决问题: 检查服务器配置:确保您的服务器配置正确,包括网络连接、端口设置、防火墙和安全组等。如果您使用的是云服务器,还需要检查云服务商的相关设置。 检查代码逻辑:检查您的...
    99+
    2023-10-26
    亚马逊 接口 异常
  • 亚马逊s3接口触发调用服务器异常怎么回事
    1. 概述 当使用亚马逊S3接口触发调用服务器时,如果出现异常,可能是由于以下原因导致的: 服务器配置不正确 代码错误 亚马逊S3服务异常 本文将介绍如何解决这些问题。 2. 服务器配置不正确 如果服务器配置不正确,可能会导致无法正常...
    99+
    2023-10-26
    亚马逊 怎么回事 异常
  • 亚马逊s3接口触发调用服务器异常怎么回事啊
    这种情况下,通常可以通过以下方法进行排查: 检查服务器日志,找到异常的具体原因。 检查是否存在网络问题,确保能够及时切换到其他网络环境。 检查是否存在应用程序崩溃或其他性能问题,尝试通过重启应用程序或升级应用程序等方式解决。 检查服务器...
    99+
    2023-10-27
    亚马逊 怎么回事 异常
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作