Goal:
KSQL is a streaming SQL engine that enables stream processing against Apache Kafka and MapR Streams (aka MapR-ES). KSQL interacts directly with the Kafka Streams API, removing the need to build a Java application. This article provides an example, based on this quick start, to help you quickly get hands-on with KSQL on MapR Streams.
Env:
MapR 6.0.1
Solution:
Note: KSQL is still in Developer Preview in MapR 6.0.1.
1. Download the ksql tarball.
wget http://package.mapr.com/labs/ksql/package-ksql-4.1.0-SNAPSHOT-package.tgz
tar -xzvf package-ksql-4.1.0-SNAPSHOT-package.tgz
cd ksql
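As a quick sanity check, you can confirm that the tools used in the following steps were extracted (a minimal sketch; the exact layout of the preview tarball may differ slightly):
# ksql-cli, ksql-datagen and ksqlserver.properties are all used in later steps
ls ./bin
ls ./config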
2. Create a MapR internal Stream with public access
Public access is needed because security is not supported in this Developer Preview on MapR 6.0.1.
maprcli stream create -path /var/mapr/kafka-internal-stream -produceperm p -consumeperm p -topicperm p
- It is used for internal topic creation and CLI-side partition assignment.
- The internal stream should be created before starting the KSQL server.
[root@v2 data]# maprcli stream topic list -path /var/mapr/kafka-internal-stream
partitions  maxlag  logicalsize  topic                                                                 consumers  physicalsize
1           0       0            __mapr__ksql_transient_4130383735097621206_1536269653676_assignment  0          0
1           0       0            ksql__commands                                                        0          0
1           0       0            __mapr__ksql_transient_7811816849950865146_1536269574284_assignment  0          0
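You can also confirm that the internal stream really has public produce/consume/topic permissions with maprcli stream info (a sketch; the exact fields in the JSON output may vary by MapR version):
# Inspect the internal stream and check the permission-related fields in the output
maprcli stream info -path /var/mapr/kafka-internal-stream -json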
3. Modify the configuration for ksql
vim ./config/ksqlserver.properties
A sample configuration is:
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands
listeners=http://localhost:8080
application.id=myksql
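If you prefer to script the change rather than edit the file by hand, here is a minimal sketch, assuming you are in the ksql directory and want exactly the sample values above:
# Back up the shipped configuration, then write the sample values shown above
cp ./config/ksqlserver.properties ./config/ksqlserver.properties.bak
cat > ./config/ksqlserver.properties <<'EOF'
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands
listeners=http://localhost:8080
application.id=myksql
EOF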
4. Create a sample MapR stream for the use case below.
maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
5. Generate messages using ksql-datagen
KSQL provides a very handy tool named "ksql-datagen", which saves you from writing a test producer program. Refer to this link.
Here we generate sample messages into two topics named "pageviews" and "users".
The schemas are shown below:
- "pageviews":
./bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=10000
Sample data looks like:
1 --> ([ 1536339899430 | 'User_8' | 'Page_12' ])
11 --> ([ 1536339904639 | 'User_9' | 'Page_11' ])
21 --> ([ 1536339906898 | 'User_1' | 'Page_25' ])
...
- "users":
./bin/ksql-datagen quickstart=users format=json topic=/sample-stream:users maxInterval=10000
Sample data looks like:
User_6 --> ([ 1488285478109 | 'User_6' | 'Region_1' | 'FEMALE' ])
User_7 --> ([ 1509065238387 | 'User_7' | 'Region_5' | 'OTHER' ])
User_2 --> ([ 1491876978482 | 'User_2' | 'Region_1' | 'OTHER' ])
...
Then use the following two commands to check the topic information inside the MapR Stream:
maprcli stream topic info -path /sample-stream -topic pageviews -json
maprcli stream topic info -path /sample-stream -topic users -json
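The datagen commands above keep producing messages until you stop them with Ctrl-C. The Confluent version of ksql-datagen also accepts an iterations argument for producing a bounded batch; assuming the bundled preview tool supports it as well, a sketch:
# Produce roughly 100 pageview records and then exit (iterations support assumed)
./bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=1000 iterations=100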
6. Start ksql in standalone mode
./bin/ksql-cli local --properties-file ./config/ksqlserver.properties
After that, the ksql java process is running:
[root@v2 config]# ps -ef|grep -i ksql
root 6768 13847 66 12:08 pts/1 00:00:07 java -cp /root/ksql/share/java/ksql/*: -Xmx3g -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dksql.log.dir=/tmp/ksql-logs -Dlog4j.configuration=file:/root/ksql/config/log4j-file.properties io.confluent.ksql.Ksql local --properties-file ./config/ksqlserver.properties
Show the current properties:
ksql> SHOW PROPERTIES;

 Property                                          | Value
------------------------------------------------------------------------------------------------------------------
 streams.default.stream                            |
 ksql.transient.prefix                             | transient_
 commit.interval.ms                                | 2000
 ksql.sink.replicas                                | 1
 listeners                                         | http://localhost:9098
 default.deserialization.exception.handler         | io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler
 bootstrap.servers                                 | localhost:9099
 ksql.schema.registry.url                          | http://localhost:8081
 ksql.sink.partitions                              | 4
 ksql.statestore.suffix                            | _ksql_statestore
 ksql.service.id                                   | ksql_
 cache.max.bytes.buffering                         | 10000000
 ksql.default.stream                               |
 ksql.sink.window.change.log.additional.retention  | 1000000
 auto.offset.reset                                 | latest
 num.stream.threads                                | 4
 ksql.persistent.prefix                            | query_
 application.id                                    | ksql_
------------------------------------------------------------------------------------------------------------------
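Before creating any streams or tables, you can also inspect the raw messages in a topic directly from the ksql> prompt with PRINT (a sketch; I am assuming the full /stream:topic path syntax works here the same way it does in the CREATE statements below):
-- Dump raw records from the pageviews topic, starting from the beginning; stop with Ctrl-C
PRINT '/sample-stream:pageviews' FROM BEGINNING;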
7. Create a stream for topic "pageviews" and a table for topic "users".
Here the "stream" and "table" are concept of kafka.- A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. Like an "INSERT".
- A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). Like an "UPDATE".
Now here are the exact commands in ksql:
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='/sample-stream:pageviews', value_format='DELIMITED');

 Message
----------------
 Stream created
----------------

ksql> DESCRIBE pageviews_original;

 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='/sample-stream:users', value_format='JSON', key = 'userid');

 Message
---------------
 Table created
---------------

ksql> DESCRIBE users_original;

 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 REGISTERTIME | BIGINT
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 USERID       | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
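You can list what has been registered so far with the standard KSQL SHOW statements (the exact columns in the output may differ slightly in this preview):
-- List registered streams, tables and the topics KSQL can see
SHOW STREAMS;
SHOW TABLES;
SHOW TOPICS;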
8. Run a query reading from the above stream or table.
By default KSQL reads the topics for streams and tables from the latest offset.
ksql> select * from pageviews_original;
1536342236738 | 4871 | 1536342235234 | User_8 | Page_12
1536342243408 | 4881 | 1536342241904 | User_4 | Page_18
1536342252370 | 4891 | 1536342250867 | User_4 | Page_13
...
The default location for the local state store (RocksDB) is /tmp/kafka-streams, and for each Kafka Streams application a subdirectory /tmp/kafka-streams/<application.id> is created.
In the meantime, we can check the application ID for the currently running queries:
[root@v2 ~]# ls -altr /tmp/kafka-streams/
drwxr-xr-x 3 root root 17 Sep 7 10:54 ksql_transient_1376501806109265799_1536342862444
Then, using the application.id, we can check the "jstack <ksql pid>" output to find the threads related to this query:
[root@v2 ~]# jstack 6768 |grep ksql_transient_1376501806109265799_1536342862444
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-CleanupThread" #197 daemon prio=5 os_prio=0 tid=0x00007f9298076800 nid=0x1ed4 waiting on condition [0x00007f927d0e3000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-8" #196 prio=5 os_prio=0 tid=0x00007f9298075000 nid=0x1ed3 runnable [0x00007f927d1e4000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-7" #195 prio=5 os_prio=0 tid=0x00007f9298074000 nid=0x1ed2 runnable [0x00007f927d3e6000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-6" #194 prio=5 os_prio=0 tid=0x00007f9298073000 nid=0x1ed1 runnable [0x00007f927d4e7000]
"ksql_transient_1376501806109265799_1536342862444-76a58c09-c3a3-4c13-adb7-ab00b7f7ccf7-StreamThread-5" #193 prio=5 os_prio=0 tid=0x00007f92982e9800 nid=0x1ed0 runnable [0x00007f927dced000]
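If you want a query to read from the beginning of the topic instead of the latest offset, you can change auto.offset.reset from the ksql> prompt before running it. A minimal sketch using standard KSQL statements:
-- Read from the earliest available offset for subsequent queries
SET 'auto.offset.reset' = 'earliest';
-- LIMIT makes the query terminate after returning 5 rows instead of running until Ctrl-C
SELECT * FROM pageviews_original LIMIT 5;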
9. Create a persistent query to store the data into a MapR Stream topic
Unlike the non-persistent query above, results from this query are written to a MapR Stream topic named "pageviews_female".
CREATE STREAM pageviews_female WITH (kafka_topic='/sample-stream:pageviews_female', value_format='json') AS
SELECT users_original.userid AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original ON pageviews_original.userid = users_original.userid
WHERE gender = 'FEMALE';
After the stream is created and the statement returns, the query thread keeps running in the background:
[root@v2 ~]# jstack 6768 |grep ksql_query_CSAS_PAGEVIEWS_FEMALE
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-CleanupThread" #261 daemon prio=5 os_prio=0 tid=0x00007f92e0039000 nid=0x27aa waiting on condition [0x00007f927cee1000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-16" #260 prio=5 os_prio=0 tid=0x00007f92e0037800 nid=0x27a9 runnable [0x00007f927cfe2000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-15" #259 prio=5 os_prio=0 tid=0x00007f92e0037000 nid=0x27a8 runnable [0x00007f927d0e3000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-14" #258 prio=5 os_prio=0 tid=0x00007f92e0036000 nid=0x27a7 runnable [0x00007f927d1e4000]
"ksql_query_CSAS_PAGEVIEWS_FEMALE-af068d1f-9ff4-4acb-abe7-e4ff37a5c91c-StreamThread-13" #257 prio=5 os_prio=0 tid=0x00007f92e002d000 nid=0x27a6 runnable [0x00007f927dced000]
And the MapR Stream topic is created:
[root@v2 ~]# maprcli stream topic info -path /sample-stream -topic pageviews_female -json
{
    "timestamp":1536344466684,
    "timeofday":"2018-09-07 11:21:06.684 GMT-0700 AM",
    "status":"OK",
    "total":4,
    "data":[
        {
            "partitionid":0,
            "physicalsize":0,
            "logicalsize":0,
            "maxoffset":10,
            "minoffsetacrossconsumers":0,
            "mintimestamp":"2018-09-07T11:16:20.523-0700 AM",
            "maxtimestamp":"2018-09-07T11:21:00.328-0700 AM",
            "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
            "fid":"2078.69.262670",
            "master":"v1.poc.com:5660",
            "servers":"v2.poc.com:5660, v1.poc.com:5660, v4.poc.com:5660",
            "timestamptype":"CreateTime"
        },
        {
            "partitionid":1,
            "physicalsize":212992,
            "logicalsize":90112,
            "maxoffset":4,
            "minoffsetacrossconsumers":0,
            "mintimestamp":"2018-09-07T11:19:19.452-0700 AM",
            "maxtimestamp":"2018-09-07T11:20:57.326-0700 AM",
            "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
            "fid":"2092.32.262366",
            "master":"v2.poc.com:5660",
            "servers":"v2.poc.com:5660, v4.poc.com:5660, v3.poc.com:5660",
            "timestamptype":"CreateTime"
        },
        {
            "partitionid":2,
            "physicalsize":0,
            "logicalsize":0,
            "maxoffset":3,
            "minoffsetacrossconsumers":0,
            "mintimestamp":"2018-09-07T11:18:47.229-0700 AM",
            "maxtimestamp":"2018-09-07T11:20:52.785-0700 AM",
            "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
            "fid":"2182.96.262582",
            "master":"v4.poc.com:5660",
            "servers":"v1.poc.com:5660, v4.poc.com:5660, v3.poc.com:5660",
            "timestamptype":"CreateTime"
        },
        {
            "partitionid":3,
            "physicalsize":0,
            "logicalsize":0,
            "maxoffset":6,
            "minoffsetacrossconsumers":0,
            "mintimestamp":"2018-09-07T11:15:48.273-0700 AM",
            "maxtimestamp":"2018-09-07T11:20:13.588-0700 AM",
            "mintimestampacrossconsumers":"1969-12-31T04:00:00.000-0800 PM",
            "fid":"2080.41.262504",
            "master":"v1.poc.com:5660",
            "servers":"v2.poc.com:5660, v1.poc.com:5660, v3.poc.com:5660",
            "timestamptype":"CreateTime"
        }
    ]
}
Since the result topic "pageviews_female" is in JSON format, you can also query it with Drill. Refer to this link.
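To manage the persistent query itself, the standard KSQL statements apply. A sketch; the query ID to terminate should be the one reported by SHOW QUERIES in your environment (CSAS_PAGEVIEWS_FEMALE here is inferred from the thread name above):
-- List the running persistent queries and their IDs
SHOW QUERIES;
-- Read the joined and filtered results back from the new stream
SELECT * FROM pageviews_female;
-- Stop the persistent query by its ID
TERMINATE CSAS_PAGEVIEWS_FEMALE;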