Goal:
This article explains the different modes of kafka-connect using an example. The example streams data from a MySQL table to MapR Event Store for Apache Kafka (aka "MapR Streams") using each kafka-connect JDBC source mode: incrementing, bulk, timestamp and timestamp+incrementing.
Env:
MapR 6.1 (secured)
mapr-kafka-1.1.1
mapr-kafka-connect-jdbc-4.1.0
Solution:
Please read the documentation at https://mapr.com/docs/61/Kafka/kafkaConnect.html first to understand the architecture of mapr-kafka-connect.
Use case:
In the Hive Metastore backend database (MySQL), there is a table named "TBLS" which tracks Hive table information. I chose "TBLS" as the data source because it has a strictly incrementing column named "TBL_ID" and also a timestamp-related column named "CREATE_TIME" (int(11) data type).
We plan to keep monitoring this table and stream its data into a MapR stream named "/tmp/hivemeta".
Standalone mode of kafka-connect is used to keep the demonstration simple.
To monitor which queries run on the source (MySQL), we enable the MySQL general log.
To monitor what data is written to the target (MapR Streams), we use Drill to query the stream through the Kafka storage plugin.
1. Copy the MySQL JDBC driver into the kafka-connect-jdbc library path.
cp /opt/mapr/hadoop/hadoop-2.7.0/share/hadoop/common/lib/mysql-connector-java-5.1.25.jar /opt/mapr/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0/share/java/kafka-connect-jdbc/
2. Recreate the MapR stream "/tmp/hivemeta" before each test.
maprcli stream delete -path /tmp/hivemeta
maprcli stream create -path /tmp/hivemeta
3. mode=incrementing
incrementing: use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.
Sample Connector file "test.conf":
name=mysql-whitelist-incre-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
table.whitelist=TBLS
mode=incrementing
incrementing.column.name=TBL_ID
topic.prefix=/tmp/hivemeta:tbls
Start this connector in standalone mode:
/opt/mapr/kafka/kafka-1.1.1/bin/connect-standalone.sh /opt/mapr/kafka/kafka-1.1.1/config/connect-standalone.properties ~/test.conf
Source side -- MySQL receives the following queries:
2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > -1 ORDER BY `TBL_ID` ASC
191127 14:50:49 2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
191127 14:50:54 2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
191127 14:50:59 2534 Query SELECT * FROM `TBLS` WHERE `TBL_ID` > 77 ORDER BY `TBL_ID` ASC
The logic: the first query scans the whole table with the condition "WHERE `TBL_ID` > -1" plus ORDER BY.
After that, to pick up new rows, it keeps polling with "WHERE `TBL_ID` > 77", where 77 is the currently committed offset.
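Target side -- MapR Streams receives the data up to that offset. With table.whitelist, the topic name is topic.prefix followed by the table name, so the data lands in "/tmp/hivemeta:tblsTBLS":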
> select t.payload.TBL_ID, t.payload.TBL_NAME from kafka.`/tmp/hivemeta:tblsTBLS` as t;
+---------+-----------------------+
| EXPR$0  | EXPR$1                |
...
| 77      | t1                    |
+---------+-----------------------+
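The incrementing-mode polling seen in the general log can be modeled roughly as follows. This is a minimal Python sketch, not the connector's code: an in-memory list of hypothetical rows stands in for TBLS, and `build_incrementing_query` and `poll_incrementing` are hypothetical helper names. The offset starts at -1, as in the first logged query, and advances to the largest TBL_ID returned.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def build_incrementing_query(table: str, column: str, offset: int) -> str:
    """Render the query shape seen in the MySQL general log (illustration only)."""
    return (f"SELECT * FROM `{table}` WHERE `{column}` > {offset} "
            f"ORDER BY `{column}` ASC")


def poll_incrementing(rows: List[Row], column: str, offset: int) -> Tuple[List[Row], int]:
    """Return rows whose ID exceeds the offset (ascending) and the new committed offset."""
    batch = sorted((r for r in rows if r[column] > offset), key=lambda r: r[column])
    if batch:
        offset = batch[-1][column]  # commit the highest ID seen in this poll
    return batch, offset


# Hypothetical rows standing in for TBLS.
table = [{"TBL_ID": 76, "TBL_NAME": "t0"}, {"TBL_ID": 77, "TBL_NAME": "t1"}]
offset = -1  # initial offset, matching the first logged query
print(build_incrementing_query("TBLS", "TBL_ID", offset))
batch, offset = poll_incrementing(table, "TBL_ID", offset)
print(len(batch), offset)  # 2 77
table[1]["TBL_NAME"] = "t1_renamed"  # an UPDATE leaves TBL_ID unchanged...
batch, offset = poll_incrementing(table, "TBL_ID", offset)
print(len(batch), offset)  # 0 77: ...so the modification is never re-read
```

The last poll illustrates the documented limitation: updates to existing rows are invisible to incrementing mode because their ID never exceeds the committed offset.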
4. mode=bulk
bulk: perform a bulk load of the entire table each time it is polled.
Sample Connector file "test_bulk.conf":
name=mysql-whitelist-bulk-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
table.whitelist=TBLS
mode=bulk
topic.prefix=/tmp/hivemeta:tbls
Source side -- MySQL receives the following queries:
191127 14:57:54 2537 Query SELECT * FROM `TBLS`
191127 14:57:59 2537 Query SELECT * FROM `TBLS`
The logic: each poll does a full table scan.
It is like taking a snapshot of the whole data source periodically.
If the source table is huge, this consumes a lot of resources (CPU/memory/disk/network) on both the source and target sides, and the same rows are written to the stream again on every poll.
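To make the cost concrete, here is a minimal Python sketch of bulk-mode behavior, assuming a hypothetical 100-row in-memory table and a hypothetical `poll_bulk` helper; it only models "SELECT * FROM `TBLS`" on each poll, not the connector itself.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def poll_bulk(rows: List[Row]) -> List[Row]:
    """Bulk mode: every poll returns a full snapshot of the table."""
    return [dict(r) for r in rows]  # each poll re-reads (copies) every row


def records_after_polls(rows: List[Row], polls: int) -> int:
    """Total records written to the stream after `polls` polls with no table changes."""
    return sum(len(poll_bulk(rows)) for _ in range(polls))


table = [{"TBL_ID": i} for i in range(1, 101)]  # hypothetical 100-row table
print(records_after_polls(table, 3))  # 300: the same 100 rows, three times
```

At the roughly 5-second interval visible in the log timestamps, this repetition adds up quickly, which is why bulk mode suits small tables or full periodic snapshots.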
5. mode=timestamp
timestamp: use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
Sample Connector file "test_timestamp.conf":
name=mysql-whitelist-timestamp-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
#table.whitelist=TBLS
mode=timestamp
query=SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t
timestamp.column.name=custom_timestamp
topic.prefix=/tmp/hivemeta:tbls
In this example, the "CREATE_TIME" column is not of "timestamp" data type in MySQL.
Its values are actually Unix timestamps stored as "int".
To work around this so that we can use "mode=timestamp", I use "query" instead of "table.whitelist" and apply the MySQL function "FROM_UNIXTIME" to convert the column to a timestamp value under a new column name, "custom_timestamp".
Then set timestamp.column.name=custom_timestamp instead.
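For intuition about what `FROM_UNIXTIME(CREATE_TIME)` yields, here is a rough Python equivalent. Assumption: the sketch pins UTC for reproducibility, whereas MySQL renders the value in the session time zone, so real output may differ by the zone offset. `to_mysql_datetime` is a hypothetical helper name.

```python
from datetime import datetime, timezone, tzinfo


def to_mysql_datetime(unix_seconds: int, tz: tzinfo = timezone.utc) -> str:
    """Approximate FROM_UNIXTIME(): epoch seconds -> 'YYYY-MM-DD HH:MM:SS'.

    Assumption: UTC is used here; MySQL uses the session time zone instead.
    """
    return datetime.fromtimestamp(unix_seconds, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


print(to_mysql_datetime(0))      # 1970-01-01 00:00:00
print(to_mysql_datetime(86400))  # 1970-01-02 00:00:00
```

The converted value has one-second precision, so tables created within the same second share a custom_timestamp; that non-uniqueness is what the timestamp+incrementing mode addresses with TBL_ID.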
Source side -- MySQL receives the following queries:
191127 15:34:30 2551 Query select CURRENT_TIMESTAMP
2551 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` > '2019-11-27 15:34:24' AND `custom_timestamp` < '2019-11-27 15:34:30' ORDER BY `custom_timestamp` ASC
191127 15:34:35 2551 Query select CURRENT_TIMESTAMP
2551 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` > '2019-11-27 15:34:24' AND `custom_timestamp` < '2019-11-27 15:34:35' ORDER BY `custom_timestamp` ASC
The logic: it first checks the current timestamp and uses it as the upper bound of the WHERE condition. The lower bound is the committed offset (the newest timestamp already streamed).
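The bounded window in the log can be sketched in Python as below. This is a simplified model under stated assumptions: the "current timestamp" is passed in as `now` instead of being fetched with `select CURRENT_TIMESTAMP`, timestamps are compared as `datetime` objects, `poll_timestamp` is a hypothetical name, and the rows are hypothetical values modeled on the offsets in the logs.

```python
from datetime import datetime
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def poll_timestamp(rows: List[Row], column: str, committed: datetime,
                   now: datetime) -> Tuple[List[Row], datetime]:
    """Select committed < ts < now, ordered by ts; commit the largest ts returned."""
    batch = sorted((r for r in rows if committed < r[column] < now),
                   key=lambda r: r[column])
    if batch:
        committed = batch[-1][column]  # lower bound for the next poll
    return batch, committed


ts = datetime.fromisoformat
table = [  # hypothetical rows
    {"TBL_ID": 81, "custom_timestamp": ts("2019-11-27 15:34:24")},
    {"TBL_ID": 82, "custom_timestamp": ts("2019-11-27 15:37:57")},
]
batch, committed = poll_timestamp(table, "custom_timestamp",
                                  ts("2019-11-27 15:34:24"), ts("2019-11-27 15:38:02"))
print([r["TBL_ID"] for r in batch], committed)  # [82] 2019-11-27 15:37:57
```

When a poll returns nothing, the committed value is unchanged, which matches the repeated lower bound '2019-11-27 15:34:24' in the two logged queries.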
6. mode=timestamp+incrementing
timestamp+incrementing: use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
Sample Connector file "test_incre+timestamp.conf":
name=mysql-whitelist-timestampincre-source-tbls
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://v4.poc.com:3306/hive?user=hive&password=hive
#table.whitelist=TBLS
mode=timestamp+incrementing
query=SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t
timestamp.column.name=custom_timestamp
incrementing.column.name=TBL_ID
topic.prefix=/tmp/hivemeta:tbls
Source side -- MySQL receives the following queries:
191127 15:38:02 2554 Query select CURRENT_TIMESTAMP
2554 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` < '2019-11-27 15:38:02' AND ((`custom_timestamp` = '2019-11-27 15:34:24' AND `TBL_ID` > 81) OR `custom_timestamp` > '2019-11-27 15:34:24') ORDER BY `custom_timestamp`,`TBL_ID` ASC
191127 15:38:07 2554 Query select CURRENT_TIMESTAMP
2554 Query SELECT t.* from (SELECT *,FROM_UNIXTIME(CREATE_TIME) as custom_timestamp FROM TBLS) t WHERE `custom_timestamp` < '2019-11-27 15:38:07' AND ((`custom_timestamp` = '2019-11-27 15:37:57' AND `TBL_ID` > 82) OR `custom_timestamp` > '2019-11-27 15:37:57') ORDER BY `custom_timestamp`,`TBL_ID` ASC
The logic is a little more complex now:
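timestamp is earlier than the current timestamp (upper bound)
AND
( timestamp equals the committed timestamp offset and TBL_ID is larger than the committed TBL_ID offset
OR
timestamp is newer than the committed timestamp offset
)
Here "newer" means later than the timestamp stored in the last committed offset (for example '2019-11-27 15:37:57' in the second query). When several rows share one timestamp, TBL_ID tells them apart, so the offset is effectively the pair (timestamp, TBL_ID).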