【云驻共创】鲲鹏BoostKit大数据Spark算法加速分享

鲲鹏展翅共创行业新价值

2021“互联网+”大赛鲲鹏命题介绍

鲲鹏BoostKit大数据Spark算法加速分享


目录:

1. 命题介绍

2. 鲲鹏BoostKit大数据介绍

3. 鲲鹏Spark算法加速库


前言:

大数据是集收集,处理,存储为一体的技术总称。在海量数据处理的场景,大数据对计算及存储的要求较高,普遍以集群形式存在。不同的组件有不同的功能体现。 Spark 是专为大规模 数据处理 而设计的快速通用的计算引擎。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。


Spark 主要有三个特点

首先,高级 API 剥离了对集群本身的关注,Spark 应用开发者可以专注于应用所要做的计算本身。

其次,Spark 很快,支持交互式计算和复杂算法。

最后,Spark 是一个通用引擎,可用它来完成各种各样的运算,包括 SQL 查询、文本处理、机器学习等,而在 Spark 出现之前,我们一般需要学习各种各样的引擎来分别处理这些需求。


海量数据需要更高的并发度来加速数据处理,鲲鹏多核计算的特点能够提升大数据任务的并发度,加速大数据的计算性能。大数据并行计算特点天然匹配鲲鹏多核架构,但是,为了获得更好的性能,仍需根据硬件配置和应用程序特点,对软硬件系统做进一步的优化。



一、 命题:鲲鹏BoostKit大数据算法优化


【命题内容】∶

基于Spark 2.4.5和Hadoop 3.2.0版本,Spark GraphX中Betweenness介数中心性算法,用于描述图数据中每个节点在图中与其它节点的连通程度,体现了结点在图中的重要程度。


介数中心性算法可以支撑的应用包括:金融行业中用于评价客户的信贷风险;互联网行业中用于评价社交网络中的用户影响力及活跃度;政府行业中用于识别疾病传播的关键人员、地点;运营商行业中用于识别潜在关键客户。


服务器规格限制:一个队伍3台虚拟机,每台虚拟机的规格:华为云鲲鹏通用计算增强型Kc18核、32GB内存。系统盘:高IO 40GB;数据盘:高IO 500GB;带宽4Mbit/s。操作系统: openEuler 20.03 64bit with ARM


