iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 > flink 使用sql实现kafka生产者和消费者
  • 133
分享到

flink 使用sql实现kafka生产者和消费者

摘要

Maven依赖 UTF-8 1.8 1.8 1.11.2 1.1.7 1.7.25


	flink 使用sql实现kafka生产者和消费者
[数据库教程]

Maven依赖


        UTF-8
        1.8
        1.8
        1.11.2
        1.1.7
        1.7.25
    

    
        
            
            org.apache.flink
            flink-JSON
            ${flink.version}
        

        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-clients_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-kafka_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-wikiedits_2.12
            ${flink.version}
        

        
            org.apache.flink
            flink-table-planner_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-blink_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge_2.12
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java
            ${flink.version}
        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
        
        
            ch.qos.logback
            logback-core
            ${logback.version}
        
        
            ch.qos.logback
            logback-classic
            ${logback.version}
        

        
            org.projectlombok
            lombok
            1.16.18
        
    

生产者

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;


//@Slf4j
public class KafkaTableStreamApiProducerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        DataStream eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(
" +
                "customerId int,
" +
                "oldStatus int,
" +
                "newStatus int,
" +
                "eventTime bigint
" +
                ") WITH(
" +
                "‘connector.type‘=‘kafka‘,
" +
                "‘connector.version‘=‘universal‘,
" +

                "‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.12.87:9092‘,
" +
                "‘connector.topic‘=‘customer_statusChangedEvent‘,
" +
               
                "‘fORMat.type‘=‘json‘
" +
                ")
"
                ;
        tableEnvironment.executesql(ddl);


        while (true) {
            try {
                TimeUnit.SECONDS.sleep(3);
                int status = (int) (System.currentTimeMillis() % 3);
                String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
                        "values(1001,1," + status + "," + System.currentTimeMillis() + ")";
                tableEnvironment.executeSql(insert);
            } catch (Exception ex) {

            }
        }

    }
}

 

消费者

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


//@Slf4j
public class KafkaTableStreamApiConsumerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        DataStream eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(
" +
                "customerId int,
" +
                "oldStatus int,
" +
                "newStatus int,
" +
                "eventTime bigint
" +
                ") WITH(
" +
                "‘connector.type‘=‘kafka‘,
" +
                "‘connector.version‘=‘universal‘,
" +
                "‘connector.properties.group.id‘=‘g2_group‘,
" +
                "‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092‘,
" +
                "‘connector.topic‘=‘customer_statusChangedEvent‘,
" +
                "‘connector.startup-mode‘ = ‘latest-offset‘,
" +
                "‘format.type‘=‘json‘
" +
                ")
";
        tableEnvironment.executeSql(ddl);

        Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
                " from CustomerStatusChangedEvent" +
                " where newStatus in(1,2)"
        );


    
        DataStream result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
        result.print();

        try {
            env.execute();
        } catch (Exception ex) {

        }
    }

    public static class CustomerStatusLog {
        private Long customerId;

        private Integer status;

        public Long getCustomerId() {
            return customerId;
        }

        public void setCustomerId(Long customerId) {
            this.customerId = customerId;
        }

        public Integer getStatus() {
            return status;
        }

        public void setStatus(Integer newStatus) {
            this.status = newStatus;
        }

        public CustomerStatusLog() {

        }

        @Override
        public String toString() {
            return "CustomerStatusLog{" +
                    "customerId=" + customerId +
                    ", status=" + status +
                    ‘}‘;
        }
    }
}

 

消费者打印

4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}

flink 使用sql实现kafka生产者和消费者

原文地址:https://www.cnblogs.com/zhshlimi/p/13725081.html

您可能感兴趣的文档:

--结束END--

本文标题: flink 使用sql实现kafka生产者和消费者

