广告
返回顶部
首页 > 资讯 > 数据库 >基于Storm的WordCount
  • 403
分享到

基于Storm的WordCount

基于Storm的WordCount 2016-06-19 00:06:07 403人浏览 绘本
摘要

StORM WordCount 工作过程 Storm 版本: 1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去; 2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去

StORM WordCount 工作过程

Storm 版本:
1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、WordCountBolt 接收 SplitBolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

Java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。

hadoop 版本:
1、按行读取文件中的数据;
2、在 Mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 Mapper()中输出的数据数组,在 Reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里Maven的配置以及创建项目部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {         
        //创建一个 TopologyBuilder         
        TopologyBuilder tb = new TopologyBuilder();         
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");         
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));         
        //创建配置         
        Config conf = new Config();         
        //设置 worker 数量         
        conf.setNumWorkers(2);         
        //提交任务         
        //集群提交         
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());         
        //本地提交         
        LocalCluster localCluster = new LocalCluster();         
        localCluster.submitTopology("myWordcount", conf, tb.createTopology()); 
    }  
} 

SplitBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{      
    OutputCollector collector; 

         
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 

         
    public void execute(Tuple input) {         
        String line = input.getString(0);         
        String[] split = line.split(" ");         
        for (String word : split) {             
            collector.emit(new Values(word));         
            }     
        } 

         
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("word"));     
        } 
} 

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map map = new HashMap(); 

         
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 


         
public void execute(Tuple input) {         
    String word = input.getString(0);         
    if(map.containsKey(word)){             
    Integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //测试输出         
    System.out.println("结果:"+map);     
    } 

         
public void declareOutputFields(OutputFieldsDeclarer declarer) {     
    
} 
} 

SpoutBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
         
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {         
        this.collector = collector;     
        } 

         
    public void nextTuple() {         
        collector.emit(new Values("hello world this is a test"));     
        } 

         
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("test"));     
        } 
} 

POM.XML 文件内容


4.0.0

com.test
stormwordcount
0.9.6
jar

stormwordcount
Http://maven.apache.org


    UTF-8


    
        junit
        junit
        3.8.1
        test
    
    
        org.apache.storm
        storm-core
        0.9.6
    


    
        
            maven-assembly-plugin
            
                
                    jar-with-dependencies
                
                
                    
                        com.test.stormwordcount.MainClass
                    
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
        
            org.apache.maven.plugins
            maven-compiler-plugin
            
                1.7
                1.7
            
        
    

遇到的问题

基于Storm的WordCount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为Eclipse IDE for Eclipse Committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,Help->Install New SoftWare

点击Add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接:http://www.eclipse.org/m2e/index.html 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中Contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的JavaEE IDE了...

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 Windows->Preferencs->Maven->Installation发现之前配置了的maven的安装路径没了...重新配置了下就可以创建项目了。

最后运行成功的结果:

您可能感兴趣的文档:

--结束END--

本文标题: 基于Storm的WordCount

