Goal:
This article shares sample Spark Streaming Scala code for different sources: socket text, text files in a MapR-FS directory, a Kafka broker, and MapR Event Store for Apache Kafka (MapR Streams). Each example is a word count that can be run directly from spark-shell.
Env:
MapR 6.1
mapr-spark-2.3.2.0
mapr-kafka-1.1.1
mapr-kafka-ksql-4.1.1
Solution:
1. socket text
Data source: Open a socket on port 9999 and type some words as the data source.
nc -lk 9999
Sample Code:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}

Logger.getRootLogger.setLevel(Level.WARN)

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
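After ssc.start() the word counts keep printing every 10 seconds until the stream is stopped. Below is a minimal sketch (not part of the original example) of one way to stop it from spark-shell while keeping the shell's SparkContext alive; note that a stopped StreamingContext cannot be restarted, so create a new one before running the next example.

// Sketch only: block for a while, then stop streaming without killing the shell's SparkContext.
ssc.awaitTerminationOrTimeout(60000)   // wait up to 60 seconds of batches
ssc.stop(stopSparkContext = false, stopGracefully = true)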
2. text files in MapR-FS directory
Data source: Create a directory on MapR-FS and put text files inside as the data source.
hadoop fs -mkdir /tmp/textfile
hadoop fs -put /opt/mapr/NOTICE.txt /tmp/textfile/
Sample Code:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}

Logger.getRootLogger.setLevel(Level.WARN)

val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("/tmp/textfile")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
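Note that textFileStream only processes files that appear in the monitored directory after the stream has started, so put another file into /tmp/textfile once ssc.start() is running to see counts. As a hedged variant (not in the original article), the same pairs DStream can also be counted over a sliding window with reduceByKeyAndWindow:

// Sketch only: count words over the last 30 seconds, recomputed every 10 seconds,
// reusing the `pairs` DStream defined above. Add this before ssc.start().
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()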
3. kafka broker
Data source: Assuming an existing Kafka server is started:
./bin/kafka-server-start.sh ./config/server.properties
Create a new topic named "mytopic":
./bin/kafka-topics.sh --create --zookeeper localhost:5181 --replication-factor 1 --partitions 1 --topic mytopic
Start a Kafka console producer and type some words as the data source:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Or use the producer performance test tool below:
./kafka-producer-perf-test.sh --topic mytopic --num-records 1000000 --record-size 1000 \
  --throughput 10000 --producer-props bootstrap.servers=localhost:9092
Sample Code:
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
val topicsSet = Array("mytopic")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
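Because ENABLE_AUTO_COMMIT_CONFIG is set to false, the code above never commits offsets back to Kafka, so the "mysparkgroup" consumer group does not track its position. Below is a hedged sketch (not part of the original article) of the kafka-0-10 integration's manual commit pattern; add it before ssc.start() if you want offsets committed per batch.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch only: asynchronously commit each batch's consumed offset ranges back to Kafka
// so the consumer group can resume from where it left off after a restart.
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}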
4. MapR Event Store for Apache Kafka (MapR Streams)
Data source: Create a sample MapR stream named /sample-stream:
maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
Then produce sample data into the "pageviews" topic with ksql-datagen:
/opt/mapr/ksql/ksql-4.1.1/bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=10000
Or use the producer performance test tool below:
./kafka-producer-perf-test.sh --topic /sample-stream:pageviews --num-records 1000000 --record-size 10000 \
  --throughput 10000 --producer-props bootstrap.servers=localhost:9092
Sample Code:
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map[String, Object](
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
val topicsSet = Array("/sample-stream:pageviews")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent, consumerStrategy)
val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
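As a hedged extension (not in the original article), the per-batch counts can also be persisted to MapR-FS instead of only being printed. saveAsTextFiles writes one output directory per batch, named with the batch timestamp; add this line before ssc.start().

// Sketch only: in addition to printing, write each 10-second batch's word counts
// to MapR-FS as /tmp/wordcounts-<batch time>.out (prefix and suffix are illustrative).
wordCounts.saveAsTextFiles("/tmp/wordcounts", "out")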