package JT
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.lit
import org.joda.time.LocalDate
object jingtai {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("static full extract")
.enableHiveSupport()
.getOrCreate()
val df_mysql = spark.read.format("jdbc")
.option("url","jdbc:mysql://node3:3306/tablee?useSSL=false")
.option("user","root")
.option("password","123456")
.option("dbtable","table1")
.load()
// Get yesterday's date; it could also be specified manually
val lo_time = LocalDate.now().plusDays(-1).toString().replace("-","")
// Add a partition column whose value is the date obtained above
val frame = df_mysql.withColumn("data", lit(lo_time))
frame
.write
.mode(SaveMode.Overwrite)
.partitionBy("data")
.saveAsTable("ods.table1")
}
}
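// Hedged sketch (not part of the original job): the comment above notes the partition
// date "could also be specified manually". One way, assuming a yyyyMMdd string passed
// as the first program argument, is a small helper that falls back to yesterday's date:
object PartitionDate {
  def resolve(args: Array[String]): String =
    args.headOption.getOrElse(
      LocalDate.now().plusDays(-1).toString().replace("-", ""))
  // Usage inside main: df_mysql.withColumn("data", lit(PartitionDate.resolve(args)))
}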
package ZL
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
object ZLid {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("incremental extract by id")
.config("spark.sql.warehouse.dir", "hdfs://node1:9870//user/hive/warehouse")
.config("hive.metastore.uris", "thrift://node3:9083")
.enableHiveSupport()
.getOrCreate()
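// Allow Hive dynamic partitioning (non-strict mode) for the partitioned write below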
spark.sql(
"""
|set hive.exec.dynamic.partition=true
|""".stripMargin)
spark.sql(
"""
|set hive.exec.dynamic.partition.mode=nonstrict
|""".stripMargin)
val max_id: Int = spark.table("ods.ft_expertinfo")
.select(max(col("id").cast("int")))
.first()
.getInt(0)
val mysql: Dataset[Row] = spark.read.format("jdbc")
.option("url", "jdbc:mysql://node3:3306/ft?useSSL=false")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "ft_expertinfo")
.load()
.where(col("Id").cast("int") > lit(max_id))
mysql.show()
val ft_mysql: Dataset[Row] = mysql.withColumn("etldate", date_format(col("CreateDate"), "yyyyMMdd")).limit(50)
ft_mysql.show()
ft_mysql
.write
.mode(SaveMode.Append)
.partitionBy("etldate")
.saveAsTable("ods.ft_expertinfo")
}
}
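// Hedged sketch (assumption, not in the original job): max(...) yields null when
// ods.ft_expertinfo has no rows yet, so first().getInt(0) above would fail on the very
// first run. A defensive variant checks for null and defaults to 0:
object MaxIdWithDefault {
  def maxId(spark: SparkSession): Int = {
    val row = spark.table("ods.ft_expertinfo")
      .select(max(col("id").cast("int")))
      .first()
    // Default to 0 so the first load pulls every row from MySQL
    if (row.isNullAt(0)) 0 else row.getInt(0)
  }
}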
package ZL
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, date_format, from_unixtime, lit, max, unix_timestamp}
import org.joda.time.LocalDate
import java.text.SimpleDateFormat
object ZLtime {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("incremental extract by time")
.config("spark.sql.warehouse.dir", "hdfs://node1:9870//user/hive/warehouse")
.config("hive.metastore.uris", "thrift://node3:9083")
.enableHiveSupport()
.getOrCreate()
// Enable dynamic partitioning and set it to non-strict mode
spark.sql(
"""
|set hive.exec.dynamic.partition=true
|""".stripMargin)
spark.sql(
"""
|set hive.exec.dynamic.partition.mode=nonstrict
|""".stripMargin)
// val max_time= spark.table("ods.table4")
// .select(max(col("dt").cast("int")))
// .first().getInt(0)
// Get the latest date in the Hive table, cast to int for easier comparison
val max_time2= spark.table("ods.table5")
.select(max(date_format(col("data_d"),"yyyyMMdd").cast("int")))
.first()
.getInt(0)
val ft_mysql: Dataset[Row] = spark.read.format("jdbc")
.option("url","jdbc:mysql://node3:3306/tablee?useSSL=false")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","123456")
.option("dbtable","table3")
.load()
.where(date_format(col("data_d"),"yyyyMMdd").cast("int")>lit(max_time2))
// Compare the latest date already in Hive against the incoming MySQL rows
// Add a new column on the MySQL data to serve as the partition field
val df: DataFrame = ft_mysql.withColumn("dt", date_format(col("data_d"), "yyyyMMdd"))
df.show()
df
.write
.format("hive") //需要指定一下不然会报错
.partitionBy("dt") //设置分区字段
.mode(SaveMode.Append) //Append增加方式
.saveAsTable("ods.table5") //指定hive的ft_expertinfo表
df.show()
}
}
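// Hedged sketch (assumption, not in the original job): a quick way to confirm which dt
// partitions the append above created is to list the table's partitions afterwards:
object ShowTable5Partitions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("show table5 partitions")
      .config("hive.metastore.uris", "thrift://node3:9083")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("show partitions ods.table5").show(false)
  }
}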
package QC
import org.apache.spark.sql.functions.{col, date_format}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.joda.time.LocalDate
object chongFU {
def main(args: Array[String]): Unit = {
// Data cleaning: create the SparkSession
val spark = SparkSession
.builder()
.appName("数据清洗服务")
.master("local[*]")
.config("hive.metastore.dir", "thrift://node3:9083")
.enableHiveSupport()
.getOrCreate()
// Columns of the table used for deduplication
val hive: DataFrame = spark.table("ods.qc1").select("name", "age", "gender",
"phone", "data_d")
// Deduplicated data
val qc_hive = hive.distinct()
//qc_hive.na.drop().show() // drop rows that contain null values
//qc_hive.na.fill("table1").show() // fill null values with "table1"
val t_tmil = LocalDate.now().plusDays(-1).toString().replace("-","") // yesterday's date, used to fill nulls
// Register a temp view over the deduplicated data
qc_hive.createOrReplaceTempView("qc_data")
val dwd_hive = qc_hive.withColumn("dt", date_format(col("data_d"), "yyyyMMdd")).na.fill(t_tmil)
dwd_hive.show()
dwd_hive
.write
.partitionBy("dt")
.mode(SaveMode.Overwrite)
.saveAsTable("dwd.qc1")
spark.close()
}
}
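// Hedged sketch (assumption, not in the original job): distinct() above only removes rows
// where every selected column matches. If duplicates should instead be judged by a key
// subset (name + phone is assumed here), dropDuplicates keeps one row per key:
object chongFUByKey {
  def dedupByKey(df: DataFrame): DataFrame =
    df.dropDuplicates("name", "phone") // keep one row per (name, phone) pair
}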
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, date_format}
object Biao {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("initialize tables")
.config("spark.sql.warehouse.dir", "hdfs://node1:9870//user/hive/warehouse")
.config("hive.metastore.uris", "thrift://node3:9083")
.enableHiveSupport() // enable Hive support
.getOrCreate()
// Read from the MySQL database
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://node3:3306/ft?useSSL=false")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "ft_expertinfo") //需要使用那张表
.load()
// Build the column definition list from the DataFrame schema
val expertinfo_schema = df.schema.map(x => x.name + " " + x.dataType.sql).mkString(",")
spark.sql("drop table if exists ods.ft_expertinfo")
// Create the partitioned table in the Hive ods database if it does not exist
val ods_hive_expertinfo=
s"""
|create table if not exists ods.ft_expertinfo(
|$expertinfo_schema
|)
|partitioned by(etldate string)
|row format delimited fields terminated by '\t'
|""".stripMargin
spark.sql(ods_hive_expertinfo)
// Extract the data from MySQL into Hive
val ods_ft_expertinfo = df.withColumn("etldate", date_format(col("CreateDate"), "yyyyMMdd")).limit(50)
ods_ft_expertinfo
.write
.mode(SaveMode.Overwrite)
.partitionBy("etldate")
.saveAsTable("ods.ft_expertinfo")
val ft_expertinfo = spark.table("ods.ft_expertinfo")
ft_expertinfo.orderBy(col("Id").desc).show()
println("===================一共有多少数据=========="+ft_expertinfo.count()+"=============")
// Read the full MySQL table
val df1 = spark.read.format("jdbc")
.option("url", "jdbc:mysql://node3:3306/ft?useSSL=false")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "ft_remarktags")
.load()
// Build the column definition list for the full table
val ft_remarktags = df1.schema.map(x => x.name + " " + x.dataType.sql).mkString(",")
spark.sql("drop table if exists ods.ft_remarktags")
val create_ods_ft_remarktags=
s"""
|create table if not exists ods.ft_remarktags(
|$ft_remarktags
|)
|partitioned by(data string)
|row format delimited fields terminated by '\t'
|""".stripMargin
spark.sql(create_ods_ft_remarktags)
spark.sql("use ods")
spark.sql("show tables").show()
}
}
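// Hedged sketch (assumption, not in the original code): what the schema-to-DDL mapping
// used in Biao produces for a small made-up schema.
object SchemaToDdlDemo {
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("Id", IntegerType),
      StructField("Name", StringType)))
    // Same expression as in Biao; prints "Id INT,Name STRING"
    val columns = schema.map(x => x.name + " " + x.dataType.sql).mkString(",")
    println(columns)
  }
}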
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.joda.time.LocalDate
object cleaning2 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.enableHiveSupport()
.config("hive.metastore.uris", "thrift://192.168.17.30:9083")
.getOrCreate()
//TODO 读取ods层数据
val cs_mysql: DataFrame = spark.table("ods.csdate")
cs_mysql.distinct()
val df1: DataFrame = cs_mysql.withColumn("dwd_insert_user",lit("user1"))
val df2 = df1.withColumn("dwd_insert_time",lit(LocalDate.now().toString))
val df3 = df2.withColumn("dwd_modify_user",lit("user1"))
val df4 = df3.withColumn("dwd_modify_time",lit(LocalDate.now().toString))
df4.createTempView("datetable")
val cs_dwd_table: DataFrame = spark.sql(
"""
|select num,name,date_format(etime,'yyyy-MM-dd HH:mm:ss') as etime,etldate,dwd_insert_user,
|date_format(dwd_insert_time,'yyyy-MM-dd HH:mm:ss') as dwd_insert_time,dwd_modify_user,
|date_format(dwd_modify_time,'yyyy-MM-dd HH:mm:ss') as dwd_modify_time from datetable
|""".stripMargin)
cs_dwd_table.write.partitionBy("etldate").mode(SaveMode.Overwrite).saveAsTable("dwd.csdate")
}
}
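// Hedged sketch (assumption, not in the original job): dwd_insert_time above is built from
// LocalDate.now(), which carries no time-of-day, so the formatted value always ends in
// 00:00:00. If the actual load time is wanted, Spark's current_timestamp() could be used:
object cleaning2WithTimestamps {
  import org.apache.spark.sql.functions.{current_timestamp, date_format}
  def addAuditColumns(df: DataFrame): DataFrame =
    df.withColumn("dwd_insert_user", lit("user1"))
      .withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
      .withColumn("dwd_modify_user", lit("user1"))
      .withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
}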