注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

being23

写给未来的自己

 
 
 

日志

 
 
关于我

真正的坚定,就是找到力量去做自己喜欢的事情,并为之努力,这样才会觉得生活是幸福的。

网易考拉推荐

chap05  

2013-07-17 21:33:11|  分类: work@oppo |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

Bolts

如你所见,bolts是storm集群中的关键组件。本章介绍bolt的生命周期,设计策略,以及如何实现的例子。


Bolt Lifecycle

bolt是接收tuple作为输入,生成tuple作为输出的组件。在实现bolt时,通常指实现IRichBolt接口。Bolts在客户端创建,序列化到拓扑中,然后提交到集群的master。集群启动workers,它们反序列化bolt,调用方法prepare,然后开始处理tuple。

要自定义bolt,在构造函数中设定参数,并保存到实例变量中,这样在提交到集群的时候就会序列化。

Bolt Structure

Bolt有下列方法:

declareOutputFileds(OutputFiledsDeclarer declarer) 声明该bolt的输出样式。

prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector) 在bolt开始处理tuple前调用

execute(Tuple input) 处理单个的输入tuple

cleanup() 关闭bolt时调用

看一个用于将句子分割成单词的bolt的实例:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
    public void cleanup() {
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

这个bolt十分直接。值得注意的是,这个例子中没有消息保证。这意味着如果bolt由于某些原因丢弃了消息——或者因为bolt挂掉,或者因为根据程序中的设定丢弃——不会通知产生消息的spout,bolts和spouts之间的其他组件也不会。

在多数场景中,需要在整个拓扑中保证消息的处理。

Reliable versus Unreliable Bolts

如前面章节提到的,storm保证spout产生的消息都能被所有的bolts处理。这是设计上的考虑,即有你决定bolt是否保证处理消息。

拓扑就是一棵节点树, 其中消息(tuple)沿着一个或者多个分支流动。每个节点ack(tuple)或者fail(tuple),这样storm就知道什么时候消息处理失败,并通知产生消息的spout或者spouts。由于storm拓扑运行在高度并行化的环境中,跟踪原始spout实例的最好方式就是在消息里保存一个关于原始spout的引用。这种技术称为锚定Anchoring)。对上面的SplitSentence做些修改,就能保证消息处理。

class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }
    public void cleanup() {
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

锚定发生在collector.emit()所在的那一行。如之前提到的,传递参数tuple使得storm能跟踪原始的spout。collector.ack(tuple)collector.fail(tuple)通知spout每个消息发生了什么。当消息树上的每个消息都被处理了,storm就认为由spout发出的消息完整处理了。当消息树在设定的超时时间内没有被完全处理,就认为tuple处理失败。默认超时时间是30s。

可以通过设置拓扑的配置Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来修改超时时间

当然了,spout需要考虑消息处理失败的情形,重发或者丢弃。

每个tuple需要确定处理成功或者失败。Storm在内存中跟踪每个tuple,如果不对消息处理结果做处理,将撑爆内存。

Multiple Streams

方法emit(streamId, tuple)使得bolt能够发射突破了到多个数据流,其中streamId是表示流的字符串。然后,在TopologyBuilder中,可以决定接收哪个数据流。

Multiple Anchoring

要使用bolt聚合数据流,需要在内存中缓冲tuples。为了确保此种情形下的消息处理,需要把数据流锚定到不止一个tuple上。这可以通过调用方法emit,并传递一个tuple的List参数。

...
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, values);
...

在这种方式中,每当bolt确认或者失败,都会通知消息树,同时由于数据流锚定到多个tuple上,所有相关的spout都会被通知到。

Using IBasicBolt to Ack Automaticall

也许你已经注意到了,在多数场景中需要消息确认。storm提供了另一个叫做IBasicBolt的接口,让这些变得更容易,该接口封装了在execute方法之后调用ack的实现。该接口的一个实现,BaseBasicBolt,就可以用来自动完成确认。

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

发送到BasicOutputCollector的tuples会自动锚定到输入tuple上(?)

  评论这张
 
阅读(357)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017