注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

being23

写给未来的自己

 
 
 

日志

 
 
关于我

真正的坚定,就是找到力量去做自己喜欢的事情,并为之努力,这样才会觉得生活是幸福的。

网易考拉推荐

kafka 协议阅读笔记  

2014-04-11 19:08:25|  分类: 默认分类 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

最近看kafka c++客户端librdkafka源码,看到消息生产部分,了解了下kafka的协议设计,A Guide To The Kafka Protocol,做了些笔记。


overiew

  1. Metadata —— 描述当前可用brokers,它们的host和port信息,以及broker持有的partions
  2. Send —— 向broker发送消息
  3. Fetch —— 从broker获取消息,数据、集群元数据和topic offset信息
  4. Offsets —— 获取指定topic partition可用offset信息
  5. Offset Commit —— 确认一个consumer group的offset集合
  6. Offset Fetch —— 获取一个consumer group的offset集合

Preliminaries

网络

kafka协议是TCP上的二进制协议。请求响应对。大小指定。链接建立和断开不要求握手机制。客户端需要维持到多个broker的链接,因为数据分区放在了不同的broker上。某个客户端没必要维持到broker的多个连接。

server保证单个TCP链接上的请求响应是有序的。

request是大小受限的,超过限制的请求会导致socket断开。

分区与bootstrapping

topic被分到数目预先设置的partition,每个partition根据复制系数N进行复制。topic partition 以 0,1,...,P 编号。

客户端控制数据发送到哪个partition。

数据发送和获取请求必须发给作为指定partition leader的broker。

所有的kafka broker都能响应metadata请求:有哪些主题,这些主题有哪些分区,这些分区的leader broker是谁以及这些broker的host 和port信息是什么。

客户端没必要轮询集群是否有变化;一直获取后缓存元数据直到发生错误暗示元数据过期:1)无法跟指定broker通信的socket错误;2)某个请求响应错误代码暗示当前borker不再持有请求数据的分区

  1. 遍历"bootstrap" kafka url 列表,直到找到可以连接的。获取集群metadata
  2. 处理 fetch或者produce请求,根据发送或者获取的主题分区访问相应的broker
  3. 如果发生错误,刷新元数据并重试

librdkafka 还是会定时刷新元数据

分区策略

kafka中进行数据分区有两个作用:

  1. 在brokers中间平衡数据和请求负载
  2. It serves as a way to divvy up processing among consumer processes while allowing local state and preserving order within the partition. We call this semantic partitioning. 语义分区

在所有的broker中间对请求做round robin处理。如果producer数超过broker数,客户端随机选择partition。这种策略会带来更少的TCP链接。(为什么?)

语义分区就是根据消息里的某个key来确定消息发送到哪个partition。

批处理

消息批处理可以跨越多个topic和partition,也就是说一个produce请求中可能包含发给多个partition的数据;一个fetch请求可能从多个partition中获取数据。

版本与兼容性

kafka协议是向后兼容的。 Our versioning is on a per-api basis, each version consisting of a request and response pair. (每个API都有版本?)每个请求包含API key用于指明使用的API,以及版本号指明请求格式和期望的响应格式。

server会拒绝掉协议不支持的请求,响应是严格按照请求中所暗示的。

协议

协议基本类型

固定宽度
int8, int16, int32, int64 —— 指定宽度的有符号数,以big endian order存储

可变长度
bytes, string —— 这种类型由表示长度N的有符号整数以及N个字节的内容组成。长度-1表示null。string的长度类型是int16,bytes的长度类型是int32。

数组
用于处理重复的结构。包含长度N,后续N个重复的structure,这些structure可以由其他的基本类型构成。

common request and response structure

RequestOrResponse => Size (RequestMessage | ResponseMessage)
    Size => int32
描述
MessageSize给出后续请求或者响应消息的大小,以字节为单位。
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
    ApiKey => int16
    ApiVersion => int16
    CorrelationId => int32
    ClientId => string
    RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
描述
ApiKey数值id表明正被调用的API(例如,元数据请求,produce请求,fetch请求等)
ApiVersionapi的数值version number。server根据版本号作出相应格式的回复
CorrelationId用户提供的整数,server原样返回,用于在client和server之间匹配消息
ClientId用户提供的客户端标识
Response => CorrelationId ResponseMessage
    CorrelationId => int32
    ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

