注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

being23

写给未来的自己

 
 
 

日志

 
 
关于我

真正的坚定,就是找到力量去做自己喜欢的事情,并为之努力,这样才会觉得生活是幸福的。

网易考拉推荐

chap07  

2013-07-20 12:01:05|  分类: work@oppo |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

Using Non-JVM Languages with Storm

有时由于更熟悉其他的语言或者使用了其他语言的库,就要使用不是基于JVM的语言来实现storm项目。

storm由Java实现,本书中所有的spout和bolt都是由java实现。那么可以使用其他语言,像python,ruby或者javascript来写spout和bolt吗?答案是肯定的! multilang protocl(多语言协议)使这一切变得可能。

多语言协议是storm中实现的一种特俗协议,它使用标准输入和标准输出作为与执行spout和bolt任务的进程之间通信的信道。消息以json格式或者普通的文本行通过信道传输。

下面看一个使用非JVM语言实现spout和bolt的例子。spout负责生成1到10000的数,bolt过滤出素数,两者均使用php实现。

在这个例子中,素数检测以一种普通方式实现。有其他更好的实现,但是会复杂一些,而且超出了本例的范围。

用于storm的PHP DSL有一个官方实现。在本章中,作为例子给出自己的实现。首先,定义拓扑。

...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new
  PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...

可以使用非JVM语言指定拓扑。由于storm的拓扑只是thrift结构,加上nimbus是个thrift守护进程,可以以任何语言创建并提交拓扑。但是这超出了本书的范围。

这里没有什么新东东。下面给出NumbersGeneratorSpout的实现。

 public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
    public NumberGeneratorSpout(Integer from, Integer to) {
        super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to
            .toString());
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

也许你已经注意到了,这个spout继承自ShellSpout。这是storm自带的一个特殊类,用于运行和控制使用其他语言实现的spout。在这里,它告诉storm如何执行php脚本。

NumberGeneratorSpout php 脚本向标准输出发射tuples,并从标准输入读取消息来完成ack或者fail。

在给出 NumberGeneratorSpout.php 脚本的实现之前,先看下多语言协议工作的更多细节。

spout生成从传给构造函数的参数from到参数to的连续数值。

接下来,看下PrimeNumbersFilterBolt。这个类实现了前面提到的脚本。它告诉storm如何执行php脚本。storm提供了一个特殊的称为ShellBolt的类用来实现这个目的,其中需要做的事情只是说明如何运行脚本以及声明发射域。

 public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
    public PrimeNumbersFilterBolt() {
        super("php", "-f", "PrimeNumbersFilterBolt.php");
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
}

在构造函数中,只需告诉storm如何运行php脚本。这等价于下面的命令:

 php -f PrimeNumbersFilterBolt.php

PrimeNumbersFilterBolt php脚本从标准输入读取消息,处理,发射到标注输出,确认或者失败。在看到 PrimeNumbersFilterBolt.php脚本的实现之前,再看看多语言协议的实现细节吧。

The Multilang Protocol Specification

该协议依赖于作为进程之间通信信道的标准输入和标准输出。一个脚本想要生效需要采取下列步骤

  • 初始化握手
  • 开始循环
  • 读、写tuple

使用storm内建的日志机制有一种特殊的方式可以在自己的脚本中记录日志,所以不必实现自己的日志系统。

下面每个步骤做进一步的解释,以及如何使用php脚本来实现。

INITIAL HANDSHAKE

要控制进程(启动或者停止),storm需要知道的正在执行脚本的进程ID(PID)。根据多语言协议,当处理过程开始的第一件事情就是storm会发射一个带有配置,拓扑上下文和PID目录的Json对象到标准输入。它看上去跟下面的代码块差不多:

 {
    "conf": {
        "topology.message.timeout.secs": 3,
// etc
},
"context": {
    "task->component": {
        "1": "example-spout",
        "2": "__acker",
        "3": "example-bolt"
    },
    "taskid": 3
},
"pidDir": "..."
}

