spark--scala-douban模仿做了个python的版本

2019-03-02 23:46|来源: 网路

初识spark-基本概念和例子 | _yiihsia[互联网后端技术]

初识spark-基本概念和例子    

spark是一个开源的分布式计算系统,提供快速的数据分析功能。 官网地址 http://www.spark-project.org/ 据说性能高出hadoop很多(个人理解主要是因为两点:内存和cache),而且相对更加简单,灵活。非常适合需要反复迭代的计算,比如机器学习。

spark基于scala编写,对我而言也是门陌生的语言,至今还是有很多不理解的地方。 

基本概念

RDD

spark最大的亮点是提出RDD(Resilient Distributed Dataset)的概念,也就是可伸缩的分布式数据集合,本身只读,可恢复。spark本身不做物理储存,通过保存足够的信息去实际的储存中计算出RDD

RDD只要通过四种途径获取:

1、从共享的文件系统,比如HDFS
2、在驱动程序里的并行scala集合(例如数组),会发到多个节点上
3、从已存在的RDD转换
4、通过改变现有的RDD持久性。rdd是一个懒散,短暂的。
改变一个RDD的持久化通过两个动作:
cache:在第一次计算的时候保存在内存中,可以重用
save:保存到一个分布式文件系统,比如hdfs,这个保存版本会用于未来的操作

缓存形式只是一个提示。
如果集群中没有足够的内存去缓存所有的并行数据集合,spark将在使用它们的时候重新计算,选择这个方式工作(性能有所下降),如果节点发生故障或者数据集合太大,这个想法是一种松散的虚拟内存。

并行操作

RDD可以执行做个并行操作
reduce:通过相关函数合并数据集合,产生结果
collect: 发送所有元素的数据集合到驱动程序。例如,一个简单的方法去并行更新一个并行中的数组
foreach: 通过用户提供的函数去遍历所有元素,可能仅仅是一个不重要的功能
spark目前不支持在mapreduce中的grouped reduce,

共享变量

程序员通过函数去调用map,filter,reduce
当一个函数被传递到一个spark操作,执行在一个远程集群节点上,节点的工作使用的是独立副本。这些变量被复制到所有机器上。
一般情况下,共享变量的读写支持跨任务将是低效的。然而,spark提供两个共享变量的有限类型:广播变量和蓄电池。

广播变量
广播变量允许程序员保持一个只读变量到每台机器上,而不是运送它的一个副本和任务。spark使用高效的广播算法去分配广播变量,以降低通信成本。
广播变量被创建后,它应该在集群中的任何函数中替代值V, v不能再节点中传输超过一次。广播后值V不能被修改,以确保所有节点具有相同过的广播值。
当一个创建广播变量b的值v,v是一个共享文件系统保存到一个文件。 b是这个文件路径的序列化形式。当B在工人节点上查询,spark首先检查V是否在本地缓存,并从文件系统读取。 最初使用hdfs做广播变量,但是正在开发更有效的流媒体广播系统。
蓄电池
蓄电池是唯一的价值是:”通过关联操作,因此可以有效地支持并行的变量。它们可以被用来实现计数器(在MapReduce的)或者sum。spark支持原生的int,double
调用SparkContext.accumulator(v),初始化值v。在集群中做 += 操作,但是我们不能读值,只能通过驱动程序去读值用于
在工人节点上,创建一个单独的副本加器作为每个运行任务的线程的线程本地变量,从0开始。
每个任务运行后,工人发送信息到驱动程序,包含每个蓄电池的更新。驱动程序适用于每个操作的每个分区仅更新一次,以防止doublecounting任务时重新执行因失败
lineage
数据集的出处信息

 

Interpreter Integration
1、编译输出class文件到共享文件系统,集群中的工人通过java class loader加载它们。
2、为了每一行能够直接引用单例对象,我们改变了生成代码
而不是通过静态方法getInstance

例子

 

给出一些实时统计日志数据例子,例子都是本地模式计算(集群模式需要Mesos),仅供参考,实现上而言非常简单

统计日志中出现多少次hbase读取:

日志的格式每行第5位是标识字段,第6位是响应时间,第7位是类型字段