本文链接: https://www.lsjlt.com/news/3306.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 基于Storm的WordCount
    Storm WordCount 工作过程 Storm 版本: 1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去; 2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去...
    99+
    2016-06-19
    基于Storm的WordCount
  • 如何分析基于Spark Streaming Direct方式的WordCount
    这期内容当中小编将会给大家带来有关如何分析基于Spark Streaming Direct方式的WordCount,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.前提 a. flume 收集--》flu...
    99+
    2023-06-03
  • 基于Python的selenium
    一、安装 1.1安装Python,安装Python时需要勾选增加环境变量 如果之前已经安装过Python,需要将Python相关文件以及环境变量删除 1.2安装成功:在命令行界面下输入Python,最...
    99+
    2023-09-15
    python selenium pycharm
  • 基于Django 的 FreeSwitc
    YouPBX 是一个强大 FreeSwift (电话软交换系统) 的管理GUI系统,基于Django开发,功能全面,体验友好,可以基于此项目做一个完善的IPPBX系统、呼叫中心应用等   https://github.com/Jone...
    99+
    2023-01-30
    Django FreeSwitc
  • 基于python2.7的opencv3.
    当初一开始就是如此设想,通过opencv获取视频(摄像头)的图片帧,图像处理识别之后加工(绘制)图片,并把该图片作为视频流的一帧推送rtmp,然后远端直播,之间走了很多很多弯路(甚至想要手动实现rtmp推流)也就是了,搜索了一两周...
    99+
    2023-01-31
  • 以太网 VLAN的5种划分方式(基于端口、基于MAC地址、基于IP子网、基于协议、基于策略)介绍与基础配置命令
    2.8.3 以太网 VLAN(VLAN划分方式) VLAN的划分方式有 2.8.3 以太网 VLAN(VLAN划分方式)一、基于端口划分二、基于MAC地址划分三、基于IP子网划分四、基于协议划...
    99+
    2023-09-04
    网络 华为
  • 基于python 3 的selenium
    本文主要是运用selenium模块模拟登陆新浪微博 python webdriver环境搭建教程:http://blog.csdn.net/nanjunxiao/article/details/7957326 # -*- c...
    99+
    2023-01-31
    python selenium
  • 基于 Python 和 Pandas 的
    Pandas 是 Python 的一个模块(module), 我们将用 Python 完成接下来的数据分析的学习. Pandas 模块是一个高性能,高效率和高水平的数据分析库. 从本质上讲,它非常像操作电子表格的无头版本,如Excel....
    99+
    2023-01-30
    Python Pandas
  • Django 基于 jquery 的 a
    <1> $.ajax的两种写法: $.ajax("url",{}) $.ajax({}) <2> $.ajax的基本使用 $.ajax({ url:"//", data:{a...
    99+
    2023-01-31
    Django jquery
  • Oracle Study之--基于ASM的TSPITR(基于表空间的完全恢复)
    Oracle Study之--基于ASM的TSPITR(基于表空间的完全恢复)系统环境:操作系统:AIX5.3-08数据库:  Oracle 10gR2Understanding RMAN TSP...
    99+
    2022-10-18
  • 如何配置LINUX系统apache基于IP,基于port和基于域名的三种虚拟主机
    这篇文章主要介绍“如何配置LINUX系统apache基于IP,基于port和基于域名的三种虚拟主机”,在日常操作中,相信很多人在如何配置LINUX系统apache基于IP,基于port和基于域名的三种虚拟主机问题上存在疑惑,小编查阅了各式资...
    99+
    2023-06-10
  • MySQL:安装和基于SSL加密的主从复制(基于5.7)
       小生博客:http://xsboke.blog.51cto.com             ...
    99+
    2022-10-18
  • 【基础】 mysqldump 创建基于GTID的从库
    对于小型的数据库,我们可以直接使用mysqldump全库导出导入来创建从库。试验环境:  CentOS6.8 x86_64  MySQL5.6.34 社区rpm版  主库:no...
    99+
    2022-10-18
  • 基于Sanic的微服务基础架构
    使用python做web开发面临的一个最大的问题就是性能,在解决C10K问题上显的有点吃力。有些异步框架Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提升,但是也出现了各种古怪的问题难以解决。...
    99+
    2023-01-31
    架构 基础 Sanic
  • 基于pytorch的PINN代码
    import torchimport torch.autograd as autograd # computation graphfrom torch import Tensor # ten...
    99+
    2023-09-09
    python django mysql memcached
  • 基于函数的索引
    以下内容摘自《Oracle SQL 高级编程》 第12.4.2章节-基于函数的索引[其中代码部分被修改,原始请参考书籍]如果一个谓语在索引列上应用了函数,则优化器不会选用该列上的索引。例如,对于谓...
    99+
    2022-10-18
  • MariaDB基于GTID的复制
    1、配置主从节点的服务配置文件1.1、配置master节点:#binlog_format=mixedbinlog-format=ROW# required unique id between 1 and 2...
    99+
    2022-10-18
  • 基于lnmp的Discuz论坛
                      ------------- LNMP + Discuz -------...
    99+
    2022-10-18
  • CentOS 7 基于fastcgi 的lamp
      实验环境  CentOS 7, lamp (php-fpm) 要求:(1) 三者分离于三台主机;  (2) 一个虚拟主机用于提供phpMyAdmin;另一个虚拟...
    99+
    2022-10-18
  • 基于Java的Scoket编程
    目录一,网络编程中两个主要的问题二,两类传输协议:TCP和UDPTCP和UDP的区别三,基于Socket的java网络编程1、什么是Socket2、Socket通讯的过程3、创建So...
    99+
    2022-11-12
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作