返回顶部
首页 > 资讯 > 数据库 >「Flink」使用Managed Keyed State实现计数窗口功能
  • 671
分享到

「Flink」使用Managed Keyed State实现计数窗口功能

「Flink」使用ManagedKeyedState实现计数窗口功能 2016-03-16 12:03:33 671人浏览 猪猪侠
摘要

先上代码:public class WordCounTKEyedState { public static void main(String[] args) throws Exception { Strea

「Flink」使用Managed Keyed State实现计数窗口功能

先上代码:

public class WordCounTKEyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 初始化测试单词数据流
        DataStreamSource lineDS = env.addSource(new RichSourceFunction() {
            private boolean isCanaled = false;

            @Override
            public void run(SourceContext ctx) throws Exception {
                while(!isCanaled) {
                    ctx.collect("hadoop flink spark");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        });

        // 切割单词,并转换为元组
        SingleOutputStreamOperator> wordTupleDS = lineDS.flatMap((String line, Collector> ctx) -> {
            Arrays.stream(line.split(" ")).forEach(word -> ctx.collect(Tuple2.of(word, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // 按照单词进行分组
        KeyedStream, Integer> keyedWordTupleDS = wordTupleDS.keyBy(t -> t.f1);

        // 对单词进行计数
        keyedWordTupleDS.flatMap(new RichFlatMapFunction, Tuple2>() {

            private transient ValueState> countSumValueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 初始化ValueState
                ValueStateDescriptor> countSumValueStateDesc = new ValueStateDescriptor("countSumValueState",
                        TypeInfORMation.of(new TypeHint>() {})
                );
                countSumValueState = getRuntimeContext().getState(countSumValueStateDesc);
            }

            @Override
            public void flatMap(Tuple2 value, Collector> out) throws Exception {
                if(countSumValueState.value() == null) {
                    countSumValueState.update(Tuple2.of(0, 0));
                }

                Integer count = countSumValueState.value().f0;
                count++;
                Integer valueSum = countSumValueState.value().f1;
                valueSum += value.f1;

                countSumValueState.update(Tuple2.of(count, valueSum));

                // 每当达到3次,发送到下游
                if(count > 3) {
                    out.collect(Tuple2.of(value.f0, valueSum));
                    // 清除计数
                    countSumValueState.update(Tuple2.of(0, valueSum));
                }
            }
        }).print();

        env.execute("KeyedState State");
    }
}

代码说明:

构建测试数据源,每秒钟发送一次文本,为了测试方便,这里就发一个包含三个单词的文本行

image

对句子按照空格切分,并将单词转换为元组,每个单词初始出现的次数为1

image

按照单词进行分组

image

自定义FlatMap

初始化ValueState,注意:ValueState只能在KeyedStream中使用,而且每一个ValueState都对一个一个key。每当一个并发处理ValueState,都会从上下文获取到Key的取值,所以每个处理逻辑拿到的ValueStated都是对应指定key的ValueState,这个部分是由Flink自动完成的。

image

注意:

带默认初始值的ValueStateDescriptor已经过期了,官方推荐让我们手动在处理时检查是否为空

instead and manually manage the default value by checking whether the contents of the state is null.


@Deprecated
public ValueStateDescriptor(String name, TypeSerializer typeSerializer, T defaultValue) {
super(name, typeSerializer, defaultValue);
}

逻辑实现

在flatMap逻辑中判断ValueState是否已经初始化,如果没有手动给一个初始值。并进行累加后更新。每当count > 3发送计算结果到下游,并清空计数。

image

您可能感兴趣的文档:

--结束END--

本文标题: 「Flink」使用Managed Keyed State实现计数窗口功能

本文链接: https://www.lsjlt.com/news/3967.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 「Flink」使用Managed Keyed State实现计数窗口功能
    先上代码:public class WordCountKeyedState { public static void main(String[] args) throws Exception { Strea...
    99+
    2016-03-16
    「Flink」使用Managed Keyed State实现计数窗口功能
  • 使用QGraphicsView实现气泡聊天窗口+排雷功能
    经过多方调查,用Qt实现气泡聊天窗口的方式有如下几个: 使用QWebEngineView控件内嵌html+CSS使用QTextEdit内嵌html使用QGraphicsView实现使...
    99+
    2024-04-02
  • 怎么在python中使用PyQt5实现一个窗口功能
    怎么在python中使用PyQt5实现一个窗口功能?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。python主要应用领域有哪些1、云计算,典型应用OpenSta...
    99+
    2023-06-14
  • 使用Reactor怎么实现一个Flink操作功能
    这篇文章给大家介绍使用Reactor怎么实现一个Flink操作功能,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。实现过程Flink对流式处理做的很好的封装,使用Flink的时候几乎不用关心线程池、积压、数据丢失等问题,...
    99+
    2023-06-06
  • 怎么使用electron实现百度网盘悬浮窗口功能
    这篇文章将为大家详细讲解有关怎么使用electron实现百度网盘悬浮窗口功能,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。相关依赖里面使用了vuex vue vue-ro...
    99+
    2024-04-02
  • 怎么使用QGraphicsView实现气泡聊天窗口+排雷功能
    这篇“怎么使用QGraphicsView实现气泡聊天窗口+排雷功能”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“怎么使用QG...
    99+
    2023-06-30
  • 如何使用Redis+Lua脚本实现计数器接口防刷功能
    这篇文章主要介绍如何使用Redis+Lua脚本实现计数器接口防刷功能,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!【实现过程】一、问题分析 如果set命令设置上,但是在设置失效时间时由于网络抖动等原因导致没...
    99+
    2023-06-29
  • C#实现简易计算器功能(1)(窗体应用)
    本文实例为大家分享了C#实现简易计算器功能的具体代码,供大家参考,具体内容如下 实现页面布局和数值初始化 using System; using System.Collections...
    99+
    2024-04-02
  • C#实现简易计算器功能(2)(窗体应用)
    本文实例为大家分享了C#实现简易计算器功能第二部分的具体代码,供大家参考,具体内容如下 初始化,实现四则运算 using System; using System.Collecti...
    99+
    2024-04-02
  • 使用vue实现计时器功能
    本文实例为大家分享了vue实现计时器功能的具体代码,供大家参考,具体内容如下 首先我们要知道setTimeout和setInterval的区别 setTimeout只在指定时间后执行...
    99+
    2024-04-02
  • 使用swift实现计算器功能
    关于计算器的实现在做之前想了几个方案。 首先是做一个输入功能,再以后缀表达式来进行计算,但是这个更适用于做一个科学计算器,在平日生活中的计算器需要一些便捷的计算效果。 所以实现这个计...
    99+
    2024-04-02
  • 如何使用Pandas实现MySQL窗口函数
    今天小编给大家分享一下如何使用Pandas实现MySQL窗口函数的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。一、前言环境:...
    99+
    2023-07-05
  • 如何使用css实现计时功能
    这篇文章主要为大家展示了“如何使用css实现计时功能”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何使用css实现计时功能”这篇文章吧。<!DOCTYP...
    99+
    2024-04-02
  • 使用Pandas怎么实现一个分组计数功能
    这篇文章将为大家详细讲解有关使用Pandas怎么实现一个分组计数功能,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。在对dataframe进行分析的时候会遇到需要分组计数,计数的column中...
    99+
    2023-06-14
  • 如何使用vue实现计时器功能
    小编给大家分享一下如何使用vue实现计时器功能,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!具体内容如下首先我们要知道setTimeout和setInterval的区别setTimeout只在指定时间后执行一次,代码如下:...
    99+
    2023-06-20
  • 使用react+redux实现计数器功能及遇到问题
    Redux,本身就是一个单纯的状态管理者,我们不追溯它的历史,从使用角度来说:它提供一个全局的对象store,store中包含state对象用以包含所有应用数据,并且store提供了...
    99+
    2024-04-02
  • 使用react和redux怎么实现一个计数器功能
    本篇文章给大家分享的是有关使用react和redux怎么实现一个计数器功能,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Redux,本身就是一个单纯的状态管理者,我们不追溯它的...
    99+
    2023-06-15
  • 使用Pandas实现MySQL窗口函数的解决方法
    目录一、前言二、语法对比数据表row_number()lead()/lag()rank()/dense_rank()first_value()count()/sum()三、小结一、前...
    99+
    2023-02-22
    Pandas 窗口函数 Pandas mysql窗口函数
  • 如何利用Redis实现数据统计功能
    Redis是一种高效的内存数据库,可以被广泛应用于数据统计功能的实现中。本文将介绍如何使用Redis来实现数据统计功能,并提供具体实现的代码示例。统计计数器在很多场景下,需要对某个事件或对象的数量进行统计。这时候可以使用Redis的计数器功...
    99+
    2023-11-07
    数据聚合 实时统计 Redis 数据统计 Redis 统计实现
  • Redis+Lua脚本实现计数器接口防刷功能(升级版)
    目录【前言】【实现过程】一、问题分析二、解决方案三、代码改造【总结】【前言】 Cash Loan(一):Redis实现计数器防刷 中介绍了项目中应用redis来做计数器的实现过程,最...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作