Qubole在AWS Lambda上提供Apache Spark

作者 : 开心源码 本文共4693个字,预计阅读时间需要12分钟 发布时间: 2022-05-12 共97人阅读

Qubole在AWS Lambda上提供Apache Spark

Qubole宣布在AWS Lambda上提供Apache Spark的工作实现。该原型能够成功扫描1 TB的数据,并对来自AWS Simple Storage Service(S3)的100 GB数据进行排序。本文深入讨论了如何构建这个原型的技术细节以及Apache Spark 2.1.0所需的代码更改。

动机

AWS Lambda是无服务器计算服务。它允许您在需要时进行扩展,同时仅支付使用的计算机,并避免配置服务器。这使得应用程序在计算需求方面具备高度的弹性,并且依然能够高效运行。

Apache Spark是一个快速,通用的大数据解决引擎。因为开发友好的API,运行许多不同种类的数据解决任务的能力以及卓越的性能,它越来越受欢迎。在AWS Lambda上运行Apache Spark应用程序的能力理论上会给予Spark的所有优势,同时允许Spark应用程序在资源使用方面具备更大的弹性。我们启动了Spark on Lambda项目来探究这个想法的可行性。

AWS Lambda上的Qubole Spark实现允许:

  • 弹性:我们现在可以运行突发性的工作负载,需要数千个使用AWS Lambda调用的Spark执行程序,而无需等待机器启动。

  • 简单性:在Qubole,我们运行云中少量最大的自动扩展Spark集群。自我管理集群已经允许民主化和广泛采用大数据技术。但是,基于Lambda的实现进一步简化了管理员的配置和管理。

  • 无服务器:运行Spark的流行方式是使用独立模式或者Apache Hadoop的YARN。这些方法假设存在一个集群,这些应用程序的弹性受究竟层集群扩大和缩小的能力的限制。随着Spark在Lambda上,一个簇的概念完全消失。

  • 透明度:Spark应用程序会调用大量的AWS Lambda函数 – 每个函数都有明确的成本。这使我们能够准确计算运行每个Spark工作负载的成本。

Qubole在AWS Lambda上提供Apache Spark

用例

少量可以使用这种组合解决的常见用例包括:

  • 数据预解决和准备:可以使用AWS Lambda完成ETL或者数据挖掘的点击流和访问日志等日志的转换。

  • 交互式数据分析:临时交互式查询非常适合Spark on Lambda,由于我们可以快速地提供大量的计算能力。

  • 流解决:解决离散事件流也是Spark on Lambda的候选用例。

关键挑战

虽然AWS Lambda带来了几乎无限的云计算资源的快速突发配置的承诺,但它有少量限制,使其成为运行Spark的挑战。我们的执行必需克服这些关键挑战:

  1. 无法直接通信:使用DAG执行框架的Spark会产生多个阶段的作业。对于跨阶段的沟通,Spark需要执行者之间的数据传输。AWS Lambda不允许在两个Lambda函数之间进行通信。这对在这个环境中运行执行者提出了挑战。

  2. 极其有限的运行时资源:AWS Lambda调用当前仅限于5分钟的最大执行时间,1536 MB内存和512 MB磁盘空间。Spark喜欢内存,可以有较大的磁盘空间,可以产生长时间运行的任务。这使得Lambda成为运行Spark的困难环境。

这些限制强制对Spark进行不重要的更改,以使其在Lambda上成功运行。

在下面的章节中,我们将详情使Spark在无服务器运行时(如AWS Lambda)上运行的更改以及在Lambda上使用Spark运行的应用程序。

履行

该实现基于对Spark的两个关键架构更改:

  1. Spark执行程序是从AWS Lambda调用中运行的。

  2. 洗牌操作使用外部存储来避免限制本地磁盘的大小,并避免Lambda调用之间的相互通信。

建立

Qubole在AWS Lambda上提供Apache Spark

Qubole在AWS Lambda上运行Apache Spark

