Learning Spark Lightning-Fast Data Analysis

简介

Spark是一个用来实现快速而通用的集群计算的平台.

Spark Core

实现了Spark的基本功能,包含任务调度,内存管理,错误恢复,与存储系统的交互模块等.还包含了对弹性分布式数据集(RDD)的API定义,RDD表示分布在多个计算节点上可以并行操作的元素集合.

Spark SQL

用来操作结构化数据的程序包.通过Spark SQL,可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据,支持多种数据源,如Hive表,Parquet,Json等.并支持与RDD结合.

Spark Streaming

提供对实时数据进行流式计算的组件,比如网页服务器日志,或者网络服务中用户提交的状态更新组成的消息队列,都是数据流.

MLlib

提供了包含常见的机器学习功能的程序库.包括多种机器学习算法.

GraphX

用来操作图(如社交网络的朋友关系图)的程序库,可以进行并行的图计算.

集群管理器

为了支持在一个或数千个节点之间进行伸缩计算,同时获得最大的灵活性,支持各种集群管理器,包括Hadoop YARN,Apache Mesos,以及自带的一个简易调度器,叫做独立调度器.

开始使用

Spark都是由一个驱动程序来发起集群上的各种并行操作.驱动器包含应用的main函数,并且定义了集群上的分布式数据集,并对这些数据集应用了相关操作.驱动程序通过一个SparkContext对象访问Spark,代表对计算集群的一个连接.开启shell时已经自动创建了一个SparkConntext对象,是名为sc的变量.通过该对象可以创建RDD.

对RDD的操作,程序驱动器一般要管理多个执行器节点,不同的节点对数据中不同的部分进行处理.

创建独立应用时需要自己创建SparkContext,在编写独立应用时,需要在SBT中添加spark-core依赖,然后进行初始化:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("MyAPP")   // 设置集群URL(这里是本地)和应用名
val sc = new SparkContext(conf)                         
sc.stop()                                                           // 关闭Sprak

创建一个单词统计应用:

// 创建一个Scala版本的Spark Context 
val conf = new SparkConf().setAppName("wordCount") 
val sc = new SparkContext(conf) 
// 读取我们的输入数据 
val input = sc.textFile(inputFile) 
// 把它切分成一个个单词 
val words = input.flatMap(line => line.split(" ")) 
// 转换为键值对并计数 
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y} 
// 将统计出来的单词总数存入一个文本文件,引发求值 
counts.saveAsTextFile(outputFile)

RDD基础

RDD为一个不可变的分布式对象集合.每个RDD被分为多个分区,分别运行在集群的多个节点上,可以包含Scala中的任意类型和用户自定义类型.

通过两种方式创建RDD: 读取一个外部数据集,或在驱动器程序中分发对象集合(如list或set).创建出来的RDD支持转换操作和行动操作.转换操作会由一个RDD生成一个新的RDD,行动操作会对RDD计算一个结果,并把结果返回到驱动器程序中,或存储在外部存储系统中.

两种的操作区别在于计算RDD的方式不同,因为只会对RDD进行惰性计算.只有第一次在一个行动操作中用到时才会进行计算.默认情况下,每次行动操作会对RDD进行重复计算.或者使用RDD.persist()进行缓存,以对某些计算结果进行重用.

独立程序或shell的工作方式如下:

  1. 从外部数据创建输入RDD
  2. 使用转换操作对RDD进行转换,生成新的RDD
  3. 对需要重用的结果使用persist()方法进行缓存
  4. 使用行动操作触发一次并行计算.Spark会对计算进行优化然后执行.

创建RDD:

val lines = sc.parallelize(List("pandas", "i like pandas"))
val lines = sc.textFile("/path/to/README.md")

向Spark传递函数

在Scala中,可以把定义的内联函数,方法的引用,静态方法传递给Spark,就像Scala的其他函数式API一样.但是所传递的函数和引用的数据必须是可序列化的(实现了Java的Serializable接口).传递一个对象的方法或字段时会包含对整个对象的引用.

