iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >kafka+storm+hbase如何实现计算WordCount
  • 788
分享到

kafka+storm+hbase如何实现计算WordCount

2023-06-04 06:06:29 788人浏览 泡泡鱼
摘要

这篇文章主要介绍了kafka+stORM+HBase如何实现计算WordCount,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。kafka+storm+hbase实现计算Wo

这篇文章主要介绍了kafka+stORM+HBase如何实现计算WordCount,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

kafka+storm+hbase实现计算WordCount。

(1)表名:wc

(2)列族:result

(3)RowKey:word

(4)Field:count

1、解决:

1)第一步:首先准备kafkastormhbase相关jar包。依赖如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

<project xmlns="Http://Maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com</groupId>

  <artifactId>kafkaSpout</artifactId>

  <version>0.0.1-SNAPSHOT</version>

   

    <dependencies>

        <dependency>

            <groupId>org.apache.storm</groupId>

            <artifactId>storm-core</artifactId>

            <version>0.9.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.storm</groupId>

            <artifactId>storm-kafka</artifactId>

            <version>0.9.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka_2.10</artifactId>

            <version>0.8.1.1</version>

            <exclusions>

                <exclusion>

                    <groupId>org.apache.ZooKeeper</groupId>

                    <artifactId>zookeeper</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>0.99.2</version>

            <exclusions>

                <exclusion>

                    <groupId>org.slf4j</groupId>

                    <artifactId>slf4j-log4j12</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.apache.zookeeper</groupId>

                    <artifactId>zookeeper</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

         

       <dependency>

 

         <groupId>com.Google.protobuf</groupId>

 

         <artifactId>protobuf-java</artifactId>

 

         <version>2.5.0</version>

 

        </dependency>

 

        <dependency>

            <groupId>org.apache.curator</groupId>

            <artifactId>curator-framework</artifactId>

            <version>2.5.0</version>

            <exclusions>

                <exclusion>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.slf4j</groupId>

                    <artifactId>slf4j-log4j12</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

                                                                              

           <dependency>

            <groupId>jdk.tools</groupId>

            <artifactId>jdk.tools</artifactId>

            <version>1.7</version>

            <scope>system</scope>

            <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>

        </dependency>    

         

    </dependencies>

  

    <repositories>

        <repository>

            <id>central</id>

            <url>http://repo1.maven.org/maven2/</url>

            <snapshots>

                <enabled>false</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>clojars</id>

            <url>https://clojars.org/repo/</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>Scala-tools</id>

            <url>http://scala-tools.org/repo-releases</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>conjars</id>

            <url>http://conjars.org/repo/</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

    </repositories>

 

    <build>

        <plugins>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.1</version>

                <configuration>

                    <source>1.6</source>

                    <target>1.6</target>

                    <encoding>UTF-8</encoding>

                    <showDeprecation>true</showDeprecation>

                    <showWarnings>true</showWarnings>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-assembly-plugin</artifactId>

                <configuration>

                    <descriptorRefs>

                        <descriptorRef>jar-with-dependencies</descriptorRef>

                    </descriptorRefs>

                    <arcHive>

                        <manifest>

                            <mainClass></mainClass>

                        </manifest>

                    </archive>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

</project>

 

(2)kafka发来的数据通过levelSplitbolt进行分割处理,然后再发送到下一个Bolt中。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

package com.kafka.spout;

 

import java.util.regex.Matcher;

import java.util.regex.Pattern;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

  

public class LevelSplit extends BaseBasicBolt {

  

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String words = tuple.getString(0).toString();//the cow jumped over the moon

        String []va=words.split(" ");

        for(String word : va)

        {

            collector.emit(new Values(word));

        }

         

    }

    

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }

 

}

(3)将levelSplit的Bolt发来的数据到levelCount的Bolt中进行计数处理,然后发送到hbase(Bolt)中。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

package com.kafka.spout;

 

import java.util.HashMap;

import java.util.Map;

import java.util.Map.Entry;

 

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

  