val spark = new SparkContext(“local”,”test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(line => line.contains(“hbase_time”))
println(lines.count());
统计读取hbase的平均响应时间:
val spark = new SparkContext(“local[2]“, “test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(_.contains(“hbase_time”))
val times = lines.map(dd => dd.split(” “)(6).toInt).reduce(_ + _)
println(“times:” + times/lines.count())
统计hbase的请求类型:
val spark = new SparkContext(“local[2]“, “test”)
val file = spark.textFile(“D:\\data\\keykeys-log\\log.log”)
val lines = file.filter(_.contains(“hbase_time”))
val ones = lines.map(word => (word.split(” “)(7), 1)).reduceByKey(_ + _)
ones.foreach(println)
参考:
Spark: Cluster Computing with Working Sets . Matei Zaharia, Mosharaf Chowdhury, Michael J. Franklin, Scott Shenker, Ion Stoica. USENIX HotCloud 2010 . June 2010.


, , ,

发表评论

 

*

 


转自:http://www.cnblogs.com/lexus/archive/2012/03/29/2422595

相关问答

更多
  • bg4.png spark的开发语言来scalar,因此会scalar是最好的,至于 java,python则是为了兼容性,有相关接口,但是资料相对较少一些。 查看更多答案>> 麻烦采纳,谢谢!
  • 如果是spark1的话应该用的是2.10,2的话应该是2.11,具体的到spark安装目录下看一下jars中scala开头的jar包就可以了
  • 楼主应该是写错了吧,到目前为止spark版本只到 1.6.1,对应scala版本为 2.10.x。
  • Scala SDK在主要版本之间不是二进制兼容的(例如,2.10和2.11)。 如果您有将要与Spark一起使用的Scala代码,并且该代码是针对特定主要版本的Scala(例如2.10)编译的,那么您将需要使用兼容版本的Spark。 例如,如果您在Scala中编写Spark 1.4.1代码并且使用的是2.11.4编译器,则应使用Spark 1.4.1_2.11。 如果你没有使用Scala代码,那么Spark 1.4.1_2.10和Spark 1.4.1_2.11之间应该没有功能差异(如果有的话,它很可能是一 ...
  • bg4.png spark的开发语言来scalar,因此会scalar是最好的,至于 java,python则是为了兼容性,有相关接口,但是资料相对较少一些。
  • Spark 2.2.0是默认构建和分发以与Scala 2.11一起使用的。 要在Scala中编写应用程序,您需要使用兼容的Scala版本(例如2.11.X)。 你的scala版本可能是2.12.X. 这就是它抛出异常的原因。 Spark 2.2.0 is built and distributed to work with Scala 2.11 by default. To write applications in Scala, you will need to use a compatible Scal ...
  • 来自spark-submit --help : Usage: spark-submit [options] [app arguments] 您需要在命令行上添加jar或python文件,然后添加选项。 所以,在你的例子中,你需要这样做 ./bin/spark-submit --class Sentimenter --master local[4] /home/ubuntu/spark/spark-example-master/target/scala-2.10 ...
  • 您可以设置在pyspark中使用的python可执行文件和由环境变量“PYSPARK_PYTHON”设置的spark-submit。 例如, PYSPARK_PYTHON = / opt / local / python-2.7 / bin / python pyspark I solve this problem with the removing of python2.7.5 from HD. The topic can be closed
  • Scala不是一个包,它是一个在Java运行时之上执行的库。 同样,Scala编译器scalac运行在Java运行时之上。 事实上,你在你的“系统”中安装了一个Scala版本,这很方便,但没有任何必要。 因此,从一个版本的Scala(2.9.2)启动sbt是完全可能的,但通过传递合适的标志(例如-classpath )来指示它使用完全不同的Scala版本(2.10.x)运行其他命令(编译) 。 请参阅: Java可以运行编译的Scala代码吗? Scala is not a package, it is a ...
  • TL; DR你不能。 两个问题: Spark(它并不是Spark特有的)将使用Scala版本来编译它。 安装在机器上的Scala编译版本根本不相关。 Spark还不支持Scala 2.12,因此重新编译不是一个选项。 TL;DR You cannot. Two problems: Spark (it is not really specific to Spark) will use Scala version which has been used to compile it. Version of Sca ...