messageset 是带有offset和size信息的消息序列。This format happens to be used both for the on-disk storage on the broker and the on-the-wire format.message set 还是kafka中消息压缩的单位。MessageSets 跟其他数组元素不一样的地方在于开头不是表示长度的size域。

MessageSet => [Offset MessageSize Message]
    Offset => int64
    MessageSize => int32
    Message => Crc MagicByte Attributes Key Value
        Crc => int32
        MagicByte => int8
        Attributes => int8
        Key => bytes
        Value => bytes
描述
Offset在kafka中是作为 log sequence number。当producer发送消息的时候,它并不知道offset,可以填充任何值
Crc后续的消息bytes的CRC32,用于校验broker和consumer中消息的完整性
MagicByte用于向后兼容的version id
Attributes存放消息的元数据属性的字节。最低两位表示消息的压缩编码。其他置零。
Value实际的消息内容,以opaque byte 数组的形式。
Key可选的message key用于partition。可以为null

kafaka压缩的不是单条message而是message set。

CompressionCodec
None0
GZIP1
Snappy2

The APIS

Metadata API

  1. 存在哪些topic
  2. 每个topic有多少partition
  3. 每个partition的leader是那个broker
  4. 每个broker的host和port 是什么

这是唯一一个可以发给集群中的任意broker的请求。

client可以只请求部分topic的元数据。

返回的元数据是partition级别的,按topic分组。对于每个partition,元数据包含 leader信息,所有的replicas以及当前处于 in-sync 状态的replicas列表。

MetadataRequest => [TopicName]
      TopicName => string
描述
TopicName请求指定topic的元数据。如果为空产生所topic的元数据
MetadataResponse => [Broker][TopicMetadata]
    Broker => NodeId Host Port
        NodeId => int32
        Host => string
        Port => int32
    TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16
        PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
            PartitionErrorCode => int16
            PartitionId => int32
            Leader => int32
            Replicas => [int32]
            Isr => [int32]
描述
Leader作为某个partition leader的broker的node id。如果在leader选举中,id = -1
Replicas作为当前partition leader 的slave的 node 集合
Isrcaught up leader 的 replicas 子集合
Brokerkafka broker的nodeid、hostname和port
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
    RequiredAcks => int16
    Timeout => int32
    Partition => int32
    MessageSetSize => int32
描述
RequriedAcks这个域表明server在给请求相应之前要接收到多少个确认。如果是0,server不会发送任何响应(这是唯一一种server不会响应请求的情形,如果是1,server会等到数据写到本地再发送响应。如果是-1,server会阻塞直到消息被所有的sync replicas确认。对于任何大于1的情形,server将会阻塞直到相应数目的确认产生(不过server不会等待超过 in-sync replicas 数目的确认)
Timeoutserver等待RequireAcks中指定数目acknowledge超时时间,以毫秒为单位。这个超时时间不是针对请求时间:1)不包括网络延时,2)计时器在请求开始处理时启动,这样一来如果server负载高导致很多请求排队,那么等待时间时不会计算在内,3)we will not terminate a local write so if the local write time exceeds this timeout it will not be respected。要得到严格的超时时间,使用socket timeout
TopicName数据发送的主题
Partition数据发送的分区
MessageSetSize后续message set的大小,以字节为单位
MessageSet标准格式的消息集合
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
    TopicName => string
    Partition => int32
    ErrorCode => int16
    Offset => int64
描述
Topic响应相应的主题
Partition响应相应的分区
ErrorCode来自分区的错误,如果有的话。Error是跟分区对应的,这是因为指定的分区不可用或者在不同的host上,但是其他的分区可能成功接受到produce 请求
Offset赋给追加到这个partition的message set的第一条消息的offset

Constants

Api Keys

API nameApiKey Value
ProduceRequest0
FetchRequest1
OffsetRequest2
MetadataRequest3
LeaderAndIsrRequest4
StopReplicaRequest5
OffsetCommitRequest8
OffsetFetchRequest9
20140411@迈科龙

  评论这张
 
阅读(441)| 评论(0)
推荐 转载

历史上的今天

在LOFTER的更多文章

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017