public class LevelCount extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

 

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        // TODO Auto-generated method stub

        String word = tuple.getString(0);

        Integer count = counts.get(word);

        if (count == null)

            count = 0;

        count++;

        counts.put(word, count);

 

        for (Entry<String, Integer> e : counts.entrySet()) {

            //sum += e.getValue();

            System.out.println(e.geTKEy()

                                "----------->" +e.getValue());

        }

        collector.emit(new Values(word, count));     

    }

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        // TODO Auto-generated method stub

         declarer.declare(new Fields("word""count"));

    }

}

(4)准备连接kafkahbase条件以及设置整个拓扑结构并且提交拓扑。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

package com.kafka.spout;

  

import java.util.HashMap;

import java.util.Map;

 

import com.google.common.collect.Maps;

 

//import org.apache.storm.guava.collect.Maps;

  

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

import backtype.storm.utils.Utils;

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.ZkHosts;

   

public class StormKafkaTopo {

    public static void main(String[] args) {

                  

        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");

        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd""/storm""kafkaspout");

        Config conf = new Config();  

        spoutConfig.scheme =  new SchemeAsMultiScheme(new MessageScheme());   

         

        SimpleHBaseMapper mapper = new SimpleHBaseMapper();

        mapper.withColumnFamily("result");

        mapper.withColumnFields(new Fields("count"));

        mapper.withRowKeyField("word");

         

        Map<String, Object> map = Maps.newTreeMap();

        map.put("hbase.rootdir""hdfs://zeb:9000/hbase");

        map.put("hbase.zookeeper.quorum""zeb:2181,yjd:2181,ylh:2181");

         

        // hbase-bolt

        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

 

        conf.setDebug(true);

        conf.put("hbase.conf", map);

          

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout"new KafkaSpout(spoutConfig));

        builder.setBolt("split"new LevelSplit(), 1).shuffleGrouping("spout");

        builder.setBolt("count"new LevelCount(), 1).fieldsGrouping("split"new Fields("word"));

        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

         

        if(args != null && args.length > 0) {

            //提交到集群运行

            try {

                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

            catch (AlreadyAliveException e) {

                e.printStackTrace();

            catch (InvalidTopologyException e) {

                e.printStackTrace();

            }

        else {

            //本地模式运行

            LocalCluster cluster = new LocalCluster();

            cluster.submitTopology("Topotest1121", conf, builder.createTopology());

            Utils.sleep(1000000);

            cluster.killTopology("Topotest1121");

            cluster.shutdown();

        }          

    }

}

(5)在kafka端用控制台生产数据,如下:

kafka+storm+hbase如何实现计算WordCount

2、运行结果截图:

 kafka+storm+hbase如何实现计算WordCount

3、遇到的问题:

(1)把所有的工作做好后,提交了拓扑,运行代码。发生了错误1,如下:

 kafka+storm+hbase如何实现计算WordCount

解决:原来是因为依赖版本要统一的问题,最后将版本修改一致后,成功解决。

(2)发生了错误2,如下:

 kafka+storm+hbase如何实现计算WordCount

解决:原来是忘记开hbase中的HMaster和HRegionServer。启动后问题成功解决。

感谢你能够认真阅读完这篇文章,希望小编分享的“kafka+storm+hbase如何实现计算WordCount”这篇文章对大家有帮助,同时也希望大家多多支持编程网,关注编程网精选频道,更多相关知识等着你来学习!

--结束END--

本文标题: kafka+storm+hbase如何实现计算WordCount

本文链接: https://www.lsjlt.com/news/236696.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • kafka+storm+hbase如何实现计算WordCount
    这篇文章主要介绍了kafka+storm+hbase如何实现计算WordCount,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。kafka+storm+hbase实现计算Wo...
    99+
    2023-06-04
  • JS如何实现计算器
    这篇文章将为大家详细讲解有关JS如何实现计算器,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。代码:<!DOCTYPE html> <html&...
    99+
    2022-10-19
  • AndroidStudio如何实现BMI计算
    这篇文章将为大家详细讲解有关AndroidStudio如何实现BMI计算,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。xml代码```xml<TextView   &n...
    99+
    2023-06-14
  • JavaScript如何实现计算器
    这篇文章主要为大家展示了“JavaScript如何实现计算器”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“JavaScript如何实现计算器”这篇文章吧。一、实例代码HTML:<!DOCT...
    99+
    2023-06-26
  • mysql如何实现累加计算
    小编这次要给大家分享的是mysql如何实现累加计算,文章内容丰富,感兴趣的小伙伴可以来了解一下,希望大家阅读完这篇文章之后能够有所收获。前言接了一个需求,产品想分析一下用户增长的曲线。也就是某个时间段的每日...
    99+
    2022-10-18
  • AngularJs+Bootstrap如何实现计算器
    这篇文章主要为大家展示了“AngularJs+Bootstrap如何实现计算器”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“AngularJs+Bootstra...
    99+
    2022-10-19
  • javascript如何实现次方计算
    这篇文章主要讲解了“javascript如何实现次方计算”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“javascript如何实现次方计算”吧! ...
    99+
    2022-10-19
  • Python如何实现GUI计算器
    本文小编为大家详细介绍“Python如何实现GUI计算器”,内容详细,步骤清晰,细节处理妥当,希望这篇“Python如何实现GUI计算器”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。效果可执行正常加减乘除相关运算...
    99+
    2023-07-04
  • vue如何实现计算属性
    本文小编为大家详细介绍“vue如何实现计算属性”,内容详细,步骤清晰,细节处理妥当,希望这篇“vue如何实现计算属性”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。什么是计算属性模板内的表达式非常便利,但是设计它们...
    99+
    2023-07-04
  • java8 如何实现分组计算数量和计算总数
    java8分组计算数量和计算总数 package com.pig4cloud.pigx.admin.api.vo; import lombok.Builder; import l...
    99+
    2022-11-12
  • Python编程算法:如何实现并行计算?
    在计算机科学领域中,计算机的速度一直是一个瓶颈。为了克服这个瓶颈,现代计算机通常采用并行计算方法。并行计算是指通过同时执行多个计算任务来提高计算机的效率。 Python作为一种高级编程语言,也可以实现并行计算。在本篇文章中,我们将探讨如何...
    99+
    2023-06-27
    编程算法 开发技术 git
  • java如何实现科学计算器
    这篇文章主要介绍了java如何实现科学计算器的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇java如何实现科学计算器文章都会有所收获,下面我们一起来看看吧。实现思路通过点击按钮可以得到一个算术表达式,并且它是一...
    99+
    2023-07-02
  • HTML如何实现简单计算器
    本篇内容介绍了“HTML如何实现简单计算器”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!代码如下:   ...
    99+
    2022-10-19
  • vbs如何实现计算机重启
    这篇文章主要为大家展示了“vbs如何实现计算机重启”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“vbs如何实现计算机重启”这篇文章吧。Dim return Set ...
    99+
    2023-06-08
  • IDEA Debug如何实现求值计算
    这篇文章将为大家详细讲解有关IDEA Debug如何实现求值计算,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。求值计算求值计算功能在我们debug过程中,可以很方便的查看程序中所有变量的值,也可以临时修改...
    99+
    2023-06-27
  • Android如何实现房贷计算器
    今天小编给大家分享一下Android如何实现房贷计算器的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。fangdai(acti...
    99+
    2023-06-30
  • js如何实现网页计算器
    小编给大家分享一下js如何实现网页计算器,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!如何在利用HTML,css和js的知识制作一个简单的网页计算器呢?一个计算机...
    99+
    2023-06-15
  • javascript如何实现计算器功能
    这篇文章给大家分享的是有关javascript如何实现计算器功能的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1、计算器功能介绍可以实现数据的加(+),减(-),乘(*),除(/),取余运算(%),以及实现数据的...
    99+
    2023-06-25
  • Java如何实现分布式实时计算?
    随着互联网时代的到来,数据量呈现爆炸性增长,如何高效地处理这些数据成为了每个企业必须面对的问题。分布式计算是一种解决大规模数据处理的有效方法。本文将介绍Java如何实现分布式实时计算,并且通过演示代码,让读者更好地理解。 一、分布式实时计...
    99+
    2023-10-17
    numy 分布式 实时
  • 计算机之间如何实现通信
    这篇文章给大家介绍计算机之间如何实现通信,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。在计算机网络中,计算机之间的通信是通过网络协议实现的。什么是网络协议?网络协议是为计算机网络中进行数据交换而建立的规则、标准或约定的...
    99+
    2023-06-14
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作