Theory:
Per the documentation about consumer groups: You can group consumers together by setting the same value for the group.id configuration parameter when you start each consumer.
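Here is a minimal sketch of that consumer-side setting. It assumes the Kafka 0.9-style Java consumer API, which MapR Streams supports. The source of SampleConsumer_CG1 is not shown in this post, so the class ConsumerGroupConfig and its method forGroup are hypothetical. Two consumers built from forGroup("CG1") carry the same group.id and would therefore join the same group.

```java
import java.util.Properties;

public class ConsumerGroupConfig {
    // Hypothetical helper: builds the configuration for one member of a consumer group.
    // Every consumer started with the same group.id value joins the same group.
    static Properties forGroup(String groupId) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        // Standard Kafka String deserializers for record keys and values.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties first = forGroup("CG1");
        Properties second = forGroup("CG1");
        // With the Kafka client on the classpath, each Properties object would be passed to
        // new KafkaConsumer<String, String>(props), followed by subscribing to the topic
        // "/stream/s1:info". Both consumers would then belong to group CG1.
        System.out.println(first.getProperty("group.id").equals(second.getProperty("group.id")));
    }
}
```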
Per the documentation about cursors:
- Read cursors
A consumer's read cursor is the offset of the most recent message that MapR Streams has sent to a consumer from a partition.
- Committed cursors
Consumers that are part of a consumer group can save the current position of their read cursor. Consumers can do this either automatically or manually. The saved cursor is called a committed cursor because it indicates that the consumer has processed all messages in a partition up to and including the one with this offset.
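To make the two definitions concrete, here is a toy model of one consumer on one partition. It is not the MapR Streams API, and the class names are hypothetical. It follows the convention visible in the experiment below, where a cursor holds the offset of the next message to read (last consumed offset + 1).

```java
public class CursorModel {
    // Toy model of the cursors for one consumer on one partition (not the MapR Streams API).
    static final class PartitionCursors {
        private long readCursor;      // next offset the consumer will read
        private long committedCursor; // position saved for the consumer group

        PartitionCursors(long startOffset) {
            this.readCursor = startOffset;
            this.committedCursor = startOffset;
        }

        // Receiving the message at `offset` moves the read cursor just past it.
        void consumed(long offset) {
            readCursor = offset + 1;
        }

        // Committing (automatically or manually) saves the current read position.
        void commit() {
            committedCursor = readCursor;
        }

        long readCursor() { return readCursor; }
        long committedCursor() { return committedCursor; }
    }

    public static void main(String[] args) {
        PartitionCursors c = new PartitionCursors(366086948L);
        c.consumed(366086948L); // Msg 146
        c.consumed(366086949L); // Msg 147
        // Before a commit, the committed cursor lags behind the read cursor.
        System.out.println(c.readCursor() + " " + c.committedCursor()); // 366086950 366086948
        c.commit();
        System.out.println(c.readCursor() + " " + c.committedCursor()); // 366086950 366086950
    }
}
```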
Experiment:
Here we set the parameter "group.id"="CG1" in SampleConsumer_CG1. To keep things as simple as possible, the topic has only one partition, and consumer group "CG1" runs only one consumer.
Session A: Producer -- SampleProducer_1sec is running.
mapr openkb.stream.SampleProducer_1sec
Session B: Consumer with group.id="CG1" -- SampleConsumer_CG1 is also running. This is what we will play with.
mapr openkb.stream.SampleConsumer_CG1
1. Kill the Consumer using "CTRL-C".
This simulates a consumer failure.
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366086948, timestamp = 1461255947151, producer = root, key = null, value = Msg 146)
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366086949, timestamp = 1461255947151, producer = root, key = null, value = Msg 147)
^C
From the last lines of consumer output above, we know that the latest message read, "Msg 147", is at offset 366086949. So the read cursor should be pointing to the next offset -- 366086950.
2. Check the committed cursor of the topic for consumer group "CG1".
# maprcli stream cursor list -path /stream/s1 -topic info -consumergroup CG1
topic  partitionid  consumertimestamp             produceroffset  consumergroup  consumerlagmillis  committedoffset  producertimestamp
info   0            2016-04-21T11:25:47.151-0500  366087192       CG1            6004               366086950        2016-04-21T11:25:53.155-0500
Right now, the committed cursor is the same as the read cursor -- 366086950.
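The two timestamps in this listing are 6.004 seconds apart, which matches consumerlagmillis = 6004. The listing in step 4 matches the same way (24016 ms). The sketch below reproduces that arithmetic. It assumes consumerlagmillis is simply producertimestamp minus consumertimestamp, which is an inference from these two listings, not a documented definition. The class CursorLag and its method lagMillis are hypothetical.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class CursorLag {
    // Timestamp layout printed by "maprcli stream cursor list", e.g. 2016-04-21T11:25:47.151-0500.
    // The pattern letter Z parses an offset written without a colon, such as -0500.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZ");

    // Assumed definition: time from the consumer's timestamp to the producer's timestamp.
    static long lagMillis(String consumerTimestamp, String producerTimestamp) {
        OffsetDateTime consumer = OffsetDateTime.parse(consumerTimestamp, FMT);
        OffsetDateTime producer = OffsetDateTime.parse(producerTimestamp, FMT);
        return Duration.between(consumer, producer).toMillis();
    }

    public static void main(String[] args) {
        // Values from the step 2 listing; prints 6004.
        System.out.println(lagMillis("2016-04-21T11:25:47.151-0500",
                                     "2016-04-21T11:25:53.155-0500"));
    }
}
```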
3. Restart the Consumer and kill it immediately with "CTRL-C".
The Consumer successfully starts from the first message with offset >= 366086950 (the committed cursor).
Note: There can be a gap between the offsets of two adjacent messages.
In this case, the next available message, "Msg 148", is at offset 366087029.
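Because offsets can have gaps, resuming "from the committed cursor" means resuming from the first message whose offset is at or after it. A minimal sketch of that lookup uses a sorted set of the offsets that appear in this post's consumer output. It illustrates the rule only, not how MapR Streams implements it, and the class ResumePoint is hypothetical.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

public class ResumePoint {
    // Returns the first available offset >= committedCursor, or null if there is none yet.
    static Long firstAvailable(NavigableSet<Long> offsets, long committedCursor) {
        return offsets.ceiling(committedCursor);
    }

    public static void main(String[] args) {
        // Only the offsets that appear in the consumer output of this post.
        NavigableSet<Long> offsets = new TreeSet<>(Arrays.asList(
                366086948L, 366086949L, 366087029L, 366087030L, 366087031L));
        // No offset between 366086950 and 366087028 appears; the next message is Msg 148.
        System.out.println(firstAvailable(offsets, 366086950L)); // 366087029
    }
}
```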
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087029, timestamp = 1461255950153, producer = root, key = null, value = Msg 148)
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087030, timestamp = 1461255950153, producer = root, key = null, value = Msg 149)
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087031, timestamp = 1461255950153, producer = root, key = null, value = Msg 150)
^C
From the last lines of consumer output above, we know that the latest message read, "Msg 150", is at offset 366087031. So the read cursor should be pointing to the next offset -- 366087032.
4. Check the committed cursor again.
# maprcli stream cursor list -path /stream/s1 -topic info -consumergroup CG1
topic  partitionid  consumertimestamp             produceroffset  consumergroup  consumerlagmillis  committedoffset  producertimestamp
info   0            2016-04-21T11:25:47.151-0500  366087684       CG1            24016              366086950        2016-04-21T11:26:11.167-0500
Because we killed the consumer so quickly, it did not have time to update the committed cursor. So the committed cursor is still at offset 366086950.
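One way to picture why no commit happened is a toy model of a Kafka-style auto-commit. The model assumes the read cursor is saved only once a fixed interval (Kafka's auto.commit.interval.ms setting) has elapsed since the last commit, and that CTRL-C ends the process before any final commit. The arbitrary 5000 ms interval, the made-up millisecond times, and the class AutoCommitModel are hypothetical. The actual settings of SampleConsumer_CG1 are not shown. The model checks the timer on every message; real Kafka-style clients check it during poll.

```java
public class AutoCommitModel {
    // Hypothetical model of timer-driven auto-commit for one partition.
    private final long intervalMs;   // stands in for auto.commit.interval.ms
    private long lastCommitAtMs;     // time of the previous commit
    private long readCursor;         // next offset to read
    private long committedCursor;    // offset saved for the consumer group

    AutoCommitModel(long intervalMs, long startOffset, long startTimeMs) {
        this.intervalMs = intervalMs;
        this.readCursor = startOffset;
        this.committedCursor = startOffset;
        this.lastCommitAtMs = startTimeMs;
    }

    // Called for each message the consumer receives at time nowMs.
    void consumed(long offset, long nowMs) {
        readCursor = offset + 1;
        maybeCommit(nowMs);
    }

    // Commits only if the interval has elapsed since the last commit.
    private void maybeCommit(long nowMs) {
        if (nowMs - lastCommitAtMs >= intervalMs) {
            committedCursor = readCursor;
            lastCommitAtMs = nowMs;
        }
    }

    long readCursor() { return readCursor; }
    long committedCursor() { return committedCursor; }

    public static void main(String[] args) {
        // Restarted at t = 0 ms from committed cursor 366086950, with a 5000 ms interval.
        AutoCommitModel m = new AutoCommitModel(5000, 366086950L, 0);
        m.consumed(366087029L, 100);  // Msg 148
        m.consumed(366087030L, 200);  // Msg 149
        m.consumed(366087031L, 300);  // Msg 150
        // Killed here with CTRL-C: the interval never elapsed, so nothing was committed.
        System.out.println(m.readCursor() + " " + m.committedCursor()); // 366087032 366086950
    }
}
```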
5. Restart the Consumer again.
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087029, timestamp = 1461255950153, producer = root, key = null, value = Msg 148)
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087030, timestamp = 1461255950153, producer = root, key = null, value = Msg 149)
Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087031, timestamp = 1461255950153, producer = root, key = null, value = Msg 150)
The Consumer starts again from "Msg 148" at offset 366087029, so "Msg 148" through "Msg 150" are delivered a second time.
This test proves that:
a. Duplicate messages can be read when a consumer fails.
b. The committed cursor can lag behind the read cursor.
c. A restarted consumer resumes reading from the committed cursor, not from the read cursor.
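The three conclusions can be replayed in a few lines with a toy simulation. In it, each run delivers every message from the committed cursor onward, and neither run commits before it ends. The class ReplayExperiment is hypothetical and is not the MapR Streams API. Messages 148 to 150 come out twice, which is the duplicate delivery described in (a).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ReplayExperiment {
    // Toy partition holding the messages seen in steps 3 and 5, keyed by offset.
    static NavigableMap<Long, String> partition() {
        NavigableMap<Long, String> p = new TreeMap<>();
        p.put(366087029L, "Msg 148");
        p.put(366087030L, "Msg 149");
        p.put(366087031L, "Msg 150");
        return p;
    }

    // One consumer run: starts at the committed cursor, not at the previous read cursor.
    static List<String> runFrom(NavigableMap<Long, String> p, long committedCursor) {
        return new ArrayList<>(p.tailMap(committedCursor, true).values());
    }

    // Steps 3 and 5: two runs, with no commit in between.
    static List<String> simulate() {
        NavigableMap<Long, String> p = partition();
        long committedCursor = 366086950L;
        List<String> delivered = new ArrayList<>(runFrom(p, committedCursor)); // step 3, then CTRL-C
        // Step 4: the committed cursor is unchanged, although the read cursor reached 366087032.
        delivered.addAll(runFrom(p, committedCursor));                         // step 5
        return delivered;
    }

    public static void main(String[] args) {
        List<String> delivered = simulate();
        // Prints [Msg 148, Msg 149, Msg 150, Msg 148, Msg 149, Msg 150]
        System.out.println(delivered);
        // Prints "Msg 148 delivered 2 times"
        System.out.println("Msg 148 delivered " + Collections.frequency(delivered, "Msg 148") + " times");
    }
}
```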