Goal:
This article shares sample Spark Streaming Scala code for different sources -- socket text, text files in a MapR-FS directory, a Kafka broker, and MapR Event Store for Apache Kafka (MapR Streams). Each example is a word count that can be run directly from spark-shell.
Env:
MapR 6.1
mapr-spark-2.3.2.0
mapr-kafka-1.1.1
mapr-kafka-ksql-4.1.1
Solution:
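All of the examples below can be pasted into a Scala spark-shell on a cluster node. The launch command here is only a sketch: the install path assumes the default MapR Spark layout, and local[2] is used because the socket example is receiver-based and needs more than one core. Depending on the install, the spark-streaming-kafka-0-10 jars may also need to be added with --jars for examples 3 and 4.
/opt/mapr/spark/spark-2.3.2/bin/spark-shell --master local[2]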
1. Socket text
Data source: Open a socket on port 9999 and type some words as the data source.
nc -lk 9999
Sample Code:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
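The batches keep printing every 10 seconds after ssc.start(). To stop the stream later without killing the spark-shell session, the StreamingContext can be stopped on its own, for example:
// Stop the streaming job but keep the underlying SparkContext alive,
// so the spark-shell session stays usable. A stopped StreamingContext
// cannot be restarted; create a new one to run another example.
ssc.stop(stopSparkContext = false, stopGracefully = true)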
2. Text files in a MapR-FS directory
Data source: Create a directory on MapR-FS and put text files inside as the data source.
hadoop fs -mkdir /tmp/textfile
hadoop fs -put /opt/mapr/NOTICE.txt /tmp/textfile/
Note that textFileStream only picks up files created in the directory after the streaming context starts, so copy a new text file into /tmp/textfile after running ssc.start() to see output.
Sample Code:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("/tmp/textfile")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
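A small variation on the same pipeline counts words over a sliding window instead of per batch. The 30-second window and 10-second slide below are just example values, and the lines go in before ssc.start():
// Word counts over the last 30 seconds, recomputed every 10 seconds.
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()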
3. Kafka broker
Data source: Assuming an existing Kafka server is started:
./bin/kafka-server-start.sh ./config/server.properties
Create a new topic named "mytopic":
./bin/kafka-topics.sh --create --zookeeper localhost:5181 --replication-factor 1 --partitions 1 --topic mytopic
Start a Kafka console producer and type some words as the data source:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
Or use the producer performance test tool below:
./kafka-producer-perf-test.sh --topic mytopic --num-records 1000000 --record-size 1000 \
  --throughput 10000 --producer-props bootstrap.servers=localhost:9092
Sample Code:
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{
ConsumerStrategies,
KafkaUtils,
LocationStrategies
}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
val topicsSet = Array("mytopic")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
consumerStrategy)
val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
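Because enable.auto.commit is set to false in kafkaParams, nothing above commits consumer offsets back to Kafka. If the group's progress should be recorded, one option is a sketch using the kafka010 commit API, placed before ssc.start():
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
messages.foreachRDD { rdd =>
  // Read the offset ranges covered by this batch and commit them
  // asynchronously once the batch has been processed.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}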
4. MapR Event Store for Apache Kafka (MapR Streams)
Data source: Create a sample MapR stream named /sample-stream:
maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
Then generate sample data into the pageviews topic with ksql-datagen:
/opt/mapr/ksql/ksql-4.1.1/bin/ksql-datagen quickstart=pageviews format=delimited topic=/sample-stream:pageviews maxInterval=10000
Or use the producer performance test tool below:
./kafka-producer-perf-test.sh --topic /sample-stream:pageviews --num-records 1000000 --record-size 10000 \
  --throughput 10000 --producer-props bootstrap.servers=localhost:9092
Sample Code:
import org.apache.kafka.clients.consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{
ConsumerStrategies,
KafkaUtils,
LocationStrategies
}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map[String, Object](
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.GROUP_ID_CONFIG -> "mysparkgroup",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
val topicsSet = Array("/sample-stream:pageviews")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
consumerStrategy)
val lines = messages.map(_.value())
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
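The example above prints counts per 10-second batch. To keep a running total across batches, updateStateByKey can be layered on the same pairs stream; it needs a checkpoint directory, and the path below is just an example. These lines also go in before ssc.start():
// Stateful operations require checkpointing; the directory is an example path.
ssc.checkpoint("/tmp/spark-streaming-checkpoint")
// Add this batch's counts for each word to the running total kept in state.
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}
runningCounts.print()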