Meta的client实现分析
由于meta不像activeMQ等产品,它们的broker端承载了非常多的功能,而像meta这样追求性能为目的的消息中间件,则是把broker端的功能弱化,同时加强了client端的某些功能,如当前client的消息offset的存储、从broker中pull消息等。
下面我们从消息pull这样一个client端最重要的功能作为分析的主线来了解meta中client的实现。
下面是client端执行pull消息的处理流程示意图:
1.通过ZKLoadRebalanceListener的rebalance方法,根据该client所订阅topic的分区数量来初始化对应数量的FetchRequest实例,并把它们放到FetchRequestQueue中(先进先出)。
2.消息抓取管理器初始化fetchRunners线程池,并启动所有线程对FetchRequestQueue进行读请求的操作。
3.当FetchRequestQueue中有请求时,则执行FetchRequestRunner线程中的processRequest方法,进行后续的操作。
4.通过SimpleMessageConsumer(消息消费者基类)的fetch方法从broker端获取某topic的某分区的消息数据byte组。
5.对broker端返回的消息数据byte组进行操作,解析出一条条消息,并对这些消息进行消费,具体代码实现在FetchRequestRunner类的notifyListener方法,它是消息消费的核心方法,后面我们会重点介绍。
6.FetchRequestRunner处理完一次从broker获取消息并消费的过程后,会把FetchRequest实例重新放回FetchRequestQueue中,重复进行下一轮操作。
上面是client端pull消息的主过程,由于meta的client涉及的功能也较多,为了更进一步了解client端的实现细节,我们从下面几个方面做更进一步的分析
pull模型的轮训时间
很多用户在一看到消息的获取是通过client端主动pull的方式,就感觉和activeMQ等其他消息中间件所采用的broker主动推送消息到client的方式相比较,实时性有所降低。但通过对meta源码的分析,发现它的实时性还是可以保证的,具体实现方法分析如下:
轮训时间的控制在FetchRequestQueue类的take方法中的如下几行代码:
final long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { final long tl = this.available.awaitNanos(delay); }
delay值的设置是按照下面规则进行的:
- 初始化为0,所以第一次该请求会立即和broker端通讯,以获得消息组。
- 大部分情况下pull的轮训间隔为0,所以它的实时性还是可以保证的。
Meta使用zookeeper的细节
meta对zookeeper非常依赖,而且重要的信息同步都是通过它完成的,官方文档对这一块的说明也不多,所以,我们对meta的分析就从这开始,当我们清晰meta中zookeeper的使用细节后,meta的内部实现原理也就基本清晰了。
meta里使用zookeeper最典型的就是类ZKLoadRebalanceListener的 rebalance()方法,该方法使用到了很多zookeeper的znode,下面我们对这个方面做一个详细介绍:
rebalance方法是用来计算某消息消费者具体应该消费哪个meta服务器节点上的哪些分区中的消息(也就是“消费者的负载平衡”)。该方面在下面两种情况下被触发:
- 监视同一个消费者分组的consumer列表是否有变化;
- 监视订阅的topic下的broker节点是否有新增或删除;
当上面两种情况发生时,zookeeper的机制会自动触发到rebalance方法,它的具体算法如下:
1.从zookeeper的'/meta/consumers/某消费者组/ids/某消费者id'节点上获取该消费者的所有订阅了的主题;
2.从zookeeper的'/meta/brokers/ids'节点上获取所有的broker列表;
3.从zookeeper的'/meta/consumers/某消费者组/ids/'、'/meta/consumers/某消费者组/ids/某消费者id'这两个节点中获得每个topic下有哪些消费者;
4.从zookeeper的'/meta/brokers/topics/某topic'节点获得对应topic在broker(包括:master和slaves)里有哪些partition;
5.从上面获得的所有这些信息,调用类ZKLoadRebalanceListener中getRelevantTopicMap方法,判断最新的partition列表或consumer列表和当前在用的是否有变化,如果没有变化则再补充做一个动作(因为虽然partition和consumer都没有新增或删除,但可能cluster的结构发生变化了):对集群做一个比较,看是否有机器down或新增;如果没有变化则继续进行后续rebalance操作;
6.如果经过上面的操作确认相关partition列表或consumer列表有变化,则根据'负载均衡策略'获取某个consumer对应的partition列表,然后根据之前老的和最新的partition列表做相关操作,如新分区在zookeeper上的挂载、释放等操作;
上面是client端一个比较重要的功能,也是zookeeper在meta里一个用的较多的地方。
Meta的HA
HA是任何进入生产环境的软件都需要考虑的一个重要因素,meta提供两种HA的方式:同步和异步(推荐使用异步方式)。
官方提供的文档里已经很清晰说明了HA的场景,这里我们再补充一些官方文档中未详细说明的部分:
- meta采用冷备方式来实现HA,任何主、备的切换都得重启相关broker。
- 任何时候只有master可以执行写操作;实现代码在BrokerConnectionListener类的syncedUpdateBrokersInfo方法,该方法从zookeeper上同步最新的master列表供producer使用。
- client可以连接cluster上的任何一台机器包括master和slave,作为它的消息来源,类似数据库的读写分离。
- 如果master因为某种原因当机,则必须手动停止某台slave并对它进行相关配置操作,并启动它使它成为新的master。当那台坏了的master修复后,它将作为一台新的slave加入集群。
- 在master上不停机的情况下新增一个topic,这个新增的topic不能自动的同步到slave上,必须通过某种方式把master上的server.ini同步到所有slave上,然后通过人工重启或通过jmx重启slave。
同步HA的切换过程:
生产环境中有一台master和一个同步slave,slave是不注册到zookeeper上的,当master当机,则所有连接到该broker的生产者和消费者都停止正常工作。然后人工停止slave,并新增samsa_master.properties配置文件,修改其中recoverOffset属性为true。并且修改server.ini中的brokerId为故障master的id,这些修改做完后,重启slave。这样它就成为新的master对外提供服务了。
Meta的transaction实现
对事务的支持是meta的一个重要特点,目前它支持XA和本地事务,下面我们详细对XA事务进行分析,由于本地事务相对简单,可以参考XA的实现。
分布式事务(XA)
分布式事务介绍
分布式事务在分布式应用中是非常重要的,目前分布式事务的实现标准是XA,而在java体系中就是JTA标准。下面是XA的一个示意图:
这里我们重点说一下两阶段提交协议(2PC),它是XA的核心思想,具体示意图如下:
具体更多关于XA的细节请参考XA的接口规范。
meta的实现
meta里事务的实现主要是如下几个类:
- TransactionalCommandProcessor:事务命令处理器。它主要作用是接收client端发过来的各种请求,如beginTransaction、prepare、commit、rollback、新增消息等。
- JournalTransactionStore:基于文件方式的事务存储引擎。
- JournalStore:具体存储事务的文件存储类。
它们3者之间的关系是:TransactionalCommandProcessor接收client端的各种事务请求,然后调用JournalTransactionStore进行事务存储,JournalTransactionStore根据不同的client请求调用JournalStore具体保存事务信息到磁盘文件。
下面通过一个示意图来进一步进行说明:
client通过调用beginTransaction来新开始一个事务(在meta里就是一个Tx实例),并把它放在JournalTransactionStore类的inflightTransactions队列里,然后client就可以在这个Tx中新增消息,但这些新增的消息是放在JournalStore文件里,并且完整的保存在内存中(由于meta目前没有专门的内存管理机制,当事务数量特别大的时候,这个地方有可能会出现内存溢出)。当client进行2PC中的prepare时,事务从inflightTransactions队列移到preparedTransactions队列,并保存相关信息到JournalStore。当执行commit时,该Tx的所有消息才真正放到MessageStore里供消息消费者读取。当client端发起rollback请求后,Tx被从preparedTransactions队列中删除,并保存相关信息到JournalStore。
下面我们对meta事务实现的几个重要方面做一个详细介绍:
beginTransaction
当client端的TransactionContext(XAResource的实现)调用start方法,broker接收到请求后,启动一个新事务。
新增消息
处理序列图如下:
当事务begin后,client端向broker发送多条消息存储的请求,broker收到请求后会调用JournalTransactionStore的addMessage方法。该方法把请求存储在事务日志文件中(JournalStore),同时新建或找到对应的Tx实例,把这些消息存储请求保存在内存中。这里注意一点,在事务没有提交之前,这些消息存储是不会被放到对应topic消息存储文件中去的。
prepare的处理过程
处理序列图如下:
prepare的处理过程相对简单些,它只是把Tx实例从JournalTransactionStore类的inflightTransactions中移除到preparedTransactions中,同时在事务日志文件存储相关信息。
commit的处理过程
处理序列图如下:
commit过程相当复杂点。broker收到client端的commit请求,调用JournalTransactionStore的commit方法,从preparedTransactions里找到对应的Tx,把该Tx里的所有请求命令(PutCommand),按照topic和分区分别保存到真正的topic消息存储文件中去,当全部保存完时,就会通过回调类AppendCallback的appendComplete方法记录commit日志到事务日志文件。
recover的处理过程
处理序列图如下:
recover操作发生在系统重启的时候,主要是为了还原系统上一次停止时候的事务场景,如还原处在prepare阶段的事务,rollback所有本地事务和没有prepare的XA事务。recover的处理细节包括两部分:
- 在JournalTransactionStore的构造函数中进行JournalStore的recover操作
JournalStore的recover主要是完成从事务日志文件中按照最近的checkpoint从日志中读取所有的日志记录,并按照记录的类型APPEND_MSG和TX_OP分别进行还原操作:
APPEND_MSG类型
这种类型的日志记录就调用JournalTransactionStore的addMessage方法,但是不会往日志文件中重复记录该消息了。
TX_OP类型
这种类型的处理相当复杂点。它根据日志记录的类型又细分为下面几种
XA_PREPARE:根据TransactionId把对应的Tx实例从JournalTransactionStore类的inflightTransactions中移到preparedTransactions中。
XA_COMMIT和LOCAL_COMMIT:根据TransactionId从JournalTransactionStore类的inflightTransactions或preparedTransactions中找到对应的Tx实例。把该Tx内的所有消息请求对比相应topic消息存储文件中消息,如果topic消息存储文件中不存在这些消息则新增,如果存在则通过crc32校验码进行比对。
LOCAL_ROLLBACK和XA_ROLLBACK:根据TransactionId把对应的Tx实例从JournalTransactionStore类的inflightTransactions或preparedTransactions中删除。
在TransactionalCommandProcessor的init方法中调用JournalTransactionStore类的recover操作
经过上面的recover操作后,它已经把meta重启前的事务现场在JournalTransactionStore和JournalStore中进行了还原。接下来就是TransactionalCommandProcessor类的事务现场还原,这个过程是把JournalTransactionStore类的preparedTransactions中的所有Tx在TransactionalCommandProcessor中进行还原,该过程相对简单,可参考源码实现。
经过上面这些recover步骤后,meta作为XAResource就可以继续加入XA事务了。