class SearchFunctions(val query: String) { 
    def isMatch(s: String): Boolean = {
        s.contains(query) 
    } 
    def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
        // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this" 
        rdd.map(isMatch)
    }
    def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = { 
        // 问题:" query"表示"this.query",因此我们要传递整个"this" 
        rdd.map(x => x.split(query)) 
    }
    def getMatchesNoReference(rdd: RDD[String]): RDD[String] = { 
        // 安全:只把我们需要的字段拿出来放入局部变量中 
        val query_ = this.query 
        rdd.map(x => x.split(query_)) 
    }
}

转化操作与行动操作

基本RDD转化操作

基本RDD行动操作

不同RDD类型间的转换在Scala中表现为隐式转换,由 org.apache.spark.Context._ 提供.

缓存

对一个RDD调用persis()时,可以设置其缓存级别,默认缓存在JVM的堆空间中.

RDD缓存级别

或者可以在级别的末尾加上”_2”来缓存两份.

val result = input.map(x => x * x) 
result.persist(StorageLevel.DISK_ONLY) 
println(result.count()) 
println(result.collect().mkString(","))

Pair RDD

包含键值对类型的RDD称为Pair RDD,创建一个Pair RDD:

val pairs = lines.map(x => (x.split(" ")(0), x))

Pair RDD的操作:

Prir-RDD转化操作

针对两个Prir-RDD转化操作

针对两个Prir-RDD转化操作

Prir-RDD行动操作

数据分区

在分布式程序中,可以通过控制数据分布以获得最少的网络传输,以提升整体性能.Spark可以通过控制RDD分区来减少通信开销.如果RDD只需要被使用一次,则没必要进行分区.只有当数据集多次在诸如join这种基于键的操作中使用时,分区才会有帮助.

自定义分区方式:

val sc = new SparkContext(...) 
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...") 
                 .partitionBy(new HashPartitioner(100)) // 构造100个分区 .persist()

partitionBy()是一个转化操作,它的结果返回一个新的RDD.并且对partitionBy的结果进行缓存.100表示分区个数,这个值应该与集群总CPU核数一致.

可以同过RDD的partitioner属性获取分区信息,这是一个Option对象,通过isDefined判断后使用get()获取分区信息.

能够从分区中获益的操作: cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),lookup().

Spark内部知道个操作会如何影响分区方式,并将会对数据进行分区的操作的结果RDD自动设置为对应的分区器.但是转化操作的结果并不一定会按已知的分区方式分区,这是输出的RDD可能就会没有设置分区器.

会为生成的结果RDD设好分区方式的操作: cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父分区有分区方式),flatMapValues()(如果父分区有分区方式),filter().

可以自定义分区方式,要实现自定义分区器,需要继承 org.apache.spark.Partitioner 类实现下面三个方法:

  1. numPartitions: Int: 返回创建出来的分区数
  2. getPartition(key: Any): Int: 返回指定键的分区编号(0 到 numPartitions-1)
  3. equals(): Java判断相等性的标准方法.用以判断RDD分区方式是否相同

Scala自定义分区方式:

class DomainNamePartitioner(numParts: Int) extends Partitioner { 

    override def numPartitions: Int = numParts 
    override def getPartition(key: Any): Int = { 
        val domain = new Java.net.URL(key.toString).getHost() 
        val code = (domain.hashCode % numPartitions) 
        if(code < 0) {
            code + numPartitions // 使其非负 }
        else{
            code 
        }
    } 
    // 用来让Spark区分分区函数对象的Java equals方法 
    override def equals(other: Any): Boolean = other match { 
        case dnp: DomainNamePartitioner => dnp.numPartitions == numPartitions 
        case _ => false 
    }
}

数据读取与保存

常见的数据源:

  1. 文件格式与文件系统
  2. Spark SQL中的结构化数据源
  3. 数据库与键值存储

文件格式数据源

// 读取文本文件
val input = sc.textFile("file:///home/holden/repos/spark/README.md")

// 读取Json文件
case class Person(name: String, lovesPandas: Boolean) // 必须是顶级类 ... 
// 将其解析为特定的case class。使用flatMap, 通过在遇到问题时返回空列表(None) 
// 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_)) 
val result = input.flatMap(record => { 
    try {
        Some(mapper.readValue(record, classOf[Person])) } 
    catch {
        case e: Exception => None }
    }
)

// 保存Json
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)

