Theory:
Per the documentation about consumer groups: you can group consumers together by setting the same value for the group.id configuration parameter when you start each consumer.
Per the documentation about cursors:
- Read cursors
A consumer's read cursor is the offset of the most recent message that MapR Streams has sent to a consumer from a partition.
- Committed cursors
Consumers that are part of a consumer group can save the current position of their read cursor. Consumers can do this either automatically or manually. The saved cursor is called a committed cursor because it indicates that the consumer has processed all messages in a partition up to and including the one with this offset.
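To make the two definitions concrete, here is a minimal Java sketch of one partition's cursors for one consumer group. The CursorModel class is hypothetical, not MapR Streams code. It stores the committed value as the next offset to read (Kafka-style). This assumption fits the experiment below, where the committed offset 366086950 is not the offset of any message: Msg 147 is at 366086949 and Msg 148 is at 366087029.

```java
/**
 * Hypothetical model of one partition's cursors for one consumer group.
 * Not MapR Streams code. Assumption: the committed value is stored
 * Kafka-style as "next offset to read" (last delivered offset + 1).
 */
class CursorModel {
    private long readCursor = -1;      // offset of the last message delivered; -1 = none yet
    private long committedCursor = -1; // saved resume position; -1 = never committed

    /** The stream delivers the message at this offset to the consumer. */
    void deliver(long offset) {
        readCursor = offset;
    }

    /** Manual or automatic commit: save the current position as the next offset to read. */
    void commit() {
        if (readCursor >= 0) {
            committedCursor = readCursor + 1;
        }
    }

    long readCursor() { return readCursor; }

    long committedCursor() { return committedCursor; }

    /** True if some delivered messages are not yet covered by the committed cursor. */
    boolean hasUncommittedReads() {
        return readCursor >= 0 && committedCursor <= readCursor;
    }

    public static void main(String[] args) {
        CursorModel c = new CursorModel();
        c.deliver(366086948L);
        c.deliver(366086949L);
        c.commit();            // committed cursor becomes 366086950
        c.deliver(366087029L); // read cursor moves on, committed cursor does not
        // prints: 366087029 366086950 true
        System.out.println(c.readCursor() + " " + c.committedCursor() + " " + c.hasUncommittedReads());
    }
}
```

Every deliver() moves only the read cursor. The committed cursor moves only on commit(), so the two can drift apart until the next commit.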
Experiment:
Here we add the parameter "group.id"="CG1" in SampleConsumer_CG1. To keep things simple, the topic has only one partition, and consumer group "CG1" starts only one consumer.
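For reference, a consumer configuration along these lines might look like the following stdlib-only sketch. Only group.id=CG1 comes from this post. The enable.auto.commit key and the deserializer class names are standard Kafka 0.9 consumer settings, assumed here rather than taken from the actual SampleConsumer_CG1 source. The ConsumerConfigSketch class name is hypothetical.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of how a consumer such as SampleConsumer_CG1 might
 * build its configuration. Only "group.id" comes from the experiment;
 * the other keys are assumed standard Kafka 0.9 consumer settings.
 */
class ConsumerConfigSketch {
    static Properties cg1Props() {
        Properties props = new Properties();
        // Every consumer started with the same group.id joins the same
        // consumer group and shares that group's committed cursors.
        props.put("group.id", "CG1");
        // Assumed: let the client commit the cursor automatically.
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(cg1Props().getProperty("group.id")); // prints CG1
    }
}
```

In a real consumer, these properties would be passed to new KafkaConsumer<String, String>(props), followed by a subscribe call on the topic "/stream/s1:info". That part needs the MapR Streams client on the classpath, so it is left out of this sketch.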
Session A: Producer -- SampleProducer_1sec is running.
    mapr openkb.stream.SampleProducer_1sec
Session B: Consumer -- SampleConsumer_CG1 is running.
    mapr openkb.stream.SampleConsumer_CG1
1. Kill the Consumer using "CTRL-C".
This simulates a consumer failure.
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366086948, timestamp = 1461255947151, producer = root, key = null, value = Msg 146)
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366086949, timestamp = 1461255947151, producer = root, key = null, value = Msg 147)
    ^C
2. Check committed cursor of the topic for Consumer Group "CG1".
    # maprcli stream cursor list -path /stream/s1 -topic info -consumergroup CG1
    topic  partitionid  consumertimestamp             produceroffset  consumergroup  consumerlagmillis  committedoffset  producertimestamp
    info   0            2016-04-21T11:25:47.151-0500  366087192       CG1            6004               366086950        2016-04-21T11:25:53.155-0500
3. Restart the consumer and kill it immediately with "CTRL-C".
The consumer successfully resumes from the first message with offset >= 366086950 (the committed cursor). Note: there can be gaps between adjacent offsets.
In this case, the next available message, "Msg 148", is at offset 366087029.
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087029, timestamp = 1461255950153, producer = root, key = null, value = Msg 148)
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087030, timestamp = 1461255950153, producer = root, key = null, value = Msg 149)
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087031, timestamp = 1461255950153, producer = root, key = null, value = Msg 150)
    ^C
4. Check committed cursor again.
    # maprcli stream cursor list -path /stream/s1 -topic info -consumergroup CG1
    topic  partitionid  consumertimestamp             produceroffset  consumergroup  consumerlagmillis  committedoffset  producertimestamp
    info   0            2016-04-21T11:25:47.151-0500  366087684       CG1            24016              366086950        2016-04-21T11:26:11.167-0500
5. Restart the consumer again.
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087029, timestamp = 1461255950153, producer = root, key = null, value = Msg 148)
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087030, timestamp = 1461255950153, producer = root, key = null, value = Msg 149)
    Consumed Record: ConsumerRecord(topic = /stream/s1:info, partition = 0, offset = 366087031, timestamp = 1461255950153, producer = root, key = null, value = Msg 150)
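Looking back at the two cursor listings in steps 2 and 4: in both rows, consumerlagmillis equals producertimestamp minus consumertimestamp (6004 ms and 24016 ms). The consumertimestamp stays at 11:25:47.151, which is the timestamp of Msg 147 (1461255947151). The sketch below recomputes the lag from a row's two timestamps with java.time. This reading of the column is inferred from these two rows only, not from MapR documentation, and the CursorLag class is hypothetical.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: recomputes consumerlagmillis from a cursor-list row. */
class CursorLag {
    // Timestamps look like 2016-04-21T11:25:47.151-0500 (offset without a colon).
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    /** Milliseconds from consumertimestamp to producertimestamp. */
    static long lagMillis(String consumerTimestamp, String producerTimestamp) {
        OffsetDateTime consumer = OffsetDateTime.parse(consumerTimestamp, FMT);
        OffsetDateTime producer = OffsetDateTime.parse(producerTimestamp, FMT);
        return Duration.between(consumer, producer).toMillis();
    }

    public static void main(String[] args) {
        // Values from the step 2 listing.
        System.out.println(lagMillis("2016-04-21T11:25:47.151-0500",
                                     "2016-04-21T11:25:53.155-0500")); // prints 6004
    }
}
```

Because the committed cursor did not move between steps 2 and 4, consumertimestamp stayed fixed while the producer kept writing, so the lag grew from 6004 ms to 24016 ms.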
This test proves that:
a. Duplicate messages can be read after a consumer failure.
b. The committed cursor can lag behind the read cursor.
c. A restarted consumer resumes from the committed cursor, not from the read cursor.
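The three conclusions can be reproduced with a toy replay in Java. The hypothetical ReplaySimulation class below holds the offsets and values observed above, including the gap after 366086949. Each run starts from the first offset >= the committed cursor and never commits, standing in for a consumer killed with CTRL-C before a commit happens. The sketch deliberately ignores auto-commit timing, so it models only the failure case from this experiment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical toy replay of the experiment (not MapR Streams code):
 * a consumer killed before committing re-reads the same messages.
 */
class ReplaySimulation {
    // offset -> message value, with the offset gap seen in the experiment
    static final NavigableMap<Long, String> PARTITION = new TreeMap<>();
    static {
        PARTITION.put(366086948L, "Msg 146");
        PARTITION.put(366086949L, "Msg 147");
        PARTITION.put(366087029L, "Msg 148");
        PARTITION.put(366087030L, "Msg 149");
        PARTITION.put(366087031L, "Msg 150");
    }

    /** Reads up to maxMessages from the first offset >= committed, then "crashes" without committing. */
    static List<String> runUntilKilled(long committed, int maxMessages) {
        List<String> consumed = new ArrayList<>();
        // tailMap(committed, true) skips the gap and starts at the next existing offset.
        for (String value : PARTITION.tailMap(committed, true).values()) {
            if (consumed.size() == maxMessages) {
                break;
            }
            consumed.add(value);
        }
        // Killed with CTRL-C here: the committed cursor is left unchanged.
        return consumed;
    }

    public static void main(String[] args) {
        long committed = 366086950L; // committedoffset from "maprcli stream cursor list"
        System.out.println(runUntilKilled(committed, 3)); // prints [Msg 148, Msg 149, Msg 150]
        System.out.println(runUntilKilled(committed, 3)); // same three messages again
    }
}
```

Both runs return Msg 148 to Msg 150 because nothing moved the committed cursor between them. That matches steps 3 and 5: duplicates after a failure (a), a read cursor ahead of the committed cursor (b), and a restart from the committed cursor (c).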