注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

being23

写给未来的自己

 
 
 

日志

 
 
关于我

真正的坚定,就是找到力量去做自己喜欢的事情,并为之努力,这样才会觉得生活是幸福的。

网易考拉推荐

部署ELK遇到的若干问题备忘  

2016-03-13 11:39:38|  分类: 默认分类 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

最近部署了一套ELK,用于生产环境中实时检索上报的信息。期间遇到的一些问题,这里纪录下来。

  1. ES使用的当前版本(V2.2.0),启动的时候报错java.lang.UnsupportedClassVersionError: org/elasticsearch/bootstrap/Elasticsearch : Unsupported major.minor version 51.0,需要升级所在主机的JDK
  2. ES V2.2.0版的配置文件较之前的V1.1.1版配置文件还是有些区别的,部署过程中遇到一些问题,例如,node无法加入集群,需要指定配置项network.host,node之间无法通过多播发现,需要指定配置项discovery.zen.ping.unicast.hosts
  3. 使用Flume的ElasticsearchSink将数据写入ES,遇到错误org.elasticsearch.common.transport.InetSocketTransportAddress,当前版本的Flume不支持ES,降级到V1.1.1版本
  4. ElasticsearchSink在写数据到ES过程中,采用的批量操作,导致日志中的多条记录作为一个document索引到ES中,不利于通过Kibana查询,改为通过logstash的kafka input插件从Kafka中获取数据
  5. 使用的Logstash版本是V1.4.2,没有集成Kafka插件,在手动安装的过程中,因为网络原因(获取jruby-kafka时总是出现Unable to download data from https://rubygems.org/ - Connection reset by peer),无法完成。使用最新版本的Logstash(2.2.2)解决,已经自带kafka插件
  6. 在测试过程中,需要频繁的从头开始读取Kafka中的数据,需要同时设置配置项auto\_offset\_reset => smallestreset_beginning => true,否则无法消费Kafka中的数据
  7. 消费到的数据是json字符串,大致如下:

    {
    "timestamp": "****",
    "host": "",
    "body": [
    "f1\tf2\tf3\nf4\tf5\tf6\n"
    ]
    }

    Kafka Input插件默认的解码器是json,由于消费的数据中包含用于分割的字符\t,json解析的时候报错Illegal unquoted character ((CTRL-CHAR, code 9)): has to be escaped using backslash to be included in string value。最初是将解码器改成plain,通过过滤器mutate对字符做转义,再使用过滤器json

    mutate {
    gsub => ["message", "\t", "\\t","message", "\n", "\\n"]
    }
    json {
    source => "message"
    }
  8. 使用split过滤器有一个问题——当传递的array中只有一个元素时,输出类型是array;有多个元素时,输出类型是string(不清楚是使用不当,还是自身bug)。logstash提供了一个支持自定义代码的过滤器ruby,在代码逻辑中实现对类型的判断,同时使用ruby的Hash类代替过滤器grok获取到每个field。

    ruby {
    init => '@fieldName=["F1","F2","F3"]
    def filter(event, &block)
    if event["body"].is_a?(Array)
    event["body"].each do |log|
    log.split("\n").each do |line|
    e = LogStash::Event.new(Hash[@fieldName.zip(line.split("\t"))])
    e["type"] = "demo"
    yield e
    end
    end
    event.cancel
    end'
    code => "filter(event)"
    }
  9. json解析失败会导致pipeline挂起,从Kafka消费到的数据会堆积在内存中,最终导致OOM。改进的措施是,从input插件kafka中获取数据时使用解码器line,将数据按照\n先行切分,这么做的好处一方面简化了配置,另一面避免了解析json。之后还是通过ruby过滤器获取每个field。

    ruby {
    init => '@fieldName=["F1","F2","F3"]
    idx = event["message"].index("[\"")
    if idx!=nil
    idx=idx+2
    len=event["message"].length()
    event["message"]=event["message"][idx..len]
    end
    e = LogStash::Event.new(Hash[@fieldName.zip(event["message"].split("\t"))])
    e["type"] = "demo"
    yield e
    event.cancel
    end'
    code => "filter(event)"
    }
  10. 由于每个过滤器ruby的自定义代码部分,除了每个field对应的name不同,剩下的代码都是一样的,所以将这部分公共的代码放到了自定义插件kafka中,field名字作为配置项写到配置文件中,最终的配置如下:

    filter {
    kafka {
    keys => ["F1","F2","F3"]
    }
    }
20160313@前海花园
  评论这张
 
阅读(143)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017