iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >SparkStreaming编程初级实践详解
  • 398
分享到

SparkStreaming编程初级实践详解

SparkStreaming编程初级SparkStreaming 2023-05-16 17:05:50 398人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

目录写在前面1. 安装Flume安装命令2.使用Avro数据源测试Flume题目描述Flume配置文件执行命令执行结果如下3. 使用netcat数据源测试Flume题目描述编写Flu

写在前面

1. 安装Flume

Flume是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。Flume 的核心是把数据从数据源收集过来,再送到目的地。请到Flume官网下载Flume1.7.0安装文件,下载地址如下:

www.apache.org/dyn/closer.…

或者也可以直接到本教程官网的“下载专区”中的“软件”目录中下载apache-flume-1.7.0-bin.tar.gz。

下载后,把Flume1.7.0安装到Linux系统的“/usr/local/flume”目录下,具体安装和使用方法可以参考教程官网的“实验指南”栏目中的“日志采集工具Flume的安装与使用方法。

安装命令

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
mv apache-flume-1.9.0-bin/ flume-1.9.0
sudo vi /etc/profile
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
mv flume-env.sh.template flume-env.sh
  • 查看版本号
bin/flume-ng version

2.使用Avro数据源测试Flume

题目描述

Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO rpc机制。请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume以后,可以把helloworld.txt中的文本内容显示出来。

Flume配置文件

al.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels= c1
a1.sources.r1.bind = 0.0.0.0
al.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
al.channels.c1.capacity = 1000
a1.channels.c1.transaction = 100
al.sources.r1.channels = c1
a1.sinks.k1.channel=c1

执行命令

  • 先进入到Flume安装目录,执行以下第一行命令;
  • 开始新的一个会话窗口,执行第二行命令写入数据到指定的文件中
  • 查看上一步骤中指定的文件内容
./bin/flume-ng agent -c . -f ./conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
echo 'hello,world' >> ./log.00
bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F ./log.00

执行结果如下

3. 使用netcat数据源测试Flume

题目描述

请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个Linux终端(这里称为“Flume终端”)中,启动Flume,在另一个终端(这里称为“Telnet终端”)中,输入命令“telnet localhost 44444”,然后,在Telnet终端中输入任何字符,让这些字符可以顺利地在Flume终端中显示出来。

编写Flume配置文件

al.sources = r1
a1.sinks = k1
a1.channels = c1
al.sources.r1.type = netcat
al.sources.r1.channels = c1
a1.sources.r1.bind = localhost
al.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
al.channels.c1.transaction = 100
al.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 执行以下命令
./bin/flume-ng agent -c . -f ./netcatExample.conf -n a1 -Dflume.root.logger=INFO,console
telnet localhost 44444
  • 会话窗口成功得到数据

4. 使用Flume作为Spark Streaming数据源

题目描述

Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

编写Flume配置文件

al.sources = r1
a1.sinks = k1
a1.channels =  c1
al.sources.r1.type = netcat
al.sources.r1.bind = localhost
a1.sources.r1.port = 33333
a1.sinks.k1.type = avro
al.sinks.k1.hostname = localhost
a1.sinks.k1.port = 44444
a1.channels.c1.type = memory
al.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000
al.sources.r1.channels = c1
a1.sinks.k1.channel = c1

主程序代码

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {
    def main(args: Array[String]): Unit = {
        if (args.length < 2) {
            System.err.println( "Usage: FlumeEventCount <host> <port>")
            System.exit(1)
        }
        StreamingExamples.setStreamingLogLevels()
        val Array(host, IntParam(port)) = args
        val batchInterval = Milliseconds(2000)
        val sc = new SparkConf()
          .setAppName("FlumeEventCount")
//          .setMaster("local[2]")
        val ssc = new StreaminGContext(sc, batchInterval)
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
        stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
        ssc.start()
        ssc.awaitTermination()
    }
}

执行结果1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {
    def setStreamingLogLevels(): Unit = {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
            logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.")
            Logger.getRootLogger.setLevel(Level.WARN)
        }
    }
}

执行结果2

以上就是Spark Streaming编程初级实践详解的详细内容,更多关于Spark Streaming编程的资料请关注编程网其它相关文章!

--结束END--

本文标题: SparkStreaming编程初级实践详解