进程必须在pidDir指示的目录创建一个空文件,文件名为进程ID,然后将PID作为JSON对象写到标准输出。

 {"pid": 1234}

例如,如果接收到/tmp/example\n,脚本的执行PID为123,那么创建空文件/tmp/example/123,并打印行{"pid":123}nend\n到标准输出。storm采用这种方式来跟踪PID,以及在关闭的时候杀死进程。下面给使用PHP如何实现:

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

这里已经实现了函数read_msg,用于从标准输入读取消息。多语言协议中消息是json格式的单行或多行文本。当storm发送单行内容为end\n消息时说明消息结束。

function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function write_line($line) {
    echo("$line\n");
}

使用flush()是非常关键的,有可能由于指定字符数量未满足导致缓冲区的内容不会flush。这意味脚本会挂起等待storm输入,由于storm同样在等待脚本输出,所以脚本是不会接收到输入的。所以一定要保证当脚本输出内容后立即flush。

开启循环与消息读写

这是最关键的一步,因为所有的工作都在这里完成。这一步的实现取决于要实现spout还是bolt。

在spout的情形中,应该发送消息。在bolt的情形中,循环,读取消息,处理它们,然后发射,确认或者失败。

发送数字的spout的实现如下。

$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}

从命令行参数中获得fromto,开始循环。每当从storm取得消息next,就说明可以发射新tuple。

一旦所有的tuple发送完成,没有更多的tuple发送,挂起。

要保证脚本准备好下一个tuple,storm在发射下一个之前会等待文本行sync\n。要读取命令,直接调用read_msg()来解码。

在bolts的场合中,有些小区别。

while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}

循环,从标准输入读取消息。一旦接收到消息,json解析。如果是一个tuple,对它做处理,即判断是否是素数。

如果是素数,将该数发射出去;否则直接忽略。在任何情形中,都要消息确认。

在函数json_decode中使用JSON_BIGINT_AS_STRING来规避Java和PHP之间的格式问题。Java中发送的大数,在PHP中获取时会丢失精度,而这可能会导致问题。要解决这个问题,告诉PHP将大数当作字符串处理,在json消息中打印时不使用双引号。PHP 5.4.0 或更高版本必须使用这个参数。

消息emitackfaillog结构如下:

Emit

{
    "command": "emit",
    "tuple": ["foo", "bar"]
}

其中,数组中是消息值。

Ack

{
    "command": "ack",
    "id": 123456789
}

其中,id是正处理消息的id。

Fail

{
    "command": "fail",
    "id": 123456789
}

跟fail一样,id是正处理消息的id。

Log

{
    "command": "log",
    "msg": "some message to be logged by storm."
}

将这些放到一起,就得到了下面的PHP脚本。

spout部分:

<?php
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function write_line($line) {
    echo("$line\n");
}
function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function storm_sync() {
    storm_send(array("command" => "sync"));
}
function storm_log($msg) {
    $msg = array("command" => "log", "msg" => $msg);
    storm_send($msg);
    flush();
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}
?>

bolt部分:

<?php
function isPrime($number) {
    if ($number < 2) {
        return false;
    }
    if ($number==2) {
        return true;
    }
    for ($i=2; $i<=$number-1; $i++) {
        if ($number % $i == 0) {
            return false;
        }
    }
    return true;
}
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function write_line($line) {
    echo("$line\n");
}
function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function storm_ack($id) {
    storm_send(["command"=>"ack", "id"=>"$id"]);
}
function storm_log($msg) {
    $msg = array("command" => "log", "msg" => "$msg");
    storm_send($msg);
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}
?>

要将脚本放到工程目录下的目录multilang/resources。这个文件夹会包含到发送到workers的jar文件中。如果脚本没有放到这个文件夹,storm不会运行脚本,并报错。

  评论这张
 
阅读(327)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017