Spark驱动程序JVM 在VPC中的EC2实例上运行。运行Spark驱动程序的EC2实例上的安全组允许来自运行在Lambda上的执行程序的传入连接。Lambda函数作为同一个VPC的一部分运行。

我们删除了Spark,Hadoop和Hive分发(JAR),以创立可在Lambda实例的512MB磁盘限制内部署的较小软件包。

Spark执行程序作为Lambda函数

通常情况下,Spark Executors是在预装了Spark库的磁盘空间很多的计算机上启动的。但是,AWS Lambda函数只能在最大部署包大小为50 MB(ZIP / JAR文件)的情况下启动。为了能够通过Lambda运行Spark Executors,我们:

  1. 使用准系统Python Lambda函数启动AWS Lambda。

  2. Python代码立即启动后,通过从压缩的S3软件包中下载Spark库来引导Lambda运行时,提取/tmp (仅在Lambda中的可写位置)下的存档 ,并通过执行作为Lambda请求一部分传递的Java命令行来启动Spark Executor 。

  3. 而后,这个执行程序通过发送心跳返回到Spark驱动程序来加入Spark应用程序

从头开始执行上述策略可能相当缓慢。但是,AWS Lambda允许执行程序的后续实例以比第一个调用快得多的速度启动。在冷启动案例中,我们观察到执行者启动时间大约为两分钟。相比之下,在温暖的情况下(其中Lambda已经有一个供应容器),我们注意到启动时间大约为4秒。这与配置容量扩展所需的EC2实例相反,需要大约一到两分钟的时间才能运行。

新的Spark Scheduler

Spark有一个可插拔的调度程序后台CoarseGrainedSchedulerBackend
,可以与不同的集群管理器(如Apache Hadoop YARN,Mesos,Kubernetes,Standalone等)一起工作来请求资源。Spark驱动程序在启动Spark应用程序期间在内部创立相应的调度程序后台,该应用程序解决应用程序的注册,中止应用程序,请求更多的执行程序或者者杀死现有的执行程序。

例如,在YARN作为集群管理器的情况下,YarnSchedulerBackend 提交YARN应用程序以及必要的信息,ApplicationMaster
ResourceManager使用YARN用户端启动YARN。一旦应用程序被接受ResourceManager,它将启动该协议 ApplicationMaster
, 这有助于在Spark应用程序的整个生命周期内与YARN协商资源。同样,我们为AWS Lambda实施了一个新的后台。每当Spark驱动程序需要更多资源时,LambdaSchedulerBackend 就会对AWS Lambda进行API调用以取得Lambda调用。一旦调用可用,Executor将按照前面所述启动。一旦执行程序的五分钟运行时间耗尽,Spark的自动扩展组件决定要求新的执行程序,并向AWS Lambda发出新的API调用。

Spark调度器的一个额外的改变也是在执行器接近执行持续时间到期时中止执行器的任务调度。为了减少运行任务失败的可能性,调度程序在可配置的时间间隔(在我们的例子中是四分钟)之后中止分配任务给这样的执行程序。

在我们的试验中,我们也注意到AWS Lambda在超过肯定的速率后开始对请求进行限制。为理解决这些问题,我们修改了调度程序后台以恒定的线性速率发出请求,而不是一次发出大量的请求。

Qubole在AWS Lambda上提供Apache Spark

状态卸载

对Spark shuffle基础架构的更改是我们必需在Lambda上运行Spark所做的重要更改。Lambda的无状态体系结构,其运行时限制以及两个Lambda函数之间无法通信意味着我们需要一个外部存储来管理状态。利用Lambda作为AWS服务,S3成为存储洗牌数据的自动选择。映射器任务使用目录布局方案将数据直接写入S3,这允许任何执行者(下游阶段的任务)读取混洗数据而不需要外部洗牌服务。对S3的写入使用流机制来执行,其中以块的形式上传大文件/流。

