spark基础概念

tanqi
20
2025-04-09

spark特点:

1.速度快

2.简单易用,支持的api多

3.各个模块基于sparkcore通用

4.运行模式多 (standalone,yarn,mesos,local)

saprk快是数据结果从磁盘转到内存,如果需要下一次计算就从内存取就行了

spark基于JVM的,python是解释性语言需要交互给JVM,性能不如java和scala

spark可以基于内存计算

而不是spark基于内存计算如:1T数据要用1T内存计算吗?

Yarn资源调度,Mesos辅助任务调度,saprk可以读取(HDFS(基于磁盘),tachyou在内存存储数据(不如加机器))这些数据

hive:数据仓库,偏重于大数据里的大数据分析的,端口9083,常用外部表

storm:实时处理框架

spark Core: spark的核心 底层是RDD,可以基于内存处理数据

sparkStreaing: 处理流的

sparksql:处理sql的

sparkMLlib:机器学习的,

graphx:图形计算

standclone

1.woker 资源管理子节点

2.Mastor 管理woker的

saprk四种运行模式

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CiPingTongji {
  def main(args: Array[String]): Unit = {


    /*
     1.配置运行模式
            loval:本地运行模式,多用于本地测试运行,idea 运行saprk代码
            yarn: hadoop生态圈中的资源调度框架,让spark可以基于yarn进行调度
            standalone: spark自带的资源调度框架,可以分布式搭建
            mesos:资源调度框架
     2.第二个配置spark Application名称
     3.配置spark运行的资源情况:core+内存
     */
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    /*
    SparkContext 是spark的上下文,需要写spark代码必须创建的sparkContext
     */
    val sc=new SparkContext(conf)

    sc.setLogevel("Error") //关闭日志
    /*
    textFile:设置本地文件路径获取一个RDD
     */
    val lines: RDD[String] = sc.textFile("本地路径")

    /*
    flatmap大致用法是用个切割字符串返回一个map
     */
    val words: RDD[String] = lines.flatMap(line => {line.split(" ")})

      
    val pairWords: RDD[(String, Int)] = words.map(word => new Tuple2(word, 1))

    val reduceRe: RDD[(String, Int)] = pairWords.reduceByKey((v1, v2) => {v1 + v2})

    val sorre: RDD[(String, Int)] = reduceRe.sortBy(tp => tp._2)  
    
    sorre.foreach(println)        //打印
  }
}

spark核心RDD

RDD,弹性分布式数据集

只要是RDD的方法都可以叫算子

RDD五大特性:

1.RDD是由一系列的partition组成的

2.算子(方法)作用在RDD的partition上

3.RDD之间有依赖关系(类流水线)

4.分区器作用是在K,V格式的RDD上

5.partition对外提供最佳计算位置,利于数据处理的本地化

注意:

什么是K,V格式的RDD?

1.如果RDD里面存储的数据是一个个tuple2,那么这个RDD我们就叫做K,V格式的RDD。

2.sc.textFile()底层是读取HDFS文件的方法调用的MR,读取HDFS文件方法,首先对文件split,每个split对应一个block,这里的split也同时对应一个partition

3.哪里体现了RDD分布式?

partition是分布在多个节点上的

4.哪里体现RDD的弹性

1.RDD之间有依赖关系(迭代器)

2.我们可以对RDD分区数增多或者减少(分区数越高速度越快)

Lineage血统

Lineage(血统的意思)

血统=RDD的一串依赖关系

名词解释

Driver:启动了spark程序就会启动这个进程

Worker:资源管理的

input Data:处理数据

RAM:内存

tasks:任务

每个程序启动都会有个Driver发生taske到Worker进行处理数据(可以基于RAM处理),然后再返回给Driver的JVM进程内存里面

Spark代码流程

1.创建sparkConf. setMaster.setAppName

2.sc SparkContext

3.创建RDD texFile

4.使用转换算子对数据进行转换

5.使用行动算子进行触发操作

6.sc.stop() 对象停掉并销毁

spark算子分类

创建RDD的四种方式

val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")
val rdd1: RDD[Int] = sc.parallelize(Array[Int](1, 2, 3, 4, 5, 66, 9))
val rdd1: RDD[(String, Int)] = sc.parallelize(Array[(String, Int)](("a", 10), ("b", 20), ("c", 30)))
val rdd1: RDD[String] = sc.makeRDD(Array[String]("a", "b", "c", "d"))

转换算子

spark常用转换算子 (延迟执行的(懒执行的)):

map(func)	:  将每个元素传递到函数func中,返回结果是一个新的数据集

