Scala: Processing Spark Data

tanqi
2025-04-09
package JT

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.lit
import org.joda.time.LocalDate

object jingtai {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("static full extraction")
      .enableHiveSupport()
      .getOrCreate()

    val df_mysql = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/tablee?useSSL=false")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "table1")
      .load()

    // Get yesterday's date (it could also be specified manually)
    val lo_time = LocalDate.now().plusDays(-1).toString().replace("-", "")

    // Add a partition column whose value is the date obtained above
    val frame = df_mysql.withColumn("data", lit(lo_time))

    frame
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("data")
      .saveAsTable("ods.table1")
  }

}
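After a full overwrite like this it is worth a quick sanity check of what landed in Hive. Below is a minimal sketch, assuming the same Hive setup and the ods.table1 written above; jingtaiCheck is a hypothetical helper name, not part of the original job.

package JT

import org.apache.spark.sql.SparkSession

// Hypothetical helper (not in the original post): counts rows per partition value
// of the table written by jingtai.
object jingtaiCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("verify full extraction")
      .enableHiveSupport()
      .getOrCreate()

    // One row per "data" partition value with its row count
    spark.table("ods.table1").groupBy("data").count().show()

    spark.close()
  }
}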
package ZL

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object ZLid {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("incremental extraction by id")
      .config("spark.sql.warehouse", "hdfs://node1:9870//user/hive/warehouse")
      .config("hive.metastore.dir", "thrift://node3:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Enable dynamic partitioning in non-strict mode
    spark.sql(
      """
        |set hive.exec.dynamic.partition=true
        |""".stripMargin)
    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)

    // Current maximum id already loaded into the Hive ODS table
    val max_id: Int = spark.table("ods.ft_expertinfo")
      .select(max(col("id").cast("int")))
      .first()
      .getInt(0)

    // Read from MySQL, keeping only rows whose id is greater than the Hive maximum
    val mysql: Dataset[Row] = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/ft?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "ft_expertinfo")
      .load()
      .where(col("Id").cast("int") > lit(max_id))
    mysql.show()


    // Derive the partition column etldate from CreateDate (yyyyMMdd)
    val ft_mysql: Dataset[Row] = mysql.withColumn("etldate", date_format(col("CreateDate"), "yyyyMMdd")).limit(50)

    ft_mysql.show()

    ft_mysql
      .write
      .mode(SaveMode.Append)
      .partitionBy("etldate")
      .saveAsTable("ods.ft_expertinfo")
  }

}
package ZL

import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, date_format, from_unixtime, lit, max, unix_timestamp}
import org.joda.time.LocalDate

import java.text.SimpleDateFormat

object ZLtime {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("incremental extraction by time")
      .config("spark.sql.warehouse.dir", "hdfs://node1:9870//user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://node3:9083")
      .enableHiveSupport()
      .getOrCreate()


    // Enable dynamic partitioning in non-strict mode
    spark.sql(
      """
        |set hive.exec.dynamic.partition=true
        |""".stripMargin)
    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)


//       val max_time= spark.table("ods.table4")
//          .select(max(col("dt").cast("int")))
//          .first().getInt(0)

      // Get the maximum date value in the Hive table, cast to int for easy comparison
      val max_time2= spark.table("ods.table5")
        .select(max(date_format(col("data_d"),"yyyyMMdd").cast("int")))
        .first()
        .getInt(0)

    val ft_mysql: Dataset[Row] = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/tablee?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "table3")
      .load()
      // Keep only rows newer than the latest date already present in the Hive table
      .where(date_format(col("data_d"), "yyyyMMdd").cast("int") > lit(max_time2))

    // Add a new column to the MySQL data to use as the partition field
    val df: DataFrame = ft_mysql.withColumn("dt", date_format(col("data_d"), "yyyyMMdd"))
    df.show()


    df
      .write
      .format("hive")             // must be specified explicitly, otherwise saveAsTable fails here
      .partitionBy("dt")          // partition column
      .mode(SaveMode.Append)      // append the incremental rows
      .saveAsTable("ods.table5")  // target Hive table
    df.show()
  }

}
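Because the filter above wraps data_d in date_format(...).cast(...), it is typically not pushed down to MySQL, so all of table3 is fetched before Spark filters it. Below is a hedged sketch of pushing the comparison into the JDBC source via a subquery in the dbtable option; it reuses the connection settings and the max_time2 value from ZLtime.main, and the MySQL-side expression is an assumption about the data_d column type.

    // Hypothetical drop-in alternative inside ZLtime.main: let MySQL do the filtering.
    // DATE_FORMAT / CAST ... AS SIGNED are MySQL syntax, evaluated on the database side.
    val pushed = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/tablee?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", s"(select * from table3 where cast(date_format(data_d, '%Y%m%d') as signed) > $max_time2) as t")
      .load()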
package QC

import org.apache.spark.sql.functions.{col, date_format}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.joda.time.LocalDate

object chongFU {
  def main(args: Array[String]): Unit = {
    // Data cleaning: create the SparkSession
    val spark = SparkSession
      .builder()
      .appName("data cleaning service")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://node3:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Select the columns used for deduplication
    val hive: DataFrame = spark.table("ods.qc1").select("name", "age", "gender",
      "phone", "data_d")

    // Deduplicated data
    val qc_hive = hive.distinct()

    //qc_hive.na.drop().show()            // drop rows that contain null values
    //qc_hive.na.fill("table1").show()    // fill null values with the string "table1"

    val t_tmil = LocalDate.now().plusDays(-1).toString().replace("-", "")   // yesterday's date, used to fill nulls

    // Register a temp view over the deduplicated data
    qc_hive.createOrReplaceTempView("qc_data")
    val dwd_hive = qc_hive.withColumn("dt", date_format(col("data_d"), "yyyyMMdd")).na.fill(t_tmil)
    dwd_hive.show()

    dwd_hive
      .write
      .partitionBy("dt")
      .mode(SaveMode.Overwrite)
      .saveAsTable("dwd.qc1")
    spark.close()

  }
}
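Note that distinct() only removes rows that are identical across every selected column. If the business key is something like phone, a dedup on that key may be closer to what is wanted. Below is a hedged sketch reusing the hive DataFrame from chongFU.main; treating phone (or name plus phone) as the key is an assumption made for illustration.

    // Hypothetical variant inside chongFU.main: dedupe by a business key instead of whole rows
    val qc_by_key = hive.dropDuplicates("phone")              // keeps one row per phone value
    // or dedupe on a composite key
    val qc_by_name_phone = hive.dropDuplicates("name", "phone")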
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, date_format}

object Biao {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("initialize tables")
      .config("spark.sql.warehouse.dir", "hdfs://node1:9870//user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://node3:9083")
      .enableHiveSupport() // enable Hive support
      .getOrCreate()

    // Read from the MySQL database
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/ft?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "ft_expertinfo") // the source table to read
      .load()

    // Build the column definition list from the DataFrame schema
    val expertinfo_schema = df.schema.map(x => x.name + " " + x.dataType.sql).mkString(",")

    spark.sql("drop table if exists ods.ft_expertinfo")

    // Create the partitioned table in the Hive ods database if it does not exist
    val ods_hive_expertinfo=
      s"""
         |create table if not exists ods.ft_expertinfo(
         |$expertinfo_schema
         |)
         |partitioned by(etldate string)
         |row format delimited fields terminated by '\t'
         |""".stripMargin
    spark.sql(ods_hive_expertinfo)

    // Extract the data from MySQL into the Hive table
    val ods_ft_expertinfo = df.withColumn("etldate", date_format(col("CreateDate"), "yyyyMMdd")).limit(50)
    ods_ft_expertinfo
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy("etldate")
      .saveAsTable("ods.ft_expertinfo")

    val ft_expertinfo = spark.table("ods.ft_expertinfo")
    ft_expertinfo.orderBy(col("Id").desc).show()
    println("===================一共有多少数据=========="+ft_expertinfo.count()+"=============")

    // Read the full table ft_remarktags from MySQL
    val df1 = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/ft?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "ft_remarktags")
      .load()

    // Build the column definition list for the full table
    val ft_remarktags = df1.schema.map(x => x.name + " " + x.dataType.sql).mkString(",")

    spark.sql("drop table if exists ods.ft_remarktags")

    val create_ods_ft_remarktags=
      s"""
         |create table if not exists ods.ft_remarktags(
         |$ft_remarktags
         |)
         |partitioned by(data string)
         |row format delimited fields terminated by '\t'
         |""".stripMargin

    spark.sql(create_ods_ft_remarktags)
    spark.sql("use ods")
    spark.sql("show tables").show()
  }

}
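Deriving the DDL from df.schema as above breaks if a MySQL column name happens to be a Hive reserved word. Below is a hedged sketch of the same derivation with each name wrapped in backticks; it is a drop-in for the expertinfo_schema line inside Biao.main and introduces nothing beyond the quoting.

    // Hypothetical variant: quote each column name so reserved words survive the CREATE TABLE
    val expertinfo_schema_quoted = df.schema
      .map(f => s"`${f.name}` ${f.dataType.sql}")
      .mkString(",")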
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.joda.time.LocalDate

object cleaning2 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://192.168.17.30:9083")
      .getOrCreate()

    // TODO: read the ods layer data and deduplicate it
    val cs_mysql: DataFrame = spark.table("ods.csdate").distinct()

    // Add the dwd audit columns
    val df1: DataFrame = cs_mysql.withColumn("dwd_insert_user", lit("user1"))
    val df2 = df1.withColumn("dwd_insert_time", lit(LocalDate.now().toString))
    val df3 = df2.withColumn("dwd_modify_user", lit("user1"))
    val df4 = df3.withColumn("dwd_modify_time", lit(LocalDate.now().toString))

    df4.createTempView("datetable")

    val cs_dwd_table: DataFrame = spark.sql(
      """
        |select num,name,date_format(etime,'yyyy-MM-dd HH:mm:ss') as etime,etldate,dwd_insert_user,
        |date_format(dwd_insert_time,'yyyy-MM-dd HH:mm:ss') as dwd_insert_time,dwd_modify_user,
        |date_format(dwd_modify_time,'yyyy-MM-dd HH:mm:ss') as dwd_modify_time from datetable
        |""".stripMargin)

    cs_dwd_table.write.partitionBy("etldate").mode(SaveMode.Overwrite).saveAsTable("dwd.csdate")
  }
}
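Since lit(LocalDate.now().toString) only carries a date, the later date_format(..., 'yyyy-MM-dd HH:mm:ss') renders midnight for every row. If an actual load timestamp is wanted, here is a hedged sketch using Spark's current_timestamp() instead; it is a variant of the df1..df4 chain above, reusing cs_mysql, and the rest of the job stays the same.

    // Hypothetical variant: stamp the dwd audit columns with the real load time
    import org.apache.spark.sql.functions.{current_timestamp, date_format, lit}
    val audited = cs_mysql
      .withColumn("dwd_insert_user", lit("user1"))
      .withColumn("dwd_insert_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
      .withColumn("dwd_modify_user", lit("user1"))
      .withColumn("dwd_modify_time", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))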