本文链接: https://www.lsjlt.com/news/7924.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • kafka-3python生产者和消费者
    程序分为productor.py是发送消息端,consumer为消费消息端,启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,productor.py#!/usr/bin/env python2...
    99+
    2023-01-31
    生产者 消费者 kafka
  • Kafka中生产者和消费者指的是什么
    在Kafka中,生产者和消费者是指Kafka消息系统中参与消息传递的两种角色。 生产者是指负责向Kafka集群中的主题(topic)...
    99+
    2024-03-14
    Kafka
  • 使用 sarama 监控 Kafka 生产者和消费者的性能数据
    从现在开始,我们要努力学习啦!今天我给大家带来《使用 sarama 监控 Kafka 生产者和消费者的性能数据》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到等等知识点,如果在阅读本文过...
    99+
    2024-04-04
  • Golang rabbitMQ生产者和消费者怎么实现
    今天小编给大家分享一下Golang rabbitMQ生产者和消费者怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一...
    99+
    2023-06-30
  • Golang rabbitMQ生产者消费者实现示例
    目录消费者生产者消费者 package main import ( "fmt" "github.com/streadway/amqp" ) func failOnError(er...
    99+
    2024-04-02
  • java 中怎么实现生产者消费者
    今天就跟大家聊聊有关java 中怎么实现生产者消费者,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。生产者消费者图存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,...
    99+
    2023-06-17
  • PHP实现生产者与消费者的案例
    这篇文章主要介绍PHP实现生产者与消费者的案例,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装...
    99+
    2023-06-14
  • python 的生产者和消费者模式
    目录python 的生产者和消费者模式一、生产者消费者模式概述二、为什么使用生产者消费者模式三、什么是生产者消费者模式四、代码案例1、定义一个生产者2、定义一个消费者3、定义一个队列...
    99+
    2024-04-02
  • java中BlockingQueue如何实现生产者消费者
    这篇文章主要为大家展示了“java中BlockingQueue如何实现生产者消费者”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“java中BlockingQueue如何实现生产者消费者”这篇文章...
    99+
    2023-05-30
    java blockingqueue
  • golang生产者消费者模式怎么实现
    在Go语言中,可以使用goroutine和channel来实现生产者消费者模式。 首先,我们定义一个包含生产者和消费者的函数: fu...
    99+
    2023-10-20
    golang
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者
    1、查看kafka队列中topic信息 1.1、查看所有topic ./kafka-topics.sh --zookeeper 10.128.106.52:2181 --list 1.2、查看kafka中指定topic的详情 ./kafk...
    99+
    2023-08-21
    kafka 分布式 java
  • java中生产者和消费者问题实例分析
    这篇“java中生产者和消费者问题实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“java中生产者和消费者问题实例分析...
    99+
    2023-06-29
  • 如何限制生产者和消费者读取消息?
    php小编子墨在软件开发过程中,消息队列是一种常见的通信机制,用于实现生产者和消费者之间的异步通信。然而,有时候我们希望控制生产者和消费者对消息的读取,以便更好地管理系统资源和处理高峰...
    99+
    2024-02-11
  • java wait()/notify() 实现生产者消费者模式详解
    java wait()/notify() 实现生产者消费者模式 java中的多线程会涉及到线程间通信,常见的线程通信方式,例如共享变量、管道流等,这里我们要实现生产者消费者模式,也需...
    99+
    2024-04-02
  • Docker怎么启动RabbitMQ实现生产者与消费者
    这篇文章主要讲解了“Docker怎么启动RabbitMQ实现生产者与消费者”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Docker怎么启动RabbitMQ实现生产者与消费者”吧!一、Doc...
    99+
    2023-07-05
  • Java 生产者/消费者问题实例详解
    生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去...
    99+
    2023-05-31
    java 生产者消费者 ava
  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能
    自然,依赖是少不了的。除了spring-boot-starter-web依赖外。 就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用maven的看着弄也一样的。无非...
    99+
    2024-04-02
  • Java编程生产者消费者实现的四种方法
    目录实现生产者消费者的四种方式一、最基础的二、java.util.concurrent.lock 中的 Lock 框架三、阻塞队列BlockingQueue的实现Blockqueue...
    99+
    2024-04-02
  • C++实现简单的生产者-消费者队列详解
    本文的代码都是ChatGPT生成,我只是做了微小的调整和整合,AI提示词如下: 设计一个C++类,支持生产者-消费者模型,可以通过size函数获取剩余数量 可能第一次生成的不一定合适...
    99+
    2023-05-18
    C++实现生产者消费者队列 C++生产者消费者队列 C++队列
  • python多进程中的生产者和消费者模型怎么实现
    这篇文章主要介绍了python多进程中的生产者和消费者模型怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇python多进程中的生产者和消费者模型怎么实现文章都会有所收获,下面我们一起来看看吧。Pytho...
    99+
    2023-07-05
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作