spark core之RDD编程

作者 : 开心源码 本文共3127个字,预计阅读时间需要8分钟 发布时间: 2022-05-12 共180人阅读

Spark Core之RDD编程

??spark提供了对数据的核心笼统——弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。RDD是一个分布式的数据集合,数据可以跨越集群中的多个机器节点,被分区并行执行。
??在spark中,对数据的所有操作不外乎创立RDD、转化已有RDD及调使用RDD操作进行求值。spark会自动地将RDD中的数据分发到集群中并行执行。

五大特性

  • a list of partitions
    ??RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,一般会遵循数据的本地性(一般一个hdfs里的block会加载为一个partition)。
  • a function for computing each split
    ??RDD的每个partition中都会有function,即函数应使用,其作使用是实现RDD之间partition的转换。
  • a list of dependencies on other RDDs
    ??RDD会记录它的依赖,为了容错(重算,cache,checkpoint),即内存中的RDD操作出错或者丢失时会进行重算。
  • Optionally,a Partitioner for Key-value RDDs
    ??可选项,假如RDD里面存的数据是key-value形式,则可以传递一个自己设置的Partitioner进行重新分区,例如自己设置的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面。
  • Optionally, a list of preferred locations to compute each split on
    ??可选项,最优的位置去计算每个分片,即数据的本地性。

创立RDD

??spark提供了两种创立RDD的方式:读取外部数据源、将驱动器程序中的集合进行并行化。

并行化集合

??用sparkContext的parallelize()方法将集合并行化。
??parallelize()方法第二个参数可指定分区数。spark会为每个分区创立一个task任务,通常每个cpu需要2-4个分区。spark会自动地根据集群大小设置分区数,也支持通过parallelize()方法的第二个参数手动指定。

scala

val data = Array(1, 2, 3, 4, 5)val distData = sc.parallelize(data)

java

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);JavaRDD<Integer> distData = sc.parallelize(data);

python

data = [1, 2, 3, 4, 5]distData = sc.parallelize(data)

??注:除了开发和测试外,这种方式使用得不多。这种方式需要把整个数据集先放到一台机器的内存中。

读取外部数据源

??spark可接入多种hadoop支持的数据源来创立分布式数据集。包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。
??spark支持多种存储格式,包括textFiles、SequenceFiles及其余hadoop存储格式。

scala

scala> val distFile = sc.textFile("data.txt")distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

java

JavaRDD<String> distFile = sc.textFile("data.txt");

python

>>> distFile = sc.textFile("data.txt")

RDD操作

??RDD支持两种操作:转化操作和行动操作。

算子.png

转化操作

??RDD的转化操作会返回一个新的RDD。转化操作是惰性求值的,只有行动操作使用到转化操作生成的RDD时,才会真正进行转化。

转化算子.png

??spark用lineage(血统)来记录转化操作生成的不同RDD之间的依赖关系。依赖分为窄依赖(narrow dependencies)和宽依赖(wide dependencies)。

  • 窄依赖
    • 子RDD的每个分区依赖于常数个父分区
    • 输入输出一对一,结果RDD的分区结构不变,主要是map、flatMap
    • 输入输出一对一,但结果RDD的分区结构发生变化,如union、coalesce
    • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample
  • 宽依赖
    • 子RDD的每个分区依赖于所有父RDD分区
    • 对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
    • 对两个RDD基于key进行合并和重组,如join

      转化算子依赖.png

行动操作

??行动操作则会向驱动器程序返回结果或者把结果写入外部系统,会触发实际的计算。

action算子.png

缓存方式

??RDD通过persist方法或者cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调使用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重使用。
??cache最终也是调使用了persist方法,默认的存储级别是仅在内存存储一份。

缓存.jpg
??Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存方式.png
??缓存有可能丢失,RDD的缓存容错机制保证即便缓存丢失也能保证计算正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,因为RDD的各个Partition是相对独立的,因而只要要计算丢失的部分就可,并不需要重算一律Partition。

容错机制

  • Lineage机制
    • RDD的Lineage记录的是粗粒度的特定数据Transformation操作行为。当RDD的部分分区数据丢失时,可以通过Lineage来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运使用场合,所以Spark并不适使用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能的提升。
    • Spark Lineage机制是通过RDD的依赖关系来执行的
      • 窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据。
      • 宽依赖则要等到父RDD所有数据都计算完成后,将父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。宽依赖要将祖先RDD中的所有数据块一律重新计算,所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。
  • Checkpoint机制
    • 简介
      • 当RDD的action算子触发计算结束后会执行checkpoint;Task计算失败的时候会从checkpoint读取数据进行计算。
    • 实现方式(checkpoint有两种实现方式,假如代码中没有设置checkpoint,则用local的checkpoint模式,假如设置路径,则用reliable的checkpoint模式。)
      • LocalRDDCheckpointData:临时存储在本地executor的磁盘和内存上。该实现的特点是比较快,适合lineage信息需要经常被删除的场景(如GraphX),可容忍executor挂掉。
      • ReliableRDDCheckpointData:存储在外部可靠存储(如hdfs),可以达到容忍driver 挂掉情况。尽管效率没有存储本地高,但是容错级别最好。

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » spark core之RDD编程

发表回复