// 读取CSV
import Java.io.StringReader 
import au.com.bytecode.opencsv.CSVReader 
... 
val input = sc.textFile(inputFile) 
val result = input.map{ line => 
    val reader = new CSVReader(new StringReader(line))
    reader.readNext()
}

// 读取完整CSV
case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile) 
val result = input.flatMap{ case (_, txt) => 
    val reader = new CSVReader(new StringReader(txt)); 
    reader.readAll().map(x => Person(x(0), x(1))) 
}

// 保存CSV
pandaLovers.map(person => 
List(person.name, person.favoriteAnimal).toArray).mapPartitions{ people => 
    val stringWriter = new StringWriter(); 
    val csvWriter = new CSVWriter(stringWriter); 
    csvWriter.writeAll(people.toList) 
    Iterator(stringWriter.toString) 
}.saveAsTextFile(outFile)

SequenceFile是由没有相关关系结构的键值对文件组成的常用Hadoop格式.Hadoop实现了一套自定义的序列化框架,因此SequenceFile是由实现Hadoop的Writable接口的元素组成.

Hadoop-Writable类型对照

// 读取SequenceFile
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]). map{case (x, y) => (x.toString, y.get())}
// 保存SequenceFile
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2))) 
data.saveAsSequenceFile(outputFile)

累加器

提供了将工作节点中的值聚合到驱动器程序中的简单语法.

用法:

  1. 通过驱动程序中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器,返回 org.apache.spark.Accumulator[T] 对象,T是初始值 initialValue的类型
  2. Spark闭包里的执行器代码可以使用累加器的 += 方法,增加累加器的值
  3. 驱动器程序可以调用累加器的value属性或setValue()访问累加器的值

注意:工作节点不能访问累加器的值. 同时,对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次,因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动操作中.而RDD转化操作中使用的累加器,就不能保证有这种情况了.

广播变量

可以让程序高效的向所有工作节点发送一个较大的只读值,以提供一个或多个Spark操作使用,比如一个较大的只读查询表,或者特征向量.使用的是一种类似BitTorrent的通信机制.

使用过程:

  1. 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建一个 Broadcast[T] 对象,支持任何可序列化的类型
  2. 通过value属性访问该对象的值
  3. 变量只会被发送到个节点一次,应作为只读值处理(修改这个值不会影响到其他节点)

在广播一个较大的对象时可以使用spark.serializer属性选择一个性能很高的序列化库来优化序列化过程.

基于分区进行操作

基于分区对数据进行操作可以让我们避免对每个数据元素进行重复的配置工作.

提供了基于分区的map和foreach,让部分代码只对RDD个每个分区运行一次以降低操作代价.

基于分区的操作

与外部程序间的管道

使用RDD的pipe()方法以支持使用任意语言实现Spark作业中的部分逻辑,主要他能读写Unix标准流就行.

数值RDD操作

数值RDD操作

集群运行

驱动器程序在Spark应用中的两个职责:

  1. 把用户程序转化为任务: 把用户程序转化为多个物理执行单元,称为任务(task).
  2. 为执行器节点调度任务:

在集群上运行Spark应用的详细过程:

  1. 用户通过spark-submit脚本提交应用
  2. spark-submit脚本启动驱动器程序,调用用户的main()方法.
  3. 驱动器程序与集群管理器通信,申请资源以启动执行器节点
  4. 集群管理器为驱动器程序启动执行器节点
  5. 驱动器进程执行用户应用中的操作,根据程序中定义的RDD操作,驱动器节点把工作以人物的形式发送到执行器进程
  6. 任务在执行器进程中进行计算并保存结果
  7. 如果驱动器程序的main()方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器程序,并通过集群管理器释放资源

执行spark-submit时–master可以接收的值:

master可以接收的值

spark-submit常见标识

集群管理器选择

  1. 如果从零开始,可以选择独立集群管理器
  2. 与Hadoop结合使用时选择YARN
  3. Mesos优势在于细粒度共享的选项,可以将命令分配到指定CPU
  4. 尽量将Spark运行与HDFS节点以加速存储访问