虽然我们最初使用S3作为随机存储器运行了所有试验,但在内部我们使用Hadoop文件系统,这使得随机存储器可插入。这使得可以轻松地将S3替换为HDFS或者Amazon弹性文件系统(EFS)。

在这个架构中,因为任务的混洗数据被保存到S3,我们也做了调度器的改变,以避免需要重新提交解决执行器(Lambda)失败的阶段。

性能测量

最大的问题是这个实现如何执行我们要处理的日常使用案例。下面报告两个简单而实际的用例的结果。

扫描1 TB的数据

使用1000个Lambda执行程序读取1 TB数据的数据集上的行计数操作只要要47秒。因为每100ms计算时间的AWS成本为0.000002501美元,因而成本为1.18美元。

所花费的时间与启动集群并执行相同操作所需的时间形成鲜明比照。在云中启动Spark群集的平均时间需要两到三分钟。对于具备足够容量的已在运行的集群,此操作的内存要求和并发将至少需要50个r3.xlarge实例,需要额外的维护开销,并优化这些实例的生命周期。

排序100 GB的数据

通过在Lambda上使用Spark,我们能够在不到10分钟(正好为579.7秒)的情况下对100 GB数据进行排序,其中400个并发执行程序随时运行,共有800个Lambda函数。

技术细节

  1. 我们发现128 MB是正确的拆分大小。64 MB的分割大小导致大量的S3调用引入了推迟,并添加了应用程序的端到端运行时间。另一方面,解决256 MB的拆分大小将达到AWS Lambda执行的内存限制。

  2. AWS Lambda可以同时执行数千个Lambda函数。对于这个应用程序,我们必需将其限制为400个并发Lambda函数,由于并发写入shuffle数据到导致S3服务节流的存储桶中 – 虽然可以通过更改shuffle写入方案和基于应用程序生成的前缀

安装程序也应该可扩展以分类更大的数据集。

成本估算

总的来说,它花费了1000个Lambda函数,运行5分钟,内存为1.5GB。此规范的Lambda函数每100ms成本$ 0.000002501。

总成本= $ 0.000002501 * 5 * 60 * 10 * 1000 = $ 7.5

这绝对是昂贵的,但这是因为偶尔的S3限制,到期的Lambda执行持续时间导致任务失败,并被重试。这既是成本,也是加起来的时间。我们在下面的未来工作部分列出了少量变化,这将有助于减少此基准的运行时间并导致较低的Lambda成本。这个例子还重申,需要大量数据的应用程序需要保存为状态,并且可以享受Lambda的好处,例如简单性,弹性和透明度,但可能会产生额外的成本开销。

GitHub仓库

Apache Spark(基于版本2.1.0)的代码可用于AWS Lambda。

未来的工作

上述工作有助于对Lambda上Spark的应用进行分类。我们还确定了可以进行改进的其余领域,以添加合适应用的规模和子集:

  • 调度程序:AWS Lambda将最大执行持续时间限制为5分钟。可以超越上面提出的调度更改,并通过查看一个阶段中任务的最大运行时间来智能地计划任务,而不是将其安排在没有足够剩余执行持续时间的Lambda上的执行器上。

  • 随机播放:使用AWS S3,随机数据上的重命名操作可能会引起偶尔的最终一致性问题。S3上的这些重命名可以被消除或者以避免最终一致操作的方式解决。

  • 随机播放:使用400多个并发Lambda执行并写入S3存储桶,应用程序可以体验节流。通过使用唯一的前缀对写入到S3的混洗数据进行分区来执行密钥的负载平衡,可以对此进行扩展以取得更好的性能。

  • 洗牌:添加了对LAMBDA功能火花而从S3读取多洗牌嵌段可以通过在新的块提取器并行这些请求被掩蔽的推迟。

  • 假如AWS Lambda支持其余可伸缩文件存储处理方案(例如AWS Elastic File System(EFS)),则执行程序可以启动和重新整理数据写入时间,

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Qubole在AWS Lambda上提供Apache Spark

发表回复