flatMap(func):与map相似,但是每个输J入的元素都可以映射到0或者多个输出结果

reduceBykey(func): 应用于(k,v)键值对,返回一个新的(k,v)但v返回是一个 聚合v的结果

sortBy(),sortBykey():作用在K,V格式的RDD上,对Key进行升序或者降序排序

sample:随机抽样算子,根据传进去的小数按照比例进行放回或者不放回

filter(func): 筛选出满足函数func的元素,并返回一个新的数据集(RDD)

groupBykey(): 应用于(k,v)键值对,返回一个新的(k,intrable<v>)可以理解为次数

行动算子(触发算子) 如果有一个行动算子就有一个job

转换的时候不会进行操作,只有行动的时候才进行计算

每个spark app中就有一个行动算子,就有一个job
常用行动算子:
count(): 返回数据集中的个数

first() :   返回数据集中的第一个元素

take(n): 以数组的 形式返回数据集中n个元素    

reduce(func):通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

collect():以数组的形式返回数据集中的所有元素

foreach(func):将数据集的每个元素传递到函数func中运行



   val rdd1: RDD[Int] = sc.makeRDD(List[Int](1, 2, 3, 3,4, 5))   (输入两个参数并返回一个值)聚合数据集中的元素
   val i: Int = rdd1.reduce((x1, x2) => (x1 * x2))
  println(i)

val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")

  val c: Array[String] = rdd1.collect()   返回数组所有元素
  c.foreach(println)

  val s: String = rdd1.first()    返回数组第一个元素
  println(s)

  val t: Array[String] = rdd1.take(5)  返回前五个数据
  t.foreach(println)

  val l: Long = rdd1.count()   返回数据集中的个数 数据里有24行数据
  println(l)

作业1

1.切分单词,使用sample抽样找出,出现次数最多的单词
2.过滤出现次数最多的单词,对剩余的进行统计wordcount
3.按照出现次数从大到小排序

数据自由准备

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("job")
    val sc = new SparkContext(conf)
  	sc.setLogLevel("Error")
    val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")

    val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))

    val rdd3: RDD[String] = rdd2.sample(false, 0.5)

    val rdd4: RDD[(String, Int)] = rdd3.map((m => (m, 1)))
    val rdd5: RDD[(String, Int)] = rdd4.reduceByKey((a, b) => a + b)

    val rdd6: RDD[(String, Int)] = rdd5.filter(x => !x._1.equals("hello"))

    val rdd7: RDD[(String, Int)] = rdd6.sortBy(v=>v._2,false)
    rdd7.foreach(println)

		

持久化算子

3.持久化算子

RDD的持久化
-cache   默认将数据存在内存中 懒执行
-persist		手动指定持久化级别  默认cache=persist(null)
-checkpoint

rdd.cache()   默认将数据存在内存中 懒执行

rdd.persist(StorageLevel.MEMORY_ONLY)   将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算

		MEMORY_AND_DISK:将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取

		MEMORY_ONLY_SER:将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时,更占CPU

		MEMORY_AND_DISK_SER:与MEMORY_ONLY_SER类似,但是将当内存放不下则溢出到磁盘,而不是每次需要时重新计算它们

		DISK_ONLY:仅将RDD分区全部存储到磁盘上

		MEMORY_ONLY_2 ME:与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中
(																								这需要启用堆外内存)


cache与persist注意问题:
		1.都是懒执行,需要触发行动算子执行,持久化最小单位是partition  如内存200mb 数据256mb 就有两个bluc块 
		2.可以对RDD进行持久化时赋值给一个变量,下个job中使用这个变量就是使用的是持久化数据,也可以不赋值下一个变量
		3.如果采用第二种方式赋值变量,对RDD进行持久化之后不能跟紧行动算子,也就是在后面"点".



checkpoint:
    		1.将数据持久化到指定目录,需要指定目录,当spark 转换算子执行完毕后 数据不会被切断
    	区别:checkpoint& persist
        1.cache&persist持久化数据是由spark自己完成,当转换算子之后,数据会被清空
          checkpoint数据是由外部存储系统管理,行动算子执行完成后不会被清空,多用于状态管理
  
也是懒执行算子,需要行动触发
执行流程:
	1.当spark job 执行完成之后,spark框架会从后往前回顾,找到checkpointRDD做标记
	2.回顾完成后,sapkr重新启动一个job,重新计算当前checkpoint的结果,把计算的结果放入指定目录中
  	优化:对多个RDD进行持久化之前先cache下

spark环境搭建

spark任务standalone与yran提交

standalone圆周率代码实现与提交standalone