Spark SQL

Spark SQL提供了一下三大功能:

  1. 可以从各种结构化数据源中读取数据
  2. 不仅支持在Spark程序内使用SQL语句进行数据检查,也支持从类似商业智能软件Tableau这样的外部工具中,通过标准数据库连接器(JDBC/ODBC)连接Spark进行SQL查询
  3. 当在Spark内部使用Spark SQL时,支持SQL与常规的代码高度整合,包括连接RDD与SQL表,公开的自定义SQL函数接口等.

为了实现以上功能,实现了一种特殊的RDD,称为SchemaRDD,是存放row对象的RDD,每个Row对象代表一行数据,同时还包含了结构信息,即数据字段,可以利用结构信息比普通RDD更加高效的存取数据.支持普通RDD没有的操作,如执行SQL查询,可以从外部数据源创建,也可以从查询结果或普通RDD中创建.

基础的SQLContext只支持Spark SQL的一个子集,要使用完整的Hive支持则需要引入Hive依赖,以支持Hive表访问,UDF和SerDe,以及Hive查询语句.通过使用HiveContext.使用时并不需要先部署Hive.

// 导入Spark SQL 
import org.apache.spark.sql.hive.HiveContext 
// 如果不能使用hive依赖的话 
import org.apache.spark.sql.SQLContext
// 创建Spark SQL的HiveContext
 val hiveCtx = ...
// 导入隐式转换支持 
import hiveCtx._

// 创建SQL上下文环境
val sc = new SparkContext(...) 
val hiveCtx = new HiveContext(sc)

// 读取并查询推文
val input = hiveCtx.jsonFile(inputFile) 
// 注册输入的SchemaRDD 
input.registerTempTable("tweets") 
// 依据retweetCount(转发计数)选出推文 
val topTweets = hiveCtx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")

SchemaRDD支持的数据类型:

SchemaRDD支持的数据类型

SchemaRDD支持的数据类型

最后一种类型,也就是结构体,可以直接被表示为其他的Row对象.所有这些复杂类型都可以进行嵌套.

Row对象表示SchemaRDD中的记录,其本质就是一个定长的字段数组.提供多种getter方法.

获取每一行的第一个字段,即第一列:

val topTweetText = topTweets.map(row => row.getString(0))

使用SchemaRDD专用的方法对数据进行缓存:

hiveCtx.cacheTable("tableName")

支持的数据源包括Hive表,JSON,Parquet.使用SQL时只会扫描使用到的字段而不是整个表.或者通过指定结构信息将常规RDD转换为SchemaRDD.

Hive

Spark SQL支持所有Hive的存储格式(SerDe),包括文本文件,RCFiles,ORC,Avro,Protocol Buffer.

连接已部署的Hive时需要提供相应的配置文件,或者不提供配置时在本地创建Hive元数据仓.

// 从Hive中读取
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc) 
val rows = hiveCtx.sql("SELECT key, value FROM mytable") 
val keys = rows.map(row => row.getInt(0))

基于RDD

带有 case class 的普通RDD可以隐式转换为SchemaRDD.

基于 case class 创建SchemaRDD:

case class HappyPerson(handle: String, favouriteBeverage: String) 
... 
// 创建了一个人的对象,并且把它转成SchemaRDD 
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee"))) 
// 注意:此处发生了隐式转换 
// 该转换等价于
sqlCtx.createSchemaRDD(happyPeopleRDD) 
happyPeopleRDD.registerTempTable("happy_people")

UDF

用户自定义函数,可以使用编程语言注册自定义函数,并在SQL中调用,以提供高级功能支持.

字符串长度UDF:

registerFunction("strLenScala", (_: String).length) 
val tweetLength = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")

Spark Streaming

与RDD概念相似,Spark Streaming使用离散化流作为抽象表示,叫做DStream.是随时间推移而受到的数据序列.在内部,每个时间区间收到的数据都作为RDD存在,而DStream是由这些RDD组成的序列.可以从各种输入源创建,比如Flume,Kafka或HDFS.创建出的DStream同样支持两种操作,转化操作会生成新的DStream,行动操作将数据写入外部系统.与RDD支持的操作类似,同时提供了基于时间的操作,比如滑动窗口.

