spark特点:
1.速度快
2.简单易用,支持的api多
3.各个模块基于sparkcore通用
4.运行模式多 (standalone,yarn,mesos,local)
saprk快是数据结果从磁盘转到内存,如果需要下一次计算就从内存取就行了
spark基于JVM的,python是解释性语言需要交互给JVM,性能不如java和scala
spark可以基于内存计算
而不是spark基于内存计算如:1T数据要用1T内存计算吗?
Yarn资源调度,Mesos辅助任务调度,saprk可以读取(HDFS(基于磁盘),tachyou在内存存储数据(不如加机器))这些数据
hive:数据仓库,偏重于大数据里的大数据分析的,端口9083,常用外部表
storm:实时处理框架
spark Core: spark的核心 底层是RDD,可以基于内存处理数据
sparkStreaing: 处理流的
sparksql:处理sql的
sparkMLlib:机器学习的,
graphx:图形计算
standclone
1.woker 资源管理子节点
2.Mastor 管理woker的
saprk四种运行模式
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CiPingTongji {
def main(args: Array[String]): Unit = {
/*
1.配置运行模式
loval:本地运行模式,多用于本地测试运行,idea 运行saprk代码
yarn: hadoop生态圈中的资源调度框架,让spark可以基于yarn进行调度
standalone: spark自带的资源调度框架,可以分布式搭建
mesos:资源调度框架
2.第二个配置spark Application名称
3.配置spark运行的资源情况:core+内存
*/
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("test")
/*
SparkContext 是spark的上下文,需要写spark代码必须创建的sparkContext
*/
val sc=new SparkContext(conf)
sc.setLogevel("Error") //关闭日志
/*
textFile:设置本地文件路径获取一个RDD
*/
val lines: RDD[String] = sc.textFile("本地路径")
/*
flatmap大致用法是用个切割字符串返回一个map
*/
val words: RDD[String] = lines.flatMap(line => {line.split(" ")})
val pairWords: RDD[(String, Int)] = words.map(word => new Tuple2(word, 1))
val reduceRe: RDD[(String, Int)] = pairWords.reduceByKey((v1, v2) => {v1 + v2})
val sorre: RDD[(String, Int)] = reduceRe.sortBy(tp => tp._2)
sorre.foreach(println) //打印
}
}spark核心RDD
RDD,弹性分布式数据集
只要是RDD的方法都可以叫算子
RDD五大特性:
1.RDD是由一系列的partition组成的
2.算子(方法)作用在RDD的partition上
3.RDD之间有依赖关系(类流水线)
4.分区器作用是在K,V格式的RDD上
5.partition对外提供最佳计算位置,利于数据处理的本地化
注意:
什么是K,V格式的RDD?
1.如果RDD里面存储的数据是一个个tuple2,那么这个RDD我们就叫做K,V格式的RDD。
2.sc.textFile()底层是读取HDFS文件的方法调用的MR,读取HDFS文件方法,首先对文件split,每个split对应一个block,这里的split也同时对应一个partition
3.哪里体现了RDD分布式?
partition是分布在多个节点上的
4.哪里体现RDD的弹性
1.RDD之间有依赖关系(迭代器)
2.我们可以对RDD分区数增多或者减少(分区数越高速度越快)
Lineage血统
Lineage(血统的意思)
血统=RDD的一串依赖关系
名词解释
Driver:启动了spark程序就会启动这个进程
Worker:资源管理的
input Data:处理数据
RAM:内存
tasks:任务
每个程序启动都会有个Driver发生taske到Worker进行处理数据(可以基于RAM处理),然后再返回给Driver的JVM进程内存里面
Spark代码流程
1.创建sparkConf. setMaster.setAppName
2.sc SparkContext
3.创建RDD texFile
4.使用转换算子对数据进行转换
5.使用行动算子进行触发操作
6.sc.stop() 对象停掉并销毁
spark算子分类
创建RDD的四种方式
val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")
val rdd1: RDD[Int] = sc.parallelize(Array[Int](1, 2, 3, 4, 5, 66, 9))
val rdd1: RDD[(String, Int)] = sc.parallelize(Array[(String, Int)](("a", 10), ("b", 20), ("c", 30)))
val rdd1: RDD[String] = sc.makeRDD(Array[String]("a", "b", "c", "d"))转换算子
spark常用转换算子 (延迟执行的(懒执行的)):
map(func) : 将每个元素传递到函数func中,返回结果是一个新的数据集
flatMap(func):与map相似,但是每个输J入的元素都可以映射到0或者多个输出结果
reduceBykey(func): 应用于(k,v)键值对,返回一个新的(k,v)但v返回是一个 聚合v的结果
sortBy(),sortBykey():作用在K,V格式的RDD上,对Key进行升序或者降序排序
sample:随机抽样算子,根据传进去的小数按照比例进行放回或者不放回
filter(func): 筛选出满足函数func的元素,并返回一个新的数据集(RDD)
groupBykey(): 应用于(k,v)键值对,返回一个新的(k,intrable<v>)可以理解为次数行动算子(触发算子) 如果有一个行动算子就有一个job
转换的时候不会进行操作,只有行动的时候才进行计算
每个spark app中就有一个行动算子,就有一个job
常用行动算子:
count(): 返回数据集中的个数
first() : 返回数据集中的第一个元素
take(n): 以数组的 形式返回数据集中n个元素
reduce(func):通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
collect():以数组的形式返回数据集中的所有元素
foreach(func):将数据集的每个元素传递到函数func中运行
val rdd1: RDD[Int] = sc.makeRDD(List[Int](1, 2, 3, 3,4, 5)) (输入两个参数并返回一个值)聚合数据集中的元素
val i: Int = rdd1.reduce((x1, x2) => (x1 * x2))
println(i)
val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")
val c: Array[String] = rdd1.collect() 返回数组所有元素
c.foreach(println)
val s: String = rdd1.first() 返回数组第一个元素
println(s)
val t: Array[String] = rdd1.take(5) 返回前五个数据
t.foreach(println)
val l: Long = rdd1.count() 返回数据集中的个数 数据里有24行数据
println(l)作业1
1.切分单词,使用sample抽样找出,出现次数最多的单词
2.过滤出现次数最多的单词,对剩余的进行统计wordcount
3.按照出现次数从大到小排序
数据自由准备
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("job")
val sc = new SparkContext(conf)
sc.setLogLevel("Error")
val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")
val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
val rdd3: RDD[String] = rdd2.sample(false, 0.5)
val rdd4: RDD[(String, Int)] = rdd3.map((m => (m, 1)))
val rdd5: RDD[(String, Int)] = rdd4.reduceByKey((a, b) => a + b)
val rdd6: RDD[(String, Int)] = rdd5.filter(x => !x._1.equals("hello"))
val rdd7: RDD[(String, Int)] = rdd6.sortBy(v=>v._2,false)
rdd7.foreach(println)
持久化算子
3.持久化算子
RDD的持久化
-cache 默认将数据存在内存中 懒执行
-persist 手动指定持久化级别 默认cache=persist(null)
-checkpoint
rdd.cache() 默认将数据存在内存中 懒执行
rdd.persist(StorageLevel.MEMORY_ONLY) 将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算
MEMORY_AND_DISK:将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取
MEMORY_ONLY_SER:将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时,更占CPU
MEMORY_AND_DISK_SER:与MEMORY_ONLY_SER类似,但是将当内存放不下则溢出到磁盘,而不是每次需要时重新计算它们
DISK_ONLY:仅将RDD分区全部存储到磁盘上
MEMORY_ONLY_2 ME:与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中
( 这需要启用堆外内存)
cache与persist注意问题:
1.都是懒执行,需要触发行动算子执行,持久化最小单位是partition 如内存200mb 数据256mb 就有两个bluc块
2.可以对RDD进行持久化时赋值给一个变量,下个job中使用这个变量就是使用的是持久化数据,也可以不赋值下一个变量
3.如果采用第二种方式赋值变量,对RDD进行持久化之后不能跟紧行动算子,也就是在后面"点".
checkpoint:
1.将数据持久化到指定目录,需要指定目录,当spark 转换算子执行完毕后 数据不会被切断
区别:checkpoint& persist
1.cache&persist持久化数据是由spark自己完成,当转换算子之后,数据会被清空
checkpoint数据是由外部存储系统管理,行动算子执行完成后不会被清空,多用于状态管理
也是懒执行算子,需要行动触发
执行流程:
1.当spark job 执行完成之后,spark框架会从后往前回顾,找到checkpointRDD做标记
2.回顾完成后,sapkr重新启动一个job,重新计算当前checkpoint的结果,把计算的结果放入指定目录中
优化:对多个RDD进行持久化之前先cache下spark环境搭建
spark任务standalone与yran提交
standalone圆周率代码实现与提交standalone
/*
老方法
*/
// val conf = new SparkConf().setMaster("local").setAppName("spark")
// val sc = new SparkContext(conf)
// sc.setLogLevel("Error")
/*
新方法 是saprksql里的 SparkSession.builder().master("local").appName("spark").getOrCreate()
聚合了老方法
计算PI
*/
val sparksess = SparkSession.builder().appName("spark").getOrCreate() //去掉.master("local")本地运行
val i: Int = sparksess.sparkContext.parallelize(1 to 10000000).map(on => {
val x = random * 2 - 1 //-1-1
val y = random * 2 - 1
if (x * x + y * y <= 1) 1 else 0
}).reduce((v1, v2) => v1 + v2)
println(s" pi= ${4.0 *i/10000000}")
// sc.setLogLevel("Error")
// val rdd1: RDD[String] = sc.textFile("C:\\sparkfdataset\\data\\hello")
1.打包jar包 提交集群运行
2. ./spark-submit --master spark://node1:7077 --class 类名 jar路径
这是standalone提交方式client和cluster两种方式与区别
Standalone-client提交任务方式
提交命令
./spark-submit --master spark://mynode1:7077
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
或者
./spark-submit --master spark://mynode1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
执行流程
client模式提交任务后,会在客户端启动Driver进程。
Driver会向Master申请启动Application启动的资源。
Master收到请求之后会在对应的Worker节点上启动Executor
Executor启动之后,会注册给Driver端,Driver掌握一批计算资源。
Driver端将task发送到worker端执行。worker将task执行结果返回到Driver端。
总结
client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。client模式适用于程序测试,不适用于生产环境,在客户端可以看到task的执行和结果
***Standalone-cluster提交任务方式
Standalone-cluster提交任务方式
提交命令
./spark-submit --master spark://mynode1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100执行流程
cluster模式提交应用程序后,会向Master请求启动Driver.
Master接受请求,随机在集群一台节点启动Driver进程。
Driver启动后为当前的应用程序申请资源。
Driver端发送task到worker节点上执行。
worker将执行情况和执行结果返回给Driver端。
总结
Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。
总结Standalone两种方式提交任务,Driver与集群的通信包括:
Driver负责应用程序资源的申请
任务的分发。
结果的回收。
监控task执行情况。
spark基于yarn提交任务
yarn-client提交任务方式
当然还有配置文件
这是yarn提交方式
./spark-submit --master yarn --class hello /export/jar/Spark-1.0.0.jar
提交命令
./spark-submit
--master yarn
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
或者
./spark-submit
--master yarn–client
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
或者
./spark-submit
--master yarn
--deploy-mode client
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
执行流程
客户端提交一个Application,在客户端启动一个Driver进程。
应用程序启动后会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
AM启动后,会向RS请求一批container资源,用于启动Executor.
RS会找到一批NM返回给AM,用于启动Executor。
AM会向NM发送命令启动Executor。
Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
总结
Yarn-client模式同样是适用于测试,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.
******ApplicationMaster的作用:
1. 为当前的Application申请资源
2. 给NameNode发送消息启动Executor。
注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。
***yarn-cluster提交任务方式
提交命令
./spark-submit
--master yarn
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100
或者
./spark-submit
--master yarn-cluster
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100执行流程
客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。
AM启动,AM发送请求到RS,请求一批container用于启动Executor。
RS返回一批NM节点给AM。
AM连接到NM,发送请求到NM启动Executor。
Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。
总结
Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
ApplicationMaster的作用:
1.为当前的Application申请资源
2.给NameNode发送消息启动Excutor。
3.任务调度。
停止集群任务命令:yarn application -kill applicationID
补充算子
val rdd1 = sc.parallelize(
Array[(String, Int)](
("zhangsan", 18),
("lisi", 19),
("wangwu", 20),
("tainqi", 21)
)
)
val rdd1_1 = sc.parallelize(
Array[(String, Int)](
("zhangsan", 100),
("lisi", 200),
("wangwu", 300),
("momo", 400)
)
)
1.cogroup
val rdd2: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd1_1)
/*
当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>)),子RDD的分区与父RDD多的一致。
(zhangsan,(CompactBuffer(18),CompactBuffer(100)))
(wangwu,(CompactBuffer(20),CompactBuffer(300)))
(tainqi,(CompactBuffer(21),CompactBuffer()))
(momo,(CompactBuffer(),CompactBuffer(400)))
(lisi,(CompactBuffer(19),CompactBuffer(200)))
*/
rdd2.foreach(println)
2.union
val rdd2: RDD[(String, Int)] = rdd1.union(rdd1_1)
// 合并两个RDD的数据
/*
(zhangsan,18)
(lisi,19)
(wangwu,20)
(tainqi,21)
(zhangsan,100)
(lisi,200)
(wangwu,300)
(momo,400)
*/
rdd2.foreach(println)
3.rightOuterJoin
val rdd2: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd1_1)
//右连接
/*
(zhangsan,(Some(18),100))
(wangwu,(Some(20),300))
(momo,(None,400))
(lisi,(Some(19),200))
*/
rdd2.foreach(println)
//想要获取值就 tp._1 获取some的值
4.leftOuterjoin
val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd1_1)
//左连接 以左侧的RDD进行关联 左侧就是rdd1 都会出现 (K,(V,option)
/*
(wangwu,(20,Some(300)))
(tainqi,(21,Some(400)))
(lisi,(19,Some(200)))
tianqin为空所以没有返回
*/
rdd3.foreach(println)
//想要获取值就 tp._2._1 获取some的值
5.join
val rdd2: RDD[(String, (Int, Int))] = rdd1.join(rdd1_1)
//作用在K,V格式的RDD上,如(K,V).join(K.W)=>(K,(V,W)) 只要相同的K就能关联在一起
/*
(zhangsan,(18,100))
(wangwu,(20,300))
(tainqi,(21,400))
(lisi,(19,200))
*/
rdd2.foreach(println) val rdd1: RDD[String] = sc.parallelize(List[(String)]("a", "b", "c", "d"))
val rdd2: RDD[String] = sc.parallelize(List[(String)]("a", "b", "c", "e"))
rdd1.distinct().foreach(println) //会不会右计算 会 只需要知道这点就可以了
rdd1.intersection(rdd2).foreach(println) //返回a,b,c 获取交集
rdd1.subtract(rdd2).foreach(println) //返回e 获取差集
val rdd1: RDD[String] = sc.parallelize(List[(String)]("a", "b", "c", "d","e"),3)
//读取每一个 进行插入 mapPartitions以分区为单位遍历RDD中的每个分区的数据
重点:遍历一个分区 如果有插入数据库的这种行为,结果还需要返回RDD就可以用这个
/*
建立数据库连接
插入数据库连接ListBuffer(a)
建立数据库连接
插入数据库连接ListBuffer(b, c)
建立数据库连接
插入数据库连接ListBuffer(d, e)
*/
rdd1.mapPartitions((x=>{
val list=new ListBuffer[String]()
println("建立数据库连接")
while (x.hasNext) {
val str: String = x.next()
list.append((str))
}
println("插入数据库连接"+list)
list.iterator
})).count()
foreachPartition
遍历的数据是每个partition的数据。
collectAsMap
对K,V格式的RDD数据回收转换成Map<K,V>
takeSample(boolean,num,seed)
takeSample可以对RDD中的数据随机获取num个,第一个参数是有无放回,第二个参数是随机获取几个元素,第三个参数如果固定,那么每次获取的数据固定。
top(num)
对RDD中的所有元素进行由大到小排序,获取前num个元素返回。
takeOrdered(num)
对RDD中的所有元素进行由小到大的排序,获取前num个元素返回。
spark Streaming 小队列流处理
spark Streaming 是spark core Api的扩展
spark Streaming有很好的扩展性 可以从kafka flume tcp等流队列获取数据
spark Streaming 可以将才处理过的数据写入文件系统和常见数据库中
spark Streaming是微次处理模型:微批次处理不会有长时间运行,并不是实时的流
Socket是java中支持基于TCP/UDP协议的通信所提供的编程模型
socket server
监听某个端口
socket client
向某个端口发起连接
TCP三次握手建立连接
我想去连接你 你准备好了吗
我收到你的消息,我现在ok了的,你可以建立连接
收到连接向server发送确认连接,再次确认o不ok,准备数据发送
NETcat
简称nc
nc -l 监听
nc host port 建立连接
很多地地方有用的, 通过mntr叫四级命令
nc -lk 9999 端口创建
nc localhost 连接某个端口
这样两端就可以发送消息