博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
9.Spark Streaming
阅读量:5096 次
发布时间:2019-06-13

本文共 4126 字,大约阅读时间需要 13 分钟。

Spark Streaming


1

2
3
4
5
6
7
8
9

原文链接:

1240

Spark Streaming使用Spark的简单编程模型提供了可扩展,容错,高效的处理流数据的方式。它将流数据转换为“微”批次,这使得Spark的批处理编程模型能够应用于Streaming用例。这种统一的编程模型使得批量和交互式数据处理与流媒体的结合变得容易。图10显示了Spark Streaming如何用于分析来自多个数据源的数据源。
1240

Spark Streaming中的核心抽象是离散流(DStream)。DStream是一系列RDD。每个RDD包含以可配置的时间间隔接收的数据。图12显示了Spark Streaming如何通过将输入数据转换为RDD序列来创建DStream。每个RDD包含由间隔长度定义的每2秒接收的流数据。这可能只有1/2秒,因此处理时间的延迟可能低于1秒。

Spark Streaming还提供复杂的窗口操作符,可帮助您在滚动窗口中对RDD集合进行高效计算。DStream公开了一个API,其中包含应用于组件RDD的运算符(转换和输出运算符)。我们来尝试使用Spark Streaming下载中给出的一个简单示例来理解这一点。假设你想在Twitter流中找到热门哈希标签。请参阅以下示例来查找完整的代码段:

spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scalaval sparkConf = new SparkConf().setAppName(“TwitterPopularTags”)val ssc = new StreamingContext(sparkConf, Seconds(2))val stream = TwitterUtils.createStream(ssc, None, filters)

上面的代码段正在设置Spark Streaming Context。Spark Streaming将在DStream中创建一个包含每两秒检索的Tweets的RDD。

val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))

上述代码片段将tweet转换为单词序列,然后只对以#开头的那些进行过滤。

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}.

上面的代码片段显示了如何计算在60秒的窗口中提到的主题标签的次数的滚动聚合。

topCounts60.foreachRDD(rdd => {val topList = rdd.take(10)println(“\nPopular topics in last 60 seconds (%stotal):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%stweets)”.format(tag, count))} })

上面的代码片段显示了如何提取十大热门推文,然后打印出来

ssc.start()

上述代码段指示Spark Streaming Context开始检索Tweets。我们来看几个流行的操作。假设我们正在从套接字读取流文本

al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)

map(func)

目的:通过将此函数应用于DStream中的高分辨率RDDS来创建新的lines.map(x=>x.tolnt*10).print()
prompt> nc -lk 9999
12
34
输出:
120
340

flatMap(func)

目的:与map相同,但映射函数可以输出0个以上的项目

lines.flatMap(_.split(“ “)).print()

prompt> nc -lk

9999
Spark很有趣
输出:
Spark很有趣

count()

目的:创建包含数据元素数的RDD的DStream

lines.flatMap(_.split(“ “)).print()

prompt> NC -lk

9999
你好
火花
输出:
4

reduce(func)

目的:与count相同但不是count,通过应用该函数导出该值

lines.map(x=>x.toInt).reduce(_+_).print()

prompt> nc -lk

9999
1
3
5
7
输出:
16

countByValue()

目的:这与map相同,但映射函数可以输出0个或更多个项目

lines.map(x=>x.toInt).reduce(_+_).print()

prompt> nc -lk 9999

9999
火花
火花
有趣的
乐趣
输出:
(is,1)
(spark,2)
(fun,2)

reduceByKey(FUNC,[numTasks])

lines.map(x=>x.toInt).reduce(_+_).print()

prompt> nc -lk 9999

9999
火花
火花
有趣的
乐趣
输出:
(is,1)
(spark,2)
(fun,2)

reduceByKey(FUNC,[numTasks])

val words = lines.flatMap(_.split(“ “))val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_)wordCounts.print()

prompt>nc –lk 9999

spark is fun
fun
fun
输出:
(is,1)
(spark,1)
(fun,3)

以下示例显示了Apache Spark如何将Spark批处理与Spark Streaming相结合。这是一个一体化技术堆栈的强大功能。在此示例中,我们读取了一个包含品牌名称的文件,并对包含文件中任何品牌名称的流数据集进行过滤。

transform(func)

目的:通过
DStream中的所有RDD应用RDD-> RDD转换来创建新的DStream。

brandNames.txt coke nike sprite reebokval sparkConf = new SparkConf(). setAppName(“NetworkWordCount”)val sc = new SparkContext(sparkConf)val ssc = new StreamingContext(sc,Seconds(10))val lines = ssc。socketTextStream(“localhost”9999,StorageLevel.MEMORY_AND_DISK_SER)val brands = sc.textFile(“/ Users / akuntamukkala / temp / brandNames.txt”)lines.transform(rdd => { rdd.intersection(brands)})print )

prompt> nc -lk 9999

msft
apple
nike
sprite
ibm
输出:
sprite
nike

updateStateByKey(func)

目的:创建一个新的DStream,通过应用给定的函数来更新每个键的值。 请参阅Spark Streaming中的StatefulNetworkCount示例。

这有助于计算一个单词发生的总次数的运行聚合

公共窗口操作

window(windowLength,slideInterval)

目的:返回从源DStream的窗口批量计算的新DStream

val win = lines.window(Seconds(30),Seconds(10)); win.foreachRDD(rdd => { rdd.foreach(x => println(x +“”))})

prompt> nc -lk 9999

10(第0秒)
20(10秒后)
30(20秒后)
40(30秒后)
输出:
10
10 20
20 10 30
20 30 40 (drops 10)

countByWindow(windowLength,slideInterval)

目的:返回一个新的滑动窗口中的元素数

lines.countByWindow(Seconds(30),Seconds(10)).print()

prompt> nc -lk 9999

10(第0秒)
20(10秒后)
30(20秒后)
40(30秒后)
输出:
1
2
3
3

有关其他转换运算符,请参考:http : //spark.apache.org/docs/latest/streaming-programming-guide.html#transformations

Spark Streaming具有强大的输出运算符。我们已经在上面的例子中看到foreachRDD()。其他请参考:http : //spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations


公众号:it全能程序猿

1240


转载于:https://www.cnblogs.com/xinxiucan/p/7252145.html

你可能感兴趣的文章
django之多表查询-2
查看>>
快速幂
查看>>
改善C#公共程序类库质量的10种方法
查看>>
AIO 开始不定时的抛异常: java.io.IOException: 指定的网络名不再可用
查看>>
MyBaits动态sql语句
查看>>
HDU4405(期望DP)
查看>>
拉格朗日乘子法 那些年学过的高数
查看>>
vs code 的便捷使用
查看>>
Spring MVC @ResponseBody返回中文字符串乱码问题
查看>>
用户空间与内核空间,进程上下文与中断上下文[总结]
查看>>
JS 中的跨域请求
查看>>
JAVA开发环境搭建
查看>>
vim插件ctags的安装和使用
查看>>
mysql基础语句
查看>>
Oracle中的rownum不能使用大于>的问题
查看>>
[Data Structure & Algorithm] 有向无环图的拓扑排序及关键路径
查看>>
git 常用命令
查看>>
cassandra vs mongo (1)存储引擎
查看>>
Visual Studio基于CMake配置opencv1.0.0、opencv2.2
查看>>
Vue音乐项目笔记(三)
查看>>