// 依赖支持
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.dstream.DStream 
import org.apache.spark.streaming.Duration 
import org.apache.spark.streaming.Seconds

// 进行流式筛选,打印包含'error'的行
// 从SparkConf创建StreamingContext并指定1秒钟的批处理大小 
val ssc = new StreamingContext(conf, Seconds(1)) 
// 连接到本地机器7777端口上后,使用收到的数据创建DStream 
val lines = ssc.socketTextStream("localhost", 7777) 
// 从DStream中筛选出包含字符串"error"的行 
val errorLines = lines.filter(_.contains("error")) 
// 打印出有"error"的行 
errorLines.print()

要开始收集数据,必须显式调用StreamingContext的start()方法.然后Spark Streaming就会开始把Spark作业不断交给下面的SparkContext去执行,执行会在另一个线程中进行,所以需要调用awaitTermination等待流计算完成.

// 启动流计算环境StreamingContext并等待它"完成" 
ssc.start() 
// 等待作业完成 
ssc.awaitTermination()

一个Streaming Context只能执行一次,只有在配置好需要的DStream和对应的操作时才能启动.

// 为其提供流数据输入,进行测试
$ spark-submit --class com.oreilly.learningsparkexamples.scala.StreamingLogInput $ASSEMBLY_JAR local[4]
$ nc localhost 7777 # 使你可以键入输入的行来发送给服务器 
<此处是你的输入>

架构与抽象

Spark Streaming使用为批次架构,把流式计算当做一系列连续的小规模批处理来对待.每个批次按时间间隔创建: 在每个区间开始时,一个新的批次被创建,在该区间内收到的数据会被添加到这个批次中,时间区间结束时,批次停止增长.时间区间大小由批次间隔参数决定,一般为500ms到几秒之间.每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他RDD.处理结果以批处理方式传给外部系统.

SparkStreaming高层架构

Spark Streaming的编程抽象是离散化流,即DStream.是一个RDD序列,每个RDD代表数据流中一个时间片内的数据.

DStream是一个持续的RDD序列

DStream转化关系

Spark Streaming为每个输入源启动对应的接收器,接收器以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为RDD,收集后会复制到另一个执行器进程已保证容错,被缓存在执行器进程内存,与缓存RDD一样.然后Spark Streaming周期性的运行Spark作业来处理这些数据,把数据与之前时间区间中的RDD进行整合.

Streaming执行过程

除了对数据复制保证容错,同时提供检查点机制支持数据恢复,可以把状态阶段性的存储到可靠文件中(HDFS),一般5-10个批次.

转化操作

  1. 无状态转化操作: 每个批次的数据不依赖于之前批次的数据
  2. 有状态转化操作: 需要使用之前批次的数据或中间结果来计算当前批次的数据

无状态转化

无状态转化操作将简单的RDD转化应用到每个批次上,即转化DStream的每个RDD,即 分别 应用到每个RDD.支持大部分普通RDD的转化操作.

DStream无状态转化操作

无状态转化:

// 假设ApacheAccessingLog是用来从Apache日志中解析条目的工具类 
val accessLogDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))  // 解析
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1)) 
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)

无状态转化也能在多个DStream之间进行整合,不过仍是在各个时间区间内,比如cogroup(),join(),leftOutJoin().这些方法应用到DStream时,会对每个批次分别执行对应的RDD操作.

连接两个DStream(在一个时间区间内):

val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize())) 
val ipBytesSumDStream = ipBytesDStream.reduceByKey((x, y) => x + y) 
val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)

或者使用transform(),直接操作内部的RDD,接收一个任意RDD到RDD的函数,会对数据流中的每个批次进行调用,生成一个新的流.通常用于重用对RDD的批处理代码.

比如有一个extractOutliers函数,用来从一个日志记录的RDD中提取出异常值RDD.可以使用transform进行重用:

val outlierDStream = accessLogsDStream.transform { rdd => extractOutliers(rdd) }

有状态转化

