Friday, September 7, 2018

How to dig into the Kafka Streams/ksql state store in RocksDB

Goal:

By default, Kafka Streams and ksql use RocksDB as the internal state store.
Each state store consists of:
  1. an internally created and compacted changelog topic (for fault tolerance)
  2. one (or multiple) RocksDB instances (for cached key-value lookups)
Thus, when applications are started/stopped or data is rewound/reprocessed, this internal data must be managed correctly.
More design details can be found in Kafka Streams Internal Data Management.
This article explains step by step how to use the tools "ldb" and "sst_dump" to dig deeper into the RocksDB state stores.
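To make the on-disk layout concrete, below is a minimal Kafka Streams sketch (plain Java; the topic and store names are made up for illustration, this is not ksql code). Note that "state.dir" defaults to /tmp/kafka-streams, which is why the stores examined later in this article live there:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class StateStoreLocationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-store-location-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // "state.dir" defaults to /tmp/kafka-streams; each store lands under
        // <state.dir>/<application.id>/<task_id>/rocksdb/<store name>
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        StreamsBuilder builder = new StreamsBuilder();
        // Materializing a table creates (1) a compacted changelog topic named
        // <application.id>-user-store-changelog for fault tolerance, and
        // (2) a local RocksDB instance for key-value lookups.
        KTable<String, String> users = builder.table("users", Materialized.as("user-store"));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}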

Env:

MapR 6.0.1
CentOS 7.4

Solution:

1. Build and install ldb and sst_dump

yum groupinstall 'Development Tools'
git clone git@github.com:facebook/rocksdb.git
cd rocksdb
make ldb sst_dump
The resulting binaries are placed in the rocksdb source directory itself, which is why they are invoked as ./ldb and ./sst_dump below.

2. Running a ksql query

Refer to How to use ksql on MapR Streams (MapR-ES).
The running query we examine here has the query ID "CSAS_PAGEVIEWS_FEMALE":
ksql> show queries;

 Query ID              | Kafka Topic                     | Query String
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CSAS_PAGEVIEWS_FEMALE | /sample-stream:pageviews_female | CREATE STREAM pageviews_female WITH (kafka_topic='/sample-stream:pageviews_female', value_format='json') AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
One of the RocksDB state stores for this query is located below (the directory layout is <state.dir>/<application.id>/<task_id>/rocksdb/<store name>, so "0_0" is the task ID):
/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003
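For orientation, the ksql query above compiles into a Kafka Streams topology roughly like the hand-written sketch below. This is an approximation, not the code ksql actually generates: topic names are shortened, values are treated as raw JSON strings, and the repartitioning and column projection that ksql performs are omitted. The table side of the join is built by reducing the users stream to its latest value per userid, which appears to be why the store directory carries the auto-generated name KSTREAM-REDUCE-STATE-STORE-0000000003:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class PageviewsFemaleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ksql_query_CSAS_PAGEVIEWS_FEMALE");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Table side: reduce the users stream down to its latest value per
        // userid. The auto-named RocksDB store backing this reduce is the
        // KSTREAM-REDUCE-STATE-STORE-... directory inspected in this article.
        KTable<String, String> users = builder.<String, String>stream("users")
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

        // Stream side: join pageviews against the users table, keep FEMALE
        // rows, and write them out. (Simplified: the real topology also
        // repartitions pageviews by userid and projects individual columns.)
        KStream<String, String> pageviews = builder.stream("pageviews");
        pageviews
                .leftJoin(users, (pageview, user) -> user)
                .filter((userid, user) ->
                        user != null && user.contains("\"GENDER\":\"FEMALE\""))
                .to("pageviews_female");

        new KafkaStreams(builder.build(), props).start();
    }
}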

3. Examine the state store using ldb

  • Scan all the key-value pairs in this state store
# ./ldb --db=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003 scan
User_1 : {"REGISTERTIME":1511164836164,"ROWTIME":1536354992072,"USERID":"User_1","GENDER":"FEMALE","ROWKEY":"User_1","REGIONID":"Region_1"}
User_2 : {"REGISTERTIME":1513176010609,"ROWTIME":1536355004549,"USERID":"User_2","GENDER":"FEMALE","ROWKEY":"User_2","REGIONID":"Region_9"}
User_3 : {"REGISTERTIME":1505260753939,"ROWTIME":1536355010552,"USERID":"User_3","GENDER":"MALE","ROWKEY":"User_3","REGIONID":"Region_6"}
User_4 : {"REGISTERTIME":1518311990103,"ROWTIME":1536355016057,"USERID":"User_4","GENDER":"FEMALE","ROWKEY":"User_4","REGIONID":"Region_4"}
User_5 : {"REGISTERTIME":1500613777028,"ROWTIME":1536354977350,"USERID":"User_5","GENDER":"FEMALE","ROWKEY":"User_5","REGIONID":"Region_2"}
User_6 : {"REGISTERTIME":1510227650166,"ROWTIME":1536355024298,"USERID":"User_6","GENDER":"MALE","ROWKEY":"User_6","REGIONID":"Region_7"}
User_7 : {"REGISTERTIME":1505244299235,"ROWTIME":1536355016057,"USERID":"User_7","GENDER":"MALE","ROWKEY":"User_7","REGIONID":"Region_3"}
User_8 : {"REGISTERTIME":1515774224403,"ROWTIME":1536355010552,"USERID":"User_8","GENDER":"FEMALE","ROWKEY":"User_8","REGIONID":"Region_1"}
User_9 : {"REGISTERTIME":1495112452553,"ROWTIME":1536355024298,"USERID":"User_9","GENDER":"OTHER","ROWKEY":"User_9","REGIONID":"Region_7"}
Since the column "userid" was chosen as the key when we created the table "users_original", the records above are the current latest user data for each "userid". (A programmatic equivalent of these ldb commands is sketched after this list.)
To double-check, this output also matches what Drill on MapR Streams returns:
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select * from ksql.`/sample-stream:users` where USERID='User_1' order by kafkaMsgTimestamp desc limit 1;
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
|  registertime  | gender  | regionid  | userid  |      kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  | kafkaMsgKey  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
| 1511164836164  | FEMALE  | Region_1  | User_1  | /sample-stream:users  | 0                 | 2973            | 1536354992072      | [B@14204fe2  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
1 row selected (0.387 seconds)
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select * from ksql.`/sample-stream:users` where USERID='User_2' order by kafkaMsgTimestamp desc limit 1;
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
|  registertime  | gender  | regionid  | userid  |      kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  | kafkaMsgKey  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
| 1513176010609  | FEMALE  | Region_9  | User_2  | /sample-stream:users  | 0                 | 2975            | 1536355004549      | [B@12adfaf0  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
1 row selected (0.436 seconds)
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select * from ksql.`/sample-stream:users` where USERID='User_3' order by kafkaMsgTimestamp desc limit 1;
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
|  registertime  | gender  | regionid  | userid  |      kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  | kafkaMsgKey  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
| 1505260753939  | MALE    | Region_6  | User_3  | /sample-stream:users  | 0                 | 2979            | 1536355010552      | [B@10fcb3c6  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
1 row selected (0.95 seconds)
  • Fetch the value for a given key
# ./ldb --db=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003 get User_1
{"REGISTERTIME":1508781371805,"ROWTIME":1536355797368,"USERID":"User_1","GENDER":"MALE","ROWKEY":"User_1","REGIONID":"Region_9"}
  • Dump the whole database into a hex file
# ./ldb --db=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003 dump --hex > /tmp/dbdump
  • Restore the hex dump file into a new database
# cat /tmp/dbdump | ./ldb --db=/tmp/test_db_new load --hex --block_size=65536 --create_if_missing --disable_wal
After that, this new database shows the current key-value data of the state store (the values differ from the earlier scan because the data generator keeps producing new records):
# ./ldb --db=/tmp/test_db_new scan
User_1 : {"REGISTERTIME":1506734440801,"ROWTIME":1536358470693,"USERID":"User_1","GENDER":"MALE","ROWKEY":"User_1","REGIONID":"Region_1"}
User_2 : {"REGISTERTIME":1489206390421,"ROWTIME":1536358479390,"USERID":"User_2","GENDER":"OTHER","ROWKEY":"User_2","REGIONID":"Region_9"}
User_3 : {"REGISTERTIME":1506433604346,"ROWTIME":1536358533382,"USERID":"User_3","GENDER":"OTHER","ROWKEY":"User_3","REGIONID":"Region_9"}
User_4 : {"REGISTERTIME":1509804593170,"ROWTIME":1536358490776,"USERID":"User_4","GENDER":"OTHER","ROWKEY":"User_4","REGIONID":"Region_2"}
User_5 : {"REGISTERTIME":1517022886825,"ROWTIME":1536358526046,"USERID":"User_5","GENDER":"OTHER","ROWKEY":"User_5","REGIONID":"Region_3"}
User_6 : {"REGISTERTIME":1499219356108,"ROWTIME":1536358496854,"USERID":"User_6","GENDER":"OTHER","ROWKEY":"User_6","REGIONID":"Region_3"}
User_7 : {"REGISTERTIME":1513930261290,"ROWTIME":1536358516971,"USERID":"User_7","GENDER":"OTHER","ROWKEY":"User_7","REGIONID":"Region_2"}
User_8 : {"REGISTERTIME":1496124294925,"ROWTIME":1536358505860,"USERID":"User_8","GENDER":"FEMALE","ROWKEY":"User_8","REGIONID":"Region_7"}
User_9 : {"REGISTERTIME":1490786529384,"ROWTIME":1536358545656,"USERID":"User_9","GENDER":"FEMALE","ROWKEY":"User_9","REGIONID":"Region_2"}

4. Examine a specific SST file using sst_dump

The sst_dump tool can be used to gain insight into a specific SST file.
  • Read the first 5 rows of a specific SST file
# ./sst_dump --file=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003090.sst --command=scan --read_num=5
from [] to []
Process /tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003090.sst
Sst file format: block-based
'User_3' seq:2673, type:1 => {"REGISTERTIME":1498381577138,"ROWTIME":1536358918272,"USERID":"User_3","GENDER":"MALE","ROWKEY":"User_3","REGIONID":"Region_2"}
  • Verify the checksum of a specific SST file
# ./sst_dump --file=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003152.sst --command=check --verify_checksum
from [] to []
Process /tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003152.sst
Sst file format: block-based
This command iterates over all entries in the SST file but won't print anything unless it encounters a problem in the SST file. It also verifies the checksum.
  • Print SST file properties
# ./sst_dump --file=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003188.sst --show_properties
from [] to []
Process /tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003188.sst
Sst file format: block-based
Table Properties:
------------------------------
  # data blocks: 1
  # entries: 1
  # range deletions: 0
  raw key size: 14
  raw average key size: 14.000000
  raw value size: 128
  raw average value size: 128.000000
  data block size: 159
  index block size (user-key? 0, delta-value? 0): 28
  filter block size: 0
  (estimated) table size: 187
  filter policy name: N/A
  prefix extractor name: nullptr
  column family ID: 0
  column family name: default
  comparator name: leveldb.BytewiseComparator
  merge operator name: nullptr
  property collectors names: []
  SST file compression algo: NoCompression
  creation time: 1536359347
  time stamp of earliest key: 0
  # deleted keys: 0
  # merge operands: 0
Raw user collected properties
------------------------------
  # rocksdb.block.based.table.index.type: 0x00000000
  # rocksdb.block.based.table.prefix.filtering: 0x30
  # rocksdb.block.based.table.whole.key.filtering: 0x31
  # rocksdb.deleted.keys: 0x00
  # rocksdb.merge.operands: 0x00
This tells us some useful information, such as how many entries are inside this SST file, the average key size, etc.
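For completeness, newer RocksJava releases also ship an org.rocksdb.SstFileReader class that can do much of the above programmatically. The following is a minimal sketch under that assumption (SstFileReader may not exist in older RocksJava versions; the .sst path is copied from the examples above):

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.rocksdb.TableProperties;

public class SstInspect {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        String sst = "/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/"
                + "0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003188.sst";

        try (Options options = new Options();
             SstFileReader reader = new SstFileReader(options)) {
            reader.open(sst);

            // Equivalent of --command=check --verify_checksum:
            // throws if any block of the file is corrupt.
            reader.verifyChecksum();

            // Equivalent of --show_properties (a few fields of interest).
            TableProperties props = reader.getTableProperties();
            System.out.println("# entries:      " + props.getNumEntries());
            System.out.println("raw key size:   " + props.getRawKeySize());
            System.out.println("raw value size: " + props.getRawValueSize());

            // Equivalent of --command=scan: iterate the file's entries.
            try (ReadOptions ro = new ReadOptions();
                 SstFileReaderIterator it = reader.newIterator(ro)) {
                for (it.seekToFirst(); it.isValid(); it.next()) {
                    System.out.println(new String(it.key()) + " => " + new String(it.value()));
                }
            }
        }
    }
}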

Reference:

https://github.com/facebook/rocksdb/wiki/Administration-and-Data-Access-Tool
