最近开始调研metamorphosis,官方文档很是详尽。于是边学习边做笔记。
基本概念
消息生产者
也称为Message Producer,一般简称为producer,负责产生消息并发送消息到meta服务器。
消息消费者
也称为Message Consumer,一般简称为consumer,负责消息的消费,meta采用pull模型,由消费者主动从meta服务器拉取数据并解析成消息并消费。
Topic
消息的主题,由用户定义并在服务端配置。producer发送消息到某个topic下,consumer从某个topic下消费消息。
分区(partition)
同一个topic下面还分为多个分区,如meta-test这个topic我们可以分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器分别为0和1,则所有分区为0-0、0-1、0-2、0-3、0-4、1-0、1-1、1-2、1-3、1-4。分区跟消费者的负载均衡机制相关。
Message
消息,负载用户数据并在生产者、服务端和消费者之间传输。
Broker
就是meta的服务端或者说服务器,在消息中间件中也通常称为broker。
消费者分组(Group)
消费者可以是多个消费者共同消费一个topic下的消息,每个消费者消费部分消息。这些消费者就组成一个分组,拥有同一个分组名称,通常也称为消费者集群
Offset
消息在broker上的每个分区都是组织成一个文件列表,消费者拉取数据需要知道数据在文件中的偏移量,这个偏移量就是所谓offset。Offset是绝对偏移量,服务器会将offset转化为具体文件的相对偏移量。
可靠性
Metamorphosis的可靠性保证贯穿客户端和服务器。
生产者的可靠性保证
消息生产者发送消息后返回SendResult,如果isSuccess返回为true,则表示消息已经确认发送到服务器并被服务器接收存储。整个发送过程是一个同步的过程。保证消息送达服务器并返回结果。
服务器的可靠性保证
消息生产者发送的消息,meta服务器收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成功之后返回应答给生产者。因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。
写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层os,os对写有缓冲。Meta有两个特性来保证数据落到磁盘上
- 每1000条(可配置),即强制调用一次force来写入磁盘设备。
- 每隔10秒(可配置),强制调用一次force来写入磁盘设备。
因此,Meta通过配置可保证在异常情况下(如磁盘掉电)10秒内最多丢失1000条消息。当然通过参数调整你甚至可以在掉电情况下不丢失任何消息。
服务器通常组织为一个集群,一条从生产者过来的消息可能按照路由规则存储到集群中的某台机器。Meta已经实现高可用的HA方案,类似mysql的同步和异步复制,将一台meta服务器的数据完整复制到另一台slave服务器,并且slave服务器还提供消费功能(同步复制不提供消费)。
消费者的可靠性保证
消息的消费者是一条接着一条地消费消息,只有在成功消费一条消息后才会接着消费下一条。如果在消费某条消息失败(如异常),则会尝试重试消费这条消息(默认最大5次),超过最大次数后仍然无法消费,则将消息存储在消费者的本地磁盘,由后台线程继续做重试。而主线程继续往后走,消费后续的消息。因此,只有在MessageListener确认成功消费一条消息后,meta的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。
消费者的另一个可靠性的关键点是offset的存储,也就是拉取数据的偏移量。我们目前提供了以下几种存储方案
- zookeeper,默认存储在zoopkeeper上,zookeeper通过集群来保证数据的安全性。
- mysql,可以连接到您使用的mysql数据库,只要建立一张特定的表来存储。完全由数据库来保证数据的可靠性。
- file,文件存储,将offset信息存储在消费者的本地文件中,适合消费者分组只有一个消费者的情况,无需共享offset信息。
Offset会定期保存,并且在每次重新负载均衡前都会强制保存一次。
顺序
很多人关心的消息顺序,希望消费者消费消息的顺序跟消息的发送顺序是一致的。比如,我发送消息的顺序是A、B、C,那么消费者消费的顺序也应该是A、B、C。乱序对某些应用可能是无法接受的。
Metamorphosis对消息顺序性的保证是有限制的,默认情况下,消息的顺序以谁先达到服务器并写入磁盘,则谁就在先的原则处理。并且,发往同一个分区的消息保证按照写入磁盘的顺序让消费者消费,这是因为消费者针对每个分区都是按照从前到后递增offset的顺序拉取消息。
Meta可以保证,在单线程内使用该producer发送的消息按照发送的顺序达到服务器并存储,并按照相同顺序被消费者消费,前提是这些消息发往同一台服务器的同一个分区。为了实现这一点,你还需要实现自己的PartitionSelector用于固定选择分区
public interface PartitionSelector {
public Partition getPartition(String topic, List<Partition> partitions, Message message) throws MetaClientException;
}
选择分区可以按照一定的业务逻辑来选择,如根据业务id来取模。或者如果是传输文件,可以固定选择第n个分区使用。当然,如果传输文件,通常我们会建议你只配置一个分区,那也就无需选择了。
消息的顺序发送可以使用这个OrderedMessageProducer,自定义管理分区信息,并提供故障情况下的本地存储功能。
消息重复
消息的重复包含两个方面,生产者重复发送消息以及消费者重复消费消息。
针对生产者来说,有可能发生这种情况,生产者发送消息,等待服务器应答,这个时候发生网络故障,服务器实际已经将消息写入成功,但是由于网络故障没有返回应答。那么生产者会认为发送失败,则再次发送同一条消息,如果发送成功,则服务器实际存储两条相同的消息。这种由故障引起的重复,meta是无法避免的,因为meta不判断消息的data是否一致,因为它并不理解data的语义,而仅仅是作为载荷来传输。
针对消费者来说也有这个问题,消费者成功消费一条消息,但是此时断电,没有及时将前进后的offset存储起来,则下次启动的时候或者其他同个分组的消费者owner到这个分区的时候,会重复消费该条消息。这种情况meta也无法完全避免。
Meta对消息重复的保证只能说在正常情况下保证不重复,异常情况无法保证,这些限制是由远程调用的语义引起的,要做到完全不重复的代价很高,meta暂时不会考虑。