有状态转化操作是跨数据区间跟踪数据的操作: 一些先前批次的数据也被用来在新的批次中计算结果.主要的两种类型是滑动窗口和updateStateByKey(),前者以一个时间阶段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化(例如构建一个代表用户会话的对象).

有状态转化需要检查点机制保证容错性,将一个目录传递给ssc.checkpoint()以打开检查点支持.

ssc.checkpoint("hdfs://...")

基于窗口的转化操作

基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内(窗口),通过整合多个批次的结果,计算出整个窗口的结果.

滑动窗口需要两个参数: 窗口时长 和 滑动步长,两者都必须是StreamContext的批次间隔的整数倍.窗口时长控制每次计算最近的多少个批次的数据.滑动步长默认与批次间隔相等,用来控制对新的DStream进行计算的间隔,即多久对窗口做一次计算.

每隔2个批次就对前3个批次的数据做一次计算

DStream最简单的窗口操作是window(),它返回一个新的DStream来表示所请求的窗口操作的结果数据.即,window()生成的DStream中的每个RDD会包含多个批次中的数据,然后再对这些数据进行count(),transform()等操作.

使用window()对窗口进行计数:

val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10)) 
val windowCounts = accessLogsWindow.count()

reduceByWindow() 和 reduceByKeyAndWindow() 可以方便的对窗口进行归约操作.接收一个归约函数,在整个窗口上执行,比如 +.

另一种特殊方式,通过新进窗口的数据和离开窗口的数据,让Spark增量计算归约结果.这种特殊形式需要提供归约函数的逆函数,比如 + 的逆函数为 -, 对于较大的窗口,提供逆函数可以大大提高执行效率.

窗口归约操作

IP地址访问计数:

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) 
val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, // 加上新进入窗口的批次中的元素 
                                                     {(x, y) => x - y}, // 移除离开窗口的老批次中的元素 
                                                     Seconds(30), // 窗口时长 
                                                     Seconds(10)) // 滑动步长

提供了countByWindow() 和 countByValueAndWindow() 作为对数据进行计数操作的简写.前者会返回一个DStream包含窗口中元素个数.后者返回一个DStream包含窗口中每个值的个数.

窗口计数操作:

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} 
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) 
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

UpdateStateByKey转化操作

有时候需要在DStream中跨批次维护状态(如用户session). updateStateByKey()为我们提供了一个对状态变量访问,用于键值对形式的DStream.

给定一个由(键, 事件)对构成的DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部状态为(键,状态)对.

比如,网络服务器日志中,事件可能是对网站的访问,此时键是用户ID,使用updateStateByKey()可以跟踪每个用户最近访问的10个页面.这个列表就是状态对象,我们会在每个事件到来时更新这个状态.

要使用updateStateByKey(),提供了一个update(events, oldState)函数,接收与某键相关的事件以及该键之前的状态,返回这个键的新状态,该函数签名如下:

  1. events: 是在当前批次中收到的事件列表(可能为空)
  2. oldState: 是一个可选的状态对象,存放在Option内,如果一个键灭有之前的状态,这个值可以空缺
  3. newState: 有函数返回,也以Opiton形式存在,我们可以返回一个空的Option来表示想要删除该状态

updateStateByKey()的结果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的.

统计状态码计数的例子,跟窗口不同,计数自程序启动后无限增长:

def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
    Some(state.getOrElse(0L) + values.size) 
}
val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L)) 
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)

输出操作

// 保存为文本
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")

// 保存为SequenceFile
val writableIpAddressRequestCount = ipAddressRequestCount.map {
    (ip, count) => (new Text(ip), new LongWritable(count)) 
} 
writableIpAddressRequestCount.saveAsHadoopFiles[ SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")

输入源

文件流: 支持从任意Hadoop兼容的文件系统目录中的文件创建数据流.需要为目录名字提供统一的日期,文件也必须原子化创建.

val logData = ssc.textFileStream(logDirectory)

Akka actor流: 可以把actor作为数据源的流,创建一个actor然后实现 org.apache.spark. streaming.receiver.ActorHelper, 然后调用actor的store()函数将数据从actor复制到SparkStreaming.

Apache Kafka

Flume