Spark开源组件中Betweenness算法采用公开网络数据集com-Amazon(点数量33万,边数量92万,http://snap.stanford.edu/data/com-Amazon.html),算法精度为75%,计算耗时为60Os,精度低、计算效率差,无法满足实际业务需求,期望从算法技术原理、鲲鹏亲和性适配角度,优化算法的精度和效率,精度提升到90%以上,计算耗时降低到90s以下


【答题要求】:

1、 算法交付软件需要可以运行在Spark平台上,并提供部署运行的指导文档。

2、 保持Betweenness算法的对外使用接口,与开源Spark算法一致。


【提示】

从鲲鹏亲和性(多核并发、数据结构优化、通信优化)和算法原理(降低算法计算复杂度)优化Spark分布式组件的Betweenness算法



二、鲲鹏BoostKit大数据介绍

Spark - 基本组成和概念


spark core: 实现了spark的基础功能(任务调度,内存管理。错误恢复,与存储系统交互等),以及对弹性api数据集的API定义。

spark SQL: 是spark用来操作结构化数据的程序包,支持多种数据源hive,parquet,josn等。 Spark SQL 通常用于交互式查询,但这一领域同类产品太多,更多作为MapReduce的替代者,用于批量作业。

spark streaming: 对实时数据进行流式计算的组件,提供了用来操作数据流的API,并于spark core中的RDD API高度对应。 Spark Streaming 流式batch处理。主要同类产品为storm。

spark MUib: 提供常见的机器学习(ML)功能的程序库。 提供了机器学习相关的统计、分类、回归等领域的多种算法实现。其一致的 API 接口大大降低了用户的学习成本。

GraphX 是spark面向图像计算提供的框架和算法库, 支持分布式,Pregel 提供的 API 可以解决图计算中的常见问题。


资源管理组件

Cluster Manager(集群资源管理器): 是指在集群上获取资源的外部服务,目前有以下几种。


Standalone : Spark原生的资源管理,由Master负责资源的管理。


Hadoop Yarn : 由YARN中的ResourceManager负责资源的管理。


Mesos : 由Mesos中的Mesos Master负责资源的管理。



应用程序

Application (应用程序)︰ 是指用户编写的Spark应用程序,包含驱动程序( Driver )和分布在集群中多个节点上运行的Executor代码,在执行过程中由一个或多个作业组成。


Driver(驱动程序) : Spark中的Driver即运行上述Application的main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。


ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常用SparkContext代表Driver。



作业执行

Worker(工作节点)∶ 集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点。


Master(总控进程): Spark Standalone运行模式下的主节点,负责管理和分配集群资源来运行SparkAppliation。


Executor(执行进程): Application运行在Worker节点上的一个进程,该进程负责运行Task,并负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于Hadoop MapReduce中的YarnChild。



作业(Job ) : RDD中由行动操作所生成的一个或多个调度阶段。


调度阶段( Stage ): 每个作业会因为RDD之间的依赖关系拆分成多组任务集合,称为调度阶段,也叫做住务集。( TaskSet )。调度阶段的划分是由DAGScheduler来划分的调度阶段有Shuffle Map Stage和Result Stage两种。


任务(Task): 具体执行任务。 分发到Executor上的工作任务,是Spark实际执行应用的最小单元。


DAGScheduler : DAGScheduler是面向调度阶段的任务调度器,负责接收Spark应用提交的作业,根据RDD的依赖关系划分调度阶段,并提交调度阶段给

TaskScheduler。


TaskScheduler : TaskScheduler是面向任务的调度器,它接受DAGScheduler提交过来的调度阶段,然后以把任务分发到Work节点运行,由Worker节点的Executor来运行该任务。



三、 鲲鹏Spark算法加速库

开源Spark组件在鲲鹏平台的部署运行


编译Spark组件: https://support.huaweicloud.com/prtg-apache-kunpengbds/kunpengspark_02_0001.html



编译Hadoop组件: https://support.huaweicloud.com/prtg-apache-kunpengbds/kunpenghadoop_02_0001.html



编译Zookeeper组件: 编译Zookeeper组件: https://support.huaweicloud.com/prtg-apache-kunpengbds/kunpengzookeeper_02_0001.html


安装部署Spark组件: https://support.huaweicloud.com/dpmg-apache-kunpengbds/kunpengspark_04_0001.html



鲲鹏的Maven仓库: https://mirrors.huaweicloud.com/kunpeng/


华为云的中央仓库: https://mirrors.huaweicloud.com/repository/maven/



开源Spark组件性能调优


Spark平台调优

https://support.huaweicloud.com/tngg-kunpengbds/kunpengspark2xhdp_05_0004.html


Spark算法调优

https://support.huaweicloud.com/fg-kunpengbds/kunpengbdsspark_06_0087.html




开源Spark组件Betweenness算法测试

1、 开源Betweenness算法下载地址:

源码: https://github.com/Sotera/distributed-graph-analytics/tree/master/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/hbse


2、 添加MainApp.scala主程序,执行betweenness算法,获取结果,计算精度和执行时间


3、 在鲲鹏服务器上编译打包hbse_2.11-0.1.jar


4、 wget http://snap.stanford.edu/data/com-Amazon.html

上传到HDFS

hdfs put com-amazon.txt/betweenness/com-amazon.txt


*开源代码工程及运行脚本会发送给参数队列




3、 运行算法

spark-submit \

--master yarn --deploy-mode client\--name "Betweenness_opensource" \

--num-executors 6 --executor-memory 14G --executor-cores 4 --driver-memory 4G\--jars "./lib/scopt_2.11-3.2.0.jar" \

./lib/hbse_2.11-0.1.jar

-i hdfs://betweenness/com-amazon.txt -g hdfs://betweenness/com-amazon-groundTruth.txt

其中hbse_2.11-0.1.jar是开源betweenness算法软件包

scopt 2.11-3.2.0.jar是依赖包,下载地址: http://www.java2s.com/example/iar/s/download-scopt211320 j ar-file.html


-i是标明原始数据集在hdfs的路径﹐-g是标明数据集求解结果在hdfs的路径,用来计算精度。




Betweenness算法介绍(介数中心性算法)


算法原理

介数中心性算法计算图中介数中心性值最大的K个结点,是网络中心性重要的度量参数之一,同时也是图计算领域的基础算法。介数中心性算法的目的是衡量图中每一个结点与其它结点之间的互动程度,经过结点的最短路径数越多,该结点的介数中心性值越大,结点在图中的重要性也就越高。结点的介数中心性值是基于图中最短路径经过该结点的次数计算,最后选取图中介数中心性值最大的K个结点,输出这K个结点的编号和介数中心性值。


算法定义


介数中心性的定义: 图中每个结点的介数中心性等于图中所有结点对的最短路径经过该结点的次数除以结点对之间所有最短路径总条数。每个节点v的介数中心性可通过以下公式计算:


σst(v) 是从结点s到t的最短路径的数量,   σst(v) 是从结点s到t且经过结点v的最短路径数量。


其中 σst(v)表示经过节点 v的s→t的最短路径条数, σst表示 s→t的最短路径条数。

直观上来说,betweenness反映了节点v作为“桥梁”的重要程度。


算法应用


介数中心性算法可用于识别图上最重要的一批结点,可以支撑的应用包括:

安平行业: 用于识别欺诈团伙中的核心成员、识别谣言传播的关键人员。

金融行业: 用于评价客户的信贷风险。

互联网行业: 用于评价社交网络中的用户影响力及活跃度。

政府机关: 用于识别疾病传播的关键人员、地点。

电信运营商: 用于识别潜在关键客户。





鲲鹏BoostKit大数据:积极建设开源软件生态


全面支持开源大数据

支持开源Apache大数据各场景组件



开源社区接纳ARM生态

Hadoop、Hive、Hbase、Spark和Flink、ElasticSearch、Kudu等核心组件的开源社区支持ARM

备注: Hadoop、ElasticSearch开源社区已经提供官方版本的ARM软件包

鲲鹏镜像仓: https://repo.huaweicloud.com/kunpeng/





开源数据虚拟化引擎openLooKeng , openLooKeng致力于为大数据用户提供极简的数据分析体验,让用户像使用“数据库”一样使用“大数据”。

openLooKeng是一款开源的高性能数据虚拟化引擎。提供统一SQL接口,具备跨数据源/数据中心分析能力以及面向交互式、批、流等融合查询场景。同时增强了前置调度、跨源索引、动态过滤、跨源协同、水平拓展等能力。

Spark组件提供原生的机器学习MLlib和图GraphX算法库,支持在分布式集群上运行。鲲鹏基于算法原理和芯片特征针对机器学习和图分析算法进行深入优化,实现相比原生算法性能提升50%。


机器学习&图分析算法加速库提供以下算法优化,后续版本会持续更新增加算法。


机器学习算法: 分类回归(随机森林、GBDT、SVM、逻辑回归、线性回归、决策树、PreFixSpan、KNN、XGBoost)算法、聚类(Kmeans、LDA、DBScan)算法、推荐(ALS)算法、特征工程(PCA、SVD、Pearson、Covariance、Spearman)算法


图分析算法: 群体分析(极大团、弱团、Louvain、标签传播、连接组件、CC)、拓扑度量(三角形计数、Cluster Coefficient)算法、路径分析(最短路径、循环检测、广度优先搜索)、骨干分析(PageRank、亲密度、Kcore、Degree、TrustRank、PersonPageRank、Betweenness)算法、相似分类算法(子图匹配)、图表示学习类算法(Node2Vec)


BoostKit图算法,加速亿级图谱分析

某省级项目9000万节点,20亿边关系图谱

社团发现类:基于业务场景选择实体,确定实体间的关系,从而构成具备业务属性的关系图谱。社团发现类算法能在此关系图谱上,挖掘抽象图中的关系稠密团体,为挖掘目标团伙提供数据基础。


全量极大团挖掘算法:

耗时1小时内,执行性能较友商提升6倍,局部稠密场景,友商是非精确求解,且无法算出结果。

基于团渗透的社区发现算法:

耗时1小时内,执行性能较友商提升5倍。

BoostKit机器学习算法,加速十亿级样本特征分析

某运营商局点,全量样本~10亿条,中标样本~10万条,模型精度由80%提升至99.9%。

特征降维算法(PCA)∶提炼关键特征,降低计算复杂度,将 计算时间由5小时降低为1小时 ;

聚类算法(DBSCAN)︰提取重要样本,降低专家复核成本, 从10万级样本规模降低为千级样本规模


SVD的英文全称是Singular Value Decomposition,翻译过来是奇异值分解。这其实是一种线性代数算法,用来对矩阵进行拆分。拆分之后可以提取出关键信息,从而降低原数据的规模。因此广泛利用在各个领域当中,例如信号处理、金融领域、统计领域。在机器学习当中也有很多领域用到了这个算法,比如推荐系统、搜索引擎以及数据压缩等等。SVD算法不光可以用于降维算法中的特征分解,还可以用于推荐系统,以及自然语言处理等领域。是很多机器学习算法的基石。


图分析算法优化方案:分布式PageRank算法

1. 内存占用优化:基于稀疏压缩的数据表示,使得算法的内存占用下降30%,有效解决大数据规模下的内存瓶颈问题

2. 收敛检测优化:简化收敛检测过程,使整体任务量减少5%-10%。

3. 全量迭代+残差迭代组合优化:有效降低前期数据膨胀带来的shuffle瓶颈,整体性能可提升 0.5X-2X


通过算法计算模式的自适应切换,整体shuffle量减少50%,性能较优化前平均提升50%+




注: 本文整理自华为云社区内容共创活动之 鲲鹏BoostKit大数据Spark算法加速分享 点此回看 直播


查看活动详情: https://bbs.huaweicloud.com/blogs/293957


(完)