本文链接: https://www.lsjlt.com/news/210410.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • SparkStreaming编程初级实践详解
    目录写在前面1. 安装Flume安装命令2.使用Avro数据源测试Flume题目描述Flume配置文件执行命令执行结果如下3. 使用netcat数据源测试Flume题目描述编写Flu...
    99+
    2023-05-16
    Spark Streaming编程初级 Spark Streaming
  • SparkSQL编程初级实践详解
    目录写在前面第1题:Spark SQL 基本操作主程序代码主程序执行结果第2题:编程实现将 RDD 转换为 DataFrame题目主程序代码主程序执行结果第3题:编程实现利用 Dat...
    99+
    2023-05-16
    Spark SQL编程实战 Spark SQL
  • Go语言初探:编译过程详解
    标题:Go语言初探:编译过程详解 Go语言是一种开源的编译型静态语言,由谷歌开发,旨在提升编程效率和简化工程管理。作为一种现代化的编程语言,它具有出色的并发支持和高效的垃圾回收机制,同...
    99+
    2024-04-02
  • PHP高级编程技巧详解
    非常抱歉,由于您没有提供文章标题,我无法为您生成一篇高质量的文章。请您提供文章标题,我将尽快为您生成一篇优质的文章。...
    99+
    2024-05-16
  • Python面向对象编程-初级篇
    前言面向对象 : 采用基于对象(实体) 的概念建立模型,模拟客观世界分析、设计、实现软件的办法。面向对象编程(Object-oriented Programming,简称 OOP)是一种解决软件复用的设计和编程方法,把软件系统中相近相似的操...
    99+
    2023-05-14
    Python 面向 对象
  • C++元编程语言初步入门详解
    目录模板泛型初步函数模板友元模板参数元编程的基本概念可变参数模板模板 由于模板元编程需要以面向对象为基础,所以如有疑问之处可以先补充一点C++面向对象的知识: C++面向对象这一篇就...
    99+
    2024-04-02
  • PHP高级特性:安全编程的最佳实践
    非常抱歉,由于您没有提供文章标题,我无法为您生成一篇高质量的文章。请您提供文章标题,我将尽快为您生成一篇优质的文章。...
    99+
    2024-05-15
  • Golang实现Json分级解析及数字解析实践详解
    目录一、背景介绍二、解决方案(1)将Json直接解析为map(2)解析部分json struct的方法 (json.RawMessage的用法)(3) json.Number类型的使...
    99+
    2023-02-14
    Golang Json分级解析 Golang Json解析 Golang Json
  • Python图形编程探索系列-01-初级
    设计一个主窗口,在其中添加三个标签和三个按钮,当点击按钮时,对标签的内容和色彩进行修改。 import tkinter as tk root = tk.Tk() def f1(): label1.config(text='...
    99+
    2023-01-30
    图形 系列 Python
  • JavaScript异步编程之Promise的初步使用详解
    1. 概述 Promise对象是ES6提出的的异步编程的规范。说到异步编程,就不得不说说同步和异步这两个概念。 从字面意思理解同步编程的话,似乎指的是两个任务同步运行,如果这样理解就...
    99+
    2024-04-02
  • 【JavaEE初阶】 线程池详解与实现
    文章目录 🌴线程池的概念🎄标准库中的线程池🍀ThreadPoolExecutor 类🚩corePoolSize与maximumP...
    99+
    2023-10-25
    java-ee java 开发语言 jdk 计算机操作系统 线程池
  • Golang并发编程之调度器初始化详解
    目录0. 简介1. 一些全局变量2. main函数之前2.1 初始化g02.2 主线程与m0的绑定2.3 m0和g0的绑定2.4 调度器的初始化0. 简介 上一篇博客简单介绍了GMP...
    99+
    2023-03-22
    Golang调度器初始化 Golang调度器 Go 调度器
  • PHP高级特性:面向对象编程的最佳实践
    非常抱歉,由于您没有提供文章标题,我无法为您生成一篇高质量的文章。请您提供文章标题,我将尽快为您生成一篇优质的文章。...
    99+
    2024-05-16
  • PHP中的OOP编程实践
    随着互联网的发展,PHP作为一种非常流行的服务器端编程语言,成为了很多Web开发人员的首选。随着技术的发展和语言本身的改进,越来越多的PHP开发者开始采用面向对象编程(OOP)的方式来进行开发。在本文中,我们将讨论PHP中的OOP编程实践。...
    99+
    2023-05-25
    实践 PHP OOP
  • 头歌实践教学平台Python-Python第二章作业(初级)
    第1关:三角形周长及面积 任务描述 输入的三角形的三条边a、b、c 的长度,计算并依次输出三角形的周长和面积,结果严格保留2位小数。测试用例的数据保证三角形三边数据可以构成三角形。 三角形面积计算公式: ,其中s=(a+b+c)/2。 a...
    99+
    2023-09-22
    python 单元测试
  • kafka生产实践(详解)
    1.引言最近接触到一个APP流量分析的项目,类似于友盟。涉及到几个C端(客户端)高并发的接口,这几个接口主要用于C端数据的提交。在没有任何缓冲的情况下,一个接口涉及到5张表的提交。压测的结果很不理想,主要瓶颈就在与RDS的交互。一台双核,1...
    99+
    2023-05-31
    kafka 实践 生产
  • PHP面向对象编程:高级特性详解
    php 的 oop 高级特性包括:接口:定义方法,确保不同类具有相似行为。多态性:子类对象实现父类方法,提供灵活性。命名空间:组织代码,避免命名冲突。特性:复用代码,无需继承即可添加方法...
    99+
    2024-05-10
    php 面向对象编程 php面向对象编程
  • LeetCode路径指南:Java编程的高级技巧和最佳实践。
    LeetCode路径指南:Java编程的高级技巧和最佳实践 LeetCode是一个非常受欢迎的在线编程平台,它提供了海量的算法题目,帮助程序员提升编程技能。Java作为一门流行的编程语言,也在LeetCode上得到了广泛应用。但是,如何才能...
    99+
    2023-09-24
    leetcode path 分布式
  • C#接口编程详细降级
    这篇文章主要介绍“C#接口编程详细降级”,在日常操作中,相信很多人在C#接口编程详细降级问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”C#接口编程详细降级”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!接口...
    99+
    2023-06-17
  • KubeSphere分级管理实践及解析
    目录前言为什么要在 KuberSphere 上实现分级管理什么是分级体系如何实现分级管理如何实现资源的升降级不同层级间 Pod 的网络隔离总结前言 K8s 是容器编排和分布式应用部署...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作