/*
    老方法
     */
    //    val conf = new SparkConf().setMaster("local").setAppName("spark")
    //    val sc = new SparkContext(conf)
    //    sc.setLogLevel("Error")

    /*
    新方法  是saprksql里的  SparkSession.builder().master("local").appName("spark").getOrCreate()
    聚合了老方法


                 计算PI
     */
    val sparksess = SparkSession.builder().appName("spark").getOrCreate()	//去掉.master("local")本地运行

    val i: Int = sparksess.sparkContext.parallelize(1 to 10000000).map(on => {
      val x = random * 2 - 1 //-1-1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }).reduce((v1, v2) => v1 + v2)

    println(s" pi= ${4.0 *i/10000000}")
    //    sc.setLogLevel("Error")
    //    val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")

1.打包jar包 提交集群运行
2.  ./spark-submit --master spark://node1:7077 --class 类名 jar路径
    这是standalone提交方式

client和cluster两种方式与区别

Standalone-client提交任务方式

提交命令

./spark-submit --master spark://mynode1:7077  
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
或者
  
./spark-submit --master spark://mynode1:7077 
--deploy-mode client  
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

执行流程

  1. client模式提交任务后,会在客户端启动Driver进程。

  2. Driver会向Master申请启动Application启动的资源。

  3. Master收到请求之后会在对应的Worker节点上启动Executor

  4. Executor启动之后,会注册给Driver端,Driver掌握一批计算资源。

  5. Driver端将task发送到worker端执行。worker将task执行结果返回到Driver端。
    总结
    client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。client模式适用于程序测试,不适用于生产环境,在客户端可以看到task的执行和结果

***Standalone-cluster提交任务方式

Standalone-cluster提交任务方式
提交命令
./spark-submit --master spark://mynode1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

执行流程

  1. cluster模式提交应用程序后,会向Master请求启动Driver.

  2. Master接受请求,随机在集群一台节点启动Driver进程。

  3. Driver启动后为当前的应用程序申请资源。

  4. Driver端发送task到worker节点上执行。

  5. worker将执行情况和执行结果返回给Driver端。
    总结
    Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。
    总结Standalone两种方式提交任务,Driver与集群的通信包括:

  1. Driver负责应用程序资源的申请

  2. 任务的分发。

  3. 结果的回收。

  4. 监控task执行情况。

spark基于yarn提交任务

yarn-client提交任务方式

当然还有配置文件
这是yarn提交方式

./spark-submit --master yarn --class hello /export/jar/Spark-1.0.0.jar

提交命令
./spark-submit 
--master yarn
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

或者
./spark-submit 
--master yarn–client
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

或者
./spark-submit 
--master yarn 
--deploy-mode  client 
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

执行流程

  1. 客户端提交一个Application,在客户端启动一个Driver进程。

  2. 应用程序启动后会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。

  3. RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。

  4. AM启动后,会向RS请求一批container资源,用于启动Executor.

  5. RS会找到一批NM返回给AM,用于启动Executor。

  6. AM会向NM发送命令启动Executor。

  7. Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。

  8. 总结
    Yarn-client模式同样是适用于测试,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.

******ApplicationMaster的作用:

1. 为当前的Application申请资源

2. 给NameNode发送消息启动Executor。

注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。

***yarn-cluster提交任务方式

提交命令
./spark-submit 
--master yarn 
--deploy-mode cluster 
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
或者
./spark-submit 
--master yarn-cluster
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

执行流程

  1. 客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。

  2. RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。

  3. AM启动,AM发送请求到RS,请求一批container用于启动Executor。

  4. RS返回一批NM节点给AM。

  5. AM连接到NM,发送请求到NM启动Executor。

  6. Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。


总结
Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。

ApplicationMaster的作用:

1.为当前的Application申请资源

2.给NameNode发送消息启动Excutor。

3.任务调度。

停止集群任务命令:yarn application -kill applicationID

补充算子


    val rdd1 = sc.parallelize(
      Array[(String, Int)](
        ("zhangsan", 18),
        ("lisi", 19),
        ("wangwu", 20),
        ("tainqi", 21)
     )
   )

    val rdd1_1 = sc.parallelize(
      Array[(String, Int)](
        ("zhangsan", 100),
        ("lisi", 200),
        ("wangwu", 300),
       ("momo", 400)
     )
   )

1.cogroup

   val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd1_1)

    /*
    当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>)),子RDD的分区与父RDD多的一致。
    (zhangsan,(CompactBuffer(18),CompactBuffer(100)))
    (wangwu,(CompactBuffer(20),CompactBuffer(300)))
    (tainqi,(CompactBuffer(21),CompactBuffer()))
    (momo,(CompactBuffer(),CompactBuffer(400)))
    (lisi,(CompactBuffer(19),CompactBuffer(200)))
     */
   rdd2.foreach(println)

2.union

   val rdd2: RDD[(String, Int)] = rdd1.union(rdd1_1)

    //  合并两个RDD的数据
    /*
          (zhangsan,18)
          (lisi,19)
          (wangwu,20)
          (tainqi,21)
          (zhangsan,100)
          (lisi,200)
           (wangwu,300)
          (momo,400)
     */
    rdd2.foreach(println)

3.rightOuterJoin
    val rdd2: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd1_1)

    //右连接
    /*
        (zhangsan,(Some(18),100))
        (wangwu,(Some(20),300))
        (momo,(None,400))
         (lisi,(Some(19),200))
     */
    rdd2.foreach(println)
    //想要获取值就    tp._1 获取some的值


