Goal:
By default, Kafka Streams and ksql use RocksDB as the internal state store. Each state store consists of:
- an internally created and compacted changelog topic (for fault-tolerance)
- one (or multiple) RocksDB instances (for cached key-value lookups).
More design details can be found in Kafka Streams Internal Data Management.
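Conceptually, a state store pairs a local key-value store with a changelog topic: every write goes to both, and after a failure the store is rebuilt by replaying the changelog. A minimal Python sketch of that idea (illustrative only; the real implementation is RocksDB plus a compacted Kafka topic, not this class):

```python
# Toy model of a changelog-backed state store (illustrative, not the
# actual Kafka Streams API).
class StateStore:
    def __init__(self):
        self.kv = {}          # stands in for the local RocksDB instance
        self.changelog = []   # stands in for the compacted changelog topic

    def put(self, key, value):
        self.kv[key] = value
        self.changelog.append((key, value))  # every write is also logged

    @classmethod
    def restore(cls, changelog):
        # Fault tolerance: replay the changelog; the last write per key
        # wins, which is what topic compaction preserves.
        store = cls()
        for key, value in changelog:
            store.kv[key] = value
        store.changelog = list(changelog)
        return store

store = StateStore()
store.put("User_1", {"GENDER": "FEMALE"})
store.put("User_1", {"GENDER": "MALE"})   # update: overwrites in kv, appends to log
recovered = StateStore.restore(store.changelog)
```

Replaying the two-entry changelog leaves only the latest record per key, which is exactly the view the `ldb scan` commands below expose.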
This article explains, step by step, how to use the tools "ldb" and "sst_dump" to dig deeper into the RocksDB state stores.
Env:
MapR 6.0.1
CentOS 7.4
Solution:
1. Build and install ldb and sst_dump
yum groupinstall 'Development Tools'
git clone git@github.com:facebook/rocksdb.git
cd rocksdb
make ldb sst_dump
2. Running a ksql query
Refer to How to use ksql on MapR Streams (MapR-ES). The running query we examine here has the ID "CSAS_PAGEVIEWS_FEMALE":
ksql> show queries;

 Query ID              | Kafka Topic                     | Query String
------------------------------------------------------------------------
 CSAS_PAGEVIEWS_FEMALE | /sample-stream:pageviews_female | CREATE STREAM pageviews_female WITH (kafka_topic='/sample-stream:pageviews_female', value_format='json') AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

One of the RocksDB state stores for this query is located here:
/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003
3. Examine the state store using ldb
- Scan all the key-value pairs in this state store
# ./ldb --db=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003 scan
User_1 : {"REGISTERTIME":1511164836164,"ROWTIME":1536354992072,"USERID":"User_1","GENDER":"FEMALE","ROWKEY":"User_1","REGIONID":"Region_1"}
User_2 : {"REGISTERTIME":1513176010609,"ROWTIME":1536355004549,"USERID":"User_2","GENDER":"FEMALE","ROWKEY":"User_2","REGIONID":"Region_9"}
User_3 : {"REGISTERTIME":1505260753939,"ROWTIME":1536355010552,"USERID":"User_3","GENDER":"MALE","ROWKEY":"User_3","REGIONID":"Region_6"}
User_4 : {"REGISTERTIME":1518311990103,"ROWTIME":1536355016057,"USERID":"User_4","GENDER":"FEMALE","ROWKEY":"User_4","REGIONID":"Region_4"}
User_5 : {"REGISTERTIME":1500613777028,"ROWTIME":1536354977350,"USERID":"User_5","GENDER":"FEMALE","ROWKEY":"User_5","REGIONID":"Region_2"}
User_6 : {"REGISTERTIME":1510227650166,"ROWTIME":1536355024298,"USERID":"User_6","GENDER":"MALE","ROWKEY":"User_6","REGIONID":"Region_7"}
User_7 : {"REGISTERTIME":1505244299235,"ROWTIME":1536355016057,"USERID":"User_7","GENDER":"MALE","ROWKEY":"User_7","REGIONID":"Region_3"}
User_8 : {"REGISTERTIME":1515774224403,"ROWTIME":1536355010552,"USERID":"User_8","GENDER":"FEMALE","ROWKEY":"User_8","REGIONID":"Region_1"}
User_9 : {"REGISTERTIME":1495112452553,"ROWTIME":1536355024298,"USERID":"User_9","GENDER":"OTHER","ROWKEY":"User_9","REGIONID":"Region_7"}

Since the column "userid" is the key of table "users_original", the scan above shows the current latest user record for each "userid".
To double-check, this matches the output of Drill on MapR Streams as well:
0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select * from ksql.`/sample-stream:users` where USERID='User_1' order by kafkaMsgTimestamp desc limit 1;
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
|  registertime  | gender  | regionid  | userid  |      kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  | kafkaMsgKey  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
| 1511164836164  | FEMALE  | Region_1  | User_1  | /sample-stream:users  | 0                 | 2973            | 1536354992072      | [B@14204fe2  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
1 row selected (0.387 seconds)

0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select * from ksql.`/sample-stream:users` where USERID='User_2' order by kafkaMsgTimestamp desc limit 1;
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
|  registertime  | gender  | regionid  | userid  |      kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  | kafkaMsgKey  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
| 1513176010609  | FEMALE  | Region_9  | User_2  | /sample-stream:users  | 0                 | 2975            | 1536355004549      | [B@12adfaf0  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
1 row selected (0.436 seconds)

0: jdbc:drill:zk=v1.poc.com:5181,v2.poc.com:5> select * from ksql.`/sample-stream:users` where USERID='User_3' order by kafkaMsgTimestamp desc limit 1;
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
|  registertime  | gender  | regionid  | userid  |      kafkaTopic       | kafkaPartitionId  | kafkaMsgOffset  | kafkaMsgTimestamp  | kafkaMsgKey  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
| 1505260753939  | MALE    | Region_6  | User_3  | /sample-stream:users  | 0                 | 2979            | 1536355010552      | [B@10fcb3c6  |
+----------------+---------+-----------+---------+-----------------------+-------------------+-----------------+--------------------+--------------+
1 row selected (0.95 seconds)
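The `ldb scan` output is one "key : value" pair per line, where the latest value per key has survived compaction. A small Python sketch of parsing such output back into records (the two sample lines are trimmed from the output above):

```python
import json

# `ldb ... scan` output as shown above: one "key : value" pair per line.
scan_output = """\
User_1 : {"REGISTERTIME":1511164836164,"USERID":"User_1","GENDER":"FEMALE"}
User_2 : {"REGISTERTIME":1513176010609,"USERID":"User_2","GENDER":"FEMALE"}"""

def parse_scan(text):
    """Parse ldb scan output into {key: record}; values here are JSON."""
    store = {}
    for line in text.splitlines():
        key, _, value = line.partition(" : ")  # separator as printed by ldb
        store[key] = json.loads(value)
    return store

records = parse_scan(scan_output)
```

This is handy when you want to diff a state store's contents against the source topic programmatically instead of eyeballing the scan output.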
- Fetch the value for a given key
# ./ldb --db=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003 get User_1
{"REGISTERTIME":1508781371805,"ROWTIME":1536355797368,"USERID":"User_1","GENDER":"MALE","ROWKEY":"User_1","REGIONID":"Region_9"}
- Dump the whole database into a HEX file
# ./ldb --db=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003 dump --hex > /tmp/dbdump
- Restore the HEX dump file into a new database
# cat /tmp/dbdump | ./ldb --db=/tmp/test_db_new load --hex --block_size=65536 --create_if_missing --disable_wal

After that, this new database can show us the current key-value contents of the state store:
# ./ldb --db=/tmp/test_db_new scan
User_1 : {"REGISTERTIME":1506734440801,"ROWTIME":1536358470693,"USERID":"User_1","GENDER":"MALE","ROWKEY":"User_1","REGIONID":"Region_1"}
User_2 : {"REGISTERTIME":1489206390421,"ROWTIME":1536358479390,"USERID":"User_2","GENDER":"OTHER","ROWKEY":"User_2","REGIONID":"Region_9"}
User_3 : {"REGISTERTIME":1506433604346,"ROWTIME":1536358533382,"USERID":"User_3","GENDER":"OTHER","ROWKEY":"User_3","REGIONID":"Region_9"}
User_4 : {"REGISTERTIME":1509804593170,"ROWTIME":1536358490776,"USERID":"User_4","GENDER":"OTHER","ROWKEY":"User_4","REGIONID":"Region_2"}
User_5 : {"REGISTERTIME":1517022886825,"ROWTIME":1536358526046,"USERID":"User_5","GENDER":"OTHER","ROWKEY":"User_5","REGIONID":"Region_3"}
User_6 : {"REGISTERTIME":1499219356108,"ROWTIME":1536358496854,"USERID":"User_6","GENDER":"OTHER","ROWKEY":"User_6","REGIONID":"Region_3"}
User_7 : {"REGISTERTIME":1513930261290,"ROWTIME":1536358516971,"USERID":"User_7","GENDER":"OTHER","ROWKEY":"User_7","REGIONID":"Region_2"}
User_8 : {"REGISTERTIME":1496124294925,"ROWTIME":1536358505860,"USERID":"User_8","GENDER":"FEMALE","ROWKEY":"User_8","REGIONID":"Region_7"}
User_9 : {"REGISTERTIME":1490786529384,"ROWTIME":1536358545656,"USERID":"User_9","GENDER":"FEMALE","ROWKEY":"User_9","REGIONID":"Region_2"}
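The `--hex` flag exists so that arbitrary binary keys and values survive a text dump: each byte string is printed as 0x-prefixed hex, and `ldb load` decodes it back. The round trip can be sketched in Python (the `" ==> "` separator below is an assumption for illustration; check the dump file produced by your ldb version):

```python
def to_hex(b: bytes) -> str:
    """Encode raw bytes the way a hex dump represents them."""
    return "0x" + b.hex().upper()

def from_hex(s: str) -> bytes:
    """Decode a 0x-prefixed hex string back to raw bytes."""
    return bytes.fromhex(s[2:])  # strip the "0x" prefix

# Round-trip one key-value pair through a dump-file-style line.
key, value = b"User_1", b'{"GENDER":"FEMALE"}'
dump_line = to_hex(key) + " ==> " + to_hex(value)  # separator is an assumption
k_hex, _, v_hex = dump_line.partition(" ==> ")
restored = (from_hex(k_hex), from_hex(v_hex))
```

Because every byte is hex-encoded, keys or values containing newlines, nulls, or non-UTF-8 data cannot corrupt the dump file, which is why `dump --hex` / `load --hex` is the safe backup-and-restore path.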
4. Examine specific sst file using sst_dump
The sst_dump tool can be used to gain insight into a specific SST file.
- Read the first 5 rows of a specific SST file
# ./sst_dump --file=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003090.sst --command=scan --read_num=5
from [] to []
Process /tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003090.sst
Sst file format: block-based
'User_3' seq:2673, type:1 => {"REGISTERTIME":1498381577138,"ROWTIME":1536358918272,"USERID":"User_3","GENDER":"MALE","ROWKEY":"User_3","REGIONID":"Region_2"}
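Each entry printed by sst_dump carries a sequence number and a value type (type 1 is a put, type 0 a deletion). Internally, RocksDB packs these into 8 bytes appended to the user key; a Python sketch of that packing, following the LevelDB/RocksDB internal key layout:

```python
import struct

TYPE_DELETION, TYPE_VALUE = 0, 1  # RocksDB value types for delete / put

def pack_internal_key(user_key: bytes, seq: int, vtype: int) -> bytes:
    # RocksDB appends (seq << 8 | type) as a little-endian uint64
    # to the user key to form the internal key.
    return user_key + struct.pack("<Q", (seq << 8) | vtype)

def unpack_internal_key(ikey: bytes):
    user_key, tail = ikey[:-8], ikey[-8:]
    packed = struct.unpack("<Q", tail)[0]
    return user_key, packed >> 8, packed & 0xFF

# Matches the `'User_3' seq:2673, type:1` line in the scan above.
ikey = pack_internal_key(b"User_3", 2673, TYPE_VALUE)
```

The sequence number is how RocksDB orders multiple versions of the same user key across SST files; compaction eventually drops the older versions, which is why a scan of the store shows only the latest record per key.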
- Verify the checksum of a specific SST file
# ./sst_dump --file=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003152.sst --command=check --verify_checksum
from [] to []
Process /tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003152.sst
Sst file format: block-based

This command iterates over all entries in the SST file and verifies the checksum, but prints nothing unless it encounters a problem in the file.
- Print SST file properties
# ./sst_dump --file=/tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003188.sst --show_properties
from [] to []
Process /tmp/kafka-streams/ksql_query_CSAS_PAGEVIEWS_FEMALE/0_0/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000003/003188.sst
Sst file format: block-based
Table Properties:
------------------------------
  # data blocks: 1
  # entries: 1
  # range deletions: 0
  raw key size: 14
  raw average key size: 14.000000
  raw value size: 128
  raw average value size: 128.000000
  data block size: 159
  index block size (user-key? 0, delta-value? 0): 28
  filter block size: 0
  (estimated) table size: 187
  filter policy name: N/A
  prefix extractor name: nullptr
  column family ID: 0
  column family name: default
  comparator name: leveldb.BytewiseComparator
  merge operator name: nullptr
  property collectors names: []
  SST file compression algo: NoCompression
  creation time: 1536359347
  time stamp of earliest key: 0
  # deleted keys: 0
  # merge operands: 0
Raw user collected properties
------------------------------
  # rocksdb.block.based.table.index.type: 0x00000000
  # rocksdb.block.based.table.prefix.filtering: 0x30
  # rocksdb.block.based.table.whole.key.filtering: 0x31
  # rocksdb.deleted.keys: 0x00
  # rocksdb.merge.operands: 0x00

This prints useful information such as how many entries are inside this SST file, the average key size, etc.
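Some of the printed properties are simple derived values: the averages are just the raw key/value sizes divided by the entry count, which is easy to sanity-check against the output above:

```python
# Figures taken from the sst_dump --show_properties output above.
props = {"entries": 1, "raw_key_size": 14, "raw_value_size": 128}

# raw average key/value size = raw size / number of entries
avg_key_size = props["raw_key_size"] / props["entries"]
avg_value_size = props["raw_value_size"] / props["entries"]
```

With only one entry in this SST file the averages equal the raw sizes (14 and 128 bytes); on a larger file these averages are a quick way to estimate per-record overhead in the store.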