sparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数据);而Pull属于拉取(Spark
导入两个jar包到flume/lib下
org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink
java.lang.IllegalStateException: begin() called when transaction is OPEN!
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader=true
# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# 当前主机端口
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
package day02
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingFlume {
def main(args: Array[String]): Unit = {
//1.创建SparkConf对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")
//2.创建SparkContext对象
val sc = new SparkContext(conf)
//设置日志输出格式,只打印异常日志,在这里设置没有用
//sc.setLogLevel("WARN")
//3.创建StreamingContext,Seconds(5):轮询机制,多久执行一次
val ssc = new StreamingContext(sc, Seconds(5))
//4.定义一个flume集合,可以接受多个flume数据,多个用,隔开需要new
val addresses = Seq(new InetSocketAddress("127.0.0.1", 5555))
//5.获取flume中的数据,
val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)
// 6.截取flume数据:{"header":xxxxx "body":xxxxxx}
val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
lineDstream.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
向指定目录上传文件
--结束END--
本文标题: SparkStreaming两种方式连接Flume
本文链接: https://www.lsjlt.com/news/8179.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-21
2024-05-21
2024-05-21
2024-05-21
2024-05-21
2024-05-21
2024-05-21
2024-05-21
2024-05-21
2024-05-21
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0