4.leftOuterjoin

    val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd1_1)
      //左连接 以左侧的RDD进行关联 左侧就是rdd1 都会出现 (K,(V,option)
    /*
        (wangwu,(20,Some(300)))
        (tainqi,(21,Some(400)))
        (lisi,(19,Some(200)))
        tianqin为空所以没有返回
     */

    rdd3.foreach(println)
    //想要获取值就    tp._2._1 获取some的值


5.join
    val rdd2: RDD[(String, (Int, Int))] = rdd1.join(rdd1_1)
       //作用在K,V格式的RDD上,如(K,V).join(K.W)=>(K,(V,W)) 只要相同的K就能关联在一起
    /*
      (zhangsan,(18,100))
      (wangwu,(20,300))
      (tainqi,(21,400))
      (lisi,(19,200))
     */
    rdd2.foreach(println)
    val rdd1: RDD[String] = sc.parallelize(List[(String)]("a", "b", "c", "d"))
    val rdd2: RDD[String] = sc.parallelize(List[(String)]("a", "b", "c", "e"))

  	rdd1.distinct().foreach(println)   //会不会右计算 会 只需要知道这点就可以了
    rdd1.intersection(rdd2).foreach(println)  //返回a,b,c  获取交集
    rdd1.subtract(rdd2).foreach(println)      //返回e       获取差集

  
    val rdd1: RDD[String] = sc.parallelize(List[(String)]("a", "b", "c", "d","e"),3)

    //读取每一个 进行插入  mapPartitions以分区为单位遍历RDD中的每个分区的数据 
     重点:遍历一个分区  如果有插入数据库的这种行为,结果还需要返回RDD就可以用这个
    /*
        建立数据库连接
        插入数据库连接ListBuffer(a)
        建立数据库连接
         插入数据库连接ListBuffer(b, c)
        建立数据库连接
        插入数据库连接ListBuffer(d, e)
     */

   rdd1.mapPartitions((x=>{
     val list=new ListBuffer[String]()
     println("建立数据库连接")
     while (x.hasNext) {
       val str: String = x.next()
       list.append((str))

     }
     println("插入数据库连接"+list)
     list.iterator
   })).count()

foreachPartition
遍历的数据是每个partition的数据。

	collectAsMap
对K,V格式的RDD数据回收转换成Map<K,V>

takeSample(boolean,num,seed)
takeSample可以对RDD中的数据随机获取num个,第一个参数是有无放回,第二个参数是随机获取几个元素,第三个参数如果固定,那么每次获取的数据固定。

top(num)
对RDD中的所有元素进行由大到小排序,获取前num个元素返回。

takeOrdered(num)
对RDD中的所有元素进行由小到大的排序,获取前num个元素返回。

spark Streaming 小队列流处理

spark Streaming 是spark core Api的扩展

spark Streaming有很好的扩展性 可以从kafka flume tcp等流队列获取数据

spark Streaming 可以将才处理过的数据写入文件系统和常见数据库中

spark Streaming是微次处理模型:微批次处理不会有长时间运行,并不是实时的流

Socket是java中支持基于TCP/UDP协议的通信所提供的编程模型

socket server

监听某个端口

socket client

向某个端口发起连接

TCP三次握手建立连接

我想去连接你 你准备好了吗

我收到你的消息,我现在ok了的,你可以建立连接

收到连接向server发送确认连接,再次确认o不ok,准备数据发送

NETcat

简称nc

nc -l 监听

nc host port 建立连接

很多地地方有用的, 通过mntr叫四级命令

nc -lk 9999 端口创建

nc localhost 连接某个端口

这样两端就可以发送消息

动物装饰