使用kafka 2.1.0 ,然后用最新的kafka-manager
1.3.3.18来管理kafka, 然后写了一个生产者和消费者程序,程序运行后,死活显示不出来
程序运行后,消费者的group死活显示不出来。
生产者代码如下:
package com.kafka.producer;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerDemo {
public static void main(String[] args) {
int i = 0;
while (true) {
i++;
try {
send("test", String.format("test_%d", i), "123");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(String.format("Kafka写入:%d", i));
}
}
private static Producer<String, Object> producer;
private static KafkaConsumer<String, Object> consumer;
private static final String server = "127.0.0.1:9092";
static {
Properties props = buildProducerConfig();
producer = new KafkaProducer<>(props);
}
private static Properties buildProducerConfig() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址,也就是Broker地址
props.put("bootstrap.servers", server);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
public static RecordMetadata send(String topic, String key, Object Obj) throws InterruptedException, ExecutionException {
return producer.send(new ProducerRecord<String, Object>(topic, key, Obj)).get();
}
public static void sendAsync(String topic,String key,Object obj) {
producer.send(new ProducerRecord<String, Object>(topic, key, obj), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e !=null) {
System.out.println(ExceptionUtils.getStackTrace(e));
}
}
} );
}
}
消费者程序如下:
package com.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
KafkaUtils.consume();
}
private static KafkaConsumer<String, Object> consumer;
private static final String server = "127.0.0.1:9092";
static {
Properties props = buildConsumerConfig();
consumer = new KafkaConsumer<>(props);
}
private static Properties buildConsumerConfig() {
Properties props;
props = new Properties();
props.put("bootstrap.servers", server);
// 消费组
props.put("group.id", "testGroup");
props.put("enable.auto.commit", "true");
// 设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
public static void consume() {
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 每隔100ms拉取一次数据
ConsumerRecords<String, Object> records = consumer.poll(100);
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("partition=%d,offset = %d, key = %s, value = %s\n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
}
然后在kafka manager的消费者组显示不出来,为了查找原因,去看kafka manager日志。发现日志报错如下:
[warn] k.m.a.c.KafkaManagedOffsetCache - Failed to process a message from offset topic on cluster test-Kafka!
kafka.common.KafkaException: Unknown offset schema version 3
at kafka.manager.utils.one10.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:428) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at kafka.manager.utils.one10.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:532) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:332) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.12.jar:na]
at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
初步诊断是kafka manager的问题,觉得具体深入分析下,发现kafka manager是用scala写的, 自己有不了解scala,顿时感觉无从下手,
但是想想,程序应该都差不多,就去分析分析原因吧,发现错误日志在GroupMetadataManager.scala:428,这行,那应该错误也在这边,
然后在google找了找,也没有很好的解决方式,只能在github的kafka manager提了个Issue,发现有人修改过源代码后成功显示了,安装这位老兄的提示
修改scala源代码,然后重新编译打包,问题终于得到了解决。
修改的scala源代码如下:
git diff origin/master
diff --git a/app/kafka/manager/utils/one10/GroupMetadataManager.scala b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
index 85771cd..f16b1a3 100644
--- a/app/kafka/manager/utils/one10/GroupMetadataManager.scala
+++ b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
@@ -368,6 +368,25 @@ object GroupMetadataManager {
new Field(SUBSCRIPTION_KEY, BYTES),
new Field(ASSIGNMENT_KEY, BYTES))
+ private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+
+ private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
+ new Field("metadata", STRING, "Associated metadata.", ""),
+ new Field("commit_timestamp", INT64))
+ private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
+ private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
+ private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+
+ private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
+ new Field("offset", INT64),
+ new Field("leader_epoch", INT32),
+ new Field("metadata", STRING, "Associated metadata.", ""),
+ new Field("commit_timestamp", INT64))
+ private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
+ private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
+ private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
+ private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
+
private val PROTOCOL_TYPE_KEY = "protocol_type"
private val GENERATION_KEY = "generation"
private val PROTOCOL_KEY = "protocol"
@@ -388,6 +407,12 @@ object GroupMetadataManager {
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+ private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
+ new Field(PROTOCOL_TYPE_KEY, STRING),
+ new Field(GENERATION_KEY, INT32),
+ new Field(PROTOCOL_KEY, NULLABLE_STRING),
+ new Field(LEADER_KEY, NULLABLE_STRING),
+ new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -398,13 +423,18 @@ object GroupMetadataManager {
// map of version of offset value schemas
private val OFFSET_VALUE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
- 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
+ 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
+ 2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
+ 3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
+
private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
// map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(
0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
- 1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+ 1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
+ 2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+
private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
@@ -545,6 +575,20 @@ object GroupMetadataManager {
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+ } else if (version == 2) {
+ val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
+ val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
+ val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
+
+ OffsetAndMetadata(offset, metadata, commitTimestamp)
+ } else if (version == 3) {
+ val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
+ val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
+ val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
+ val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
+
+ // val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
+ OffsetAndMetadata(offset, metadata, commitTimestamp)
} else {
throw new IllegalStateException("Unknown offset message version")
}
完整的app/kafka/manager/utils/one10/GroupMetadataManager.scala b/app/kafka/manager/utils/one10/GroupMetadataManager.scala代码如下:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.manager.utils.one10
import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.UUID
import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}
import kafka.utils.{Logging, nonthreadsafe}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.{ConsumerProtocol, PartitionAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.collection.{Seq, immutable, mutable, _}
/**
* Case class used to represent group metadata for the ListGroups API
*/
case class GroupOverview(groupId: String,
protocolType: String)
/**
* We cache offset commits along with their commit record offset. This enables us to ensure that the latest offset
* commit is always materialized when we have a mix of transactional and regular offset commits. Without preserving
* information of the commit record offset, compaction of the offsets topic it self may result in the wrong offset commit
* being materialized.
*/
case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {
def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get
}
/**
* Group contains the following metadata:
*
* Membership metadata:
* 1. Members registered in this group
* 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
* 3. Protocol metadata associated with group members
*
* State metadata:
* 1. group state
* 2. generation id
* 3. leader id
*/
@nonthreadsafe
class GroupMetadata(val groupId: String
, var protocolType: Option[String]
, var generationId: Int
, var protocol: Option[String]
, var leaderId: Option[String]
) extends Logging {
private val members = new mutable.HashMap[String, MemberMetadata]
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
private var receivedTransactionalOffsetCommits = false
private var receivedConsumerOffsetCommits = false
var newMemberAdded: Boolean = false
def has(memberId: String) = members.contains(memberId)
def get(memberId: String) = members(memberId)
def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
def leaderOrNull: String = leaderId.orNull
def protocolOrNull: String = protocol.orNull
def add(member: MemberMetadata) {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
assert(groupId == member.groupId)
assert(this.protocolType.orNull == member.protocolType)
assert(supportsProtocols(member.protocols))
if (leaderId.isEmpty)
leaderId = Some(member.memberId)
members.put(member.memberId, member)
}
def remove(memberId: String) {
members.remove(memberId)
if (isLeader(memberId)) {
leaderId = if (members.isEmpty) {
None
} else {
Some(members.keys.head)
}
}
}
def allMembers = members.keySet
def allMemberMetadata = members.values.toList
// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString
private def candidateProtocols = {
// get the set of protocols that are commonly supported by all members
allMemberMetadata
.map(_.protocols)
.reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
}
def supportsProtocols(memberProtocols: Set[String]) = {
members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
}
def overview: GroupOverview = {
GroupOverview(groupId, protocolType.getOrElse(""))
}
def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {
this.offsets ++= offsets
this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
}
def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {
if (pendingOffsetCommits.contains(topicPartition)) {
if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
"in the log.")
if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
offsets.put(topicPartition, offsetWithCommitRecordMetadata)
}
pendingOffsetCommits.get(topicPartition) match {
case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>
pendingOffsetCommits.remove(topicPartition)
case _ =>
// The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case
// its entries would be removed from the cache by the `removeOffsets` method.
}
}
def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
pendingOffsetCommits.get(topicPartition) match {
case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
case _ =>
}
}
def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
receivedConsumerOffsetCommits = true
pendingOffsetCommits ++= offsets
}
def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {
trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")
receivedTransactionalOffsetCommits = true
val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
offsets.foreach { case (topicPartition, offsetAndMetadata) =>
producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))
}
}
def hasReceivedConsistentOffsetCommits : Boolean = {
!receivedConsumerOffsetCommits || !receivedTransactionalOffsetCommits
}
/* Remove a pending transactional offset commit if the actual offset commit record was not written to the log.
* We will return an error and the client will retry the request, potentially to a different coordinator.
*/
def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = {
pendingTransactionalOffsetCommits.get(producerId) match {
case Some(pendingOffsets) =>
val pendingOffsetCommit = pendingOffsets.remove(topicPartition)
trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " +
s"to be appended to the log")
if (pendingOffsets.isEmpty)
pendingTransactionalOffsetCommits.remove(producerId)
case _ =>
// We may hit this case if the partition in question has emigrated already.
}
}
def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,
commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) {
pendingTransactionalOffsetCommits.get(producerId) match {
case Some(pendingOffset) =>
if (pendingOffset.contains(topicPartition)
&& pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata)
pendingOffset.update(topicPartition, commitRecordMetadataAndOffset)
case _ =>
// We may hit this case if the partition in question has emigrated.
}
}
/* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written
* to the log.
*/
def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)
if (isCommit) {
pendingOffsetsOpt.foreach { pendingOffsets =>
pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")
val currentOffsetOpt = offsets.get(topicPartition)
if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
"committed and loaded into the cache.")
offsets.put(topicPartition, commitRecordMetadataAndOffset)
} else {
trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +
s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.")
}
}
}
} else {
trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted")
}
}
def activeProducers = pendingTransactionalOffsetCommits.keySet
def hasPendingOffsetCommitsFromProducer(producerId: Long) =
pendingTransactionalOffsetCommits.contains(producerId)
def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
topicPartitions.flatMap { topicPartition =>
pendingOffsetCommits.remove(topicPartition)
pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
pendingOffsets.remove(topicPartition)
}
val removedOffset = offsets.remove(topicPartition)
removedOffset.map(topicPartition -> _.offsetAndMetadata)
}.toMap
}
def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
val expiredOffsets = offsets
.filter {
case (topicPartition, commitRecordMetadataAndOffset) =>
commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
}
.map {
case (topicPartition, commitRecordOffsetAndMetadata) =>
(topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
}
offsets --= expiredOffsets.keySet
expiredOffsets.toMap
}
def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
(topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
}.toMap
def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)
// visible for testing
def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)
def numOffsets = offsets.size
def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty || pendingTransactionalOffsetCommits.nonEmpty
override def toString: String = {
"GroupMetadata(" +
s"groupId=$groupId, " +
s"generation=$generationId, " +
s"protocolType=$protocolType, " +
s"members=$members)"
}
}
/**
* Messages stored for the group topic has versions for both the key and value fields. Key
* version is used to indicate the type of the message (also to differentiate different types
* of messages from being compacted together if they have the same field values); and value
* version is used to evolve the messages within their data types:
*
* key version 0: group consumption offset
* -> value version 0: [offset, metadata, timestamp]
*
* key version 1: group consumption offset
* -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
*
* key version 2: group metadata
* -> value version 0: [protocol_type, generation, protocol, leader, members]
*/
object GroupMetadataManager {
private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
private val CURRENT_GROUP_KEY_SCHEMA_VERSION2 = 3.toShort
private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
new Field("topic", STRING),
new Field("partition", INT32))
private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")
private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("timestamp", INT64))
private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64),
new Field("expire_timestamp", INT64))
private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
//new add for version
private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64))
private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
new Field("offset", INT64),
new Field("leader_epoch", INT32),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64))
private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
//new add for version 3-end
private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
private val MEMBER_ID_KEY = "member_id"
private val CLIENT_ID_KEY = "client_id"
private val CLIENT_HOST_KEY = "client_host"
private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
private val SESSION_TIMEOUT_KEY = "session_timeout"
private val SUBSCRIPTION_KEY = "subscription"
private val ASSIGNMENT_KEY = "assignment"
private val MEMBER_METADATA_V0 = new Schema(
new Field(MEMBER_ID_KEY, STRING),
new Field(CLIENT_ID_KEY, STRING),
new Field(CLIENT_HOST_KEY, STRING),
new Field(SESSION_TIMEOUT_KEY, INT32),
new Field(SUBSCRIPTION_KEY, BYTES),
new Field(ASSIGNMENT_KEY, BYTES))
private val MEMBER_METADATA_V1 = new Schema(
new Field(MEMBER_ID_KEY, STRING),
new Field(CLIENT_ID_KEY, STRING),
new Field(CLIENT_HOST_KEY, STRING),
new Field(REBALANCE_TIMEOUT_KEY, INT32),
new Field(SESSION_TIMEOUT_KEY, INT32),
new Field(SUBSCRIPTION_KEY, BYTES),
new Field(ASSIGNMENT_KEY, BYTES))
//new add for version
private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
private val PROTOCOL_TYPE_KEY = "protocol_type"
private val GENERATION_KEY = "generation"
private val PROTOCOL_KEY = "protocol"
private val LEADER_KEY = "leader"
private val MEMBERS_KEY = "members"
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_KEY_SCHEMA,
1 -> OFFSET_COMMIT_KEY_SCHEMA,
2 -> GROUP_METADATA_KEY_SCHEMA
)
// map of version of offset value schemas
private val OFFSET_VALUE_SCHEMAS = Map(
1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3
)
private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
// map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(
1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
private def schemaForKey(version: Int) = {
val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
case _ => throw new KafkaException("Unknown offset schema version " + version)
}
}
private def schemaForOffset(version: Int) = {
val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
println("version is:"+version+", schemaOpt is: "+schemaOpt)
schemaOpt match {
case Some(schema) => schema
case _ => throw new KafkaException("Unknown offset schema version " + version)
}
}
private def schemaForGroup(version: Int) = {
val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
case _ => throw new KafkaException("Unknown group metadata version " + version)
}
}
/**
* Generates the key for offset commit message for given (group, topic, partition)
*
* @return key for offset commit message
*/
def offsetCommitKey(group: String, topicPartition: TopicPartition,
versionId: Short = 0): Array[Byte] = {
val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
key.set(OFFSET_KEY_GROUP_FIELD, group)
key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
key.writeTo(byteBuffer)
byteBuffer.array()
}
/**
* Generates the key for group metadata message for given group
*
* @return key bytes for group metadata message
*/
def groupMetadataKey(group: String): Array[Byte] = {
val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
key.set(GROUP_KEY_GROUP_FIELD, group)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
key.writeTo(byteBuffer)
byteBuffer.array()
}
/**
* Generates the payload for offset commit message from given offset and metadata
*
* @param offsetAndMetadata consumer's current offset and metadata
* @return payload for offset commit message
*/
def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
// generate commit value with schema version 1
val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
value.writeTo(byteBuffer)
byteBuffer.array()
}
/**
* Decodes the offset messages' key
*
* @param buffer input byte-buffer
* @return an GroupTopicPartition object
*/
def readMessageKey(buffer: ByteBuffer): BaseKey = {
val version = buffer.getShort
val keySchema = schemaForKey(version)
val key = keySchema.read(buffer)
if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
// version 0 and 1 refer to offset
val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
// version 2 refers to offset
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
GroupMetadataKey(version, group)
} else if(version == CURRENT_GROUP_KEY_SCHEMA_VERSION2) {//new add
// version 3 refers to offset
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
GroupMetadataKey(version, group)
} else {
throw new IllegalStateException("Unknown version " + version + " for group metadata message")
}
}
/**
* Decodes the offset messages' payload and retrieves offset and metadata from it
*
* @param buffer input byte-buffer
* @return an offset-metadata object from the message
*/
def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
if (buffer == null) { // tombstone
null
} else {
val version = buffer.getShort
val valueSchema = schemaForOffset(version)
val value = valueSchema.read(buffer)
if (version == 0) {
val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, timestamp)
} else if (version == 1) {
val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
} else if (version == 2) {
val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
} else if (version == 3) {
val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
} else {
throw new IllegalStateException("Unknown offset message version")
}
}
}
/**
* Decodes the group metadata messages' payload and retrieves its member metadatafrom it
*
* @param buffer input byte-buffer
* @return a group metadata object from the message
*/
def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
if (buffer == null) { // tombstone
null
} else {
val version = buffer.getShort
val valueSchema = schemaForGroup(version)
val value = valueSchema.read(buffer)
if (version == 0 || version == 1) {
val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
val memberMetadataArray = value.getArray(MEMBERS_KEY)
val members = memberMetadataArray.map { memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
val subscription: PartitionAssignor.Subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
val assignment: PartitionAssignor.Assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
val member = new MemberMetadata(memberId
, groupId
, clientId
, clientHost
, protocolType
, List((protocol, subscription.topics().asScala.toSet))
, assignment.partitions().asScala.map(tp => (tp.topic(), tp.partition())).toSet)
member
}
val finalProtocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
val group = new GroupMetadata(groupId = groupId
, generationId = generationId
, protocolType = finalProtocolType
, protocol = Option(protocol)
, leaderId = Option(leaderId)
)
members.foreach(group.add)
group
} else {
throw new IllegalStateException("Unknown group metadata message version")
}
}
}
// Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
// (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
class OffsetsMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
// Only print if the message is an offset record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case offsetKey: OffsetKey =>
val groupTopicPartition = offsetKey.key
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
}
}
// Formatter for use with tools to read group metadata history
class GroupMetadataMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
// Only print if the message is a group metadata record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case groupMetadataKey: GroupMetadataKey =>
val groupId = groupMetadataKey.key
val value = consumerRecord.value
val formattedValue =
if (value == null) "NULL"
else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
output.write(groupId.getBytes(StandardCharsets.UTF_8))
output.write("::".getBytes(StandardCharsets.UTF_8))
output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
output.write("\n".getBytes(StandardCharsets.UTF_8))
case _ => // no-op
}
}
}
}
case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
def this(group: String, topic: String, partition: Int) =
this(group, new TopicPartition(topic, partition))
override def toString: String =
"[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
}
trait BaseKey{
def version: Short
def key: Any
}
case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
override def toString: String = key.toString
}
case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
override def toString: String = key
}
posted on 2018-12-13 15:36
fly 阅读(1501)
评论(0) 编辑 收藏 所属分类:
J2EE