[Advaita-l] localcic

GR Vishwanath grv144 at gmail.com
Wed Oct 11 14:28:12 EDT 2017


package util

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import dataxu.etl.config.ETLProperties
import dataxu.etl.spark.{SparkETL, SparkModule, SparkSessionProvider}
import dataxu.etl.spark.workflow.CICRefreshETLWorkflow

/**
  * Test utility for running the CIC refresh ETL workflow locally. Modify the input vals
  * to tweak the runner's inputs (e.g. basePath, CICRefreshETLOutputPath, queryLimit, etc.).
  */
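// One way to launch this runner (an assumption about the build, not stated in the original
// post): run the object directly from an IDE, or from the owning sbt project use
//   sbt "runMain util.LocalCICRefreshETLWorkflowRunner"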
object LocalCICRefreshETLWorkflowRunner {

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.dataxu.environment", "test")

    val basePath = s"file://${System.getProperty("user.home")}/test/rwh-etl/data/"
    val CICRefreshETLOutputPath = s"${basePath}out/cicrefresh"

    val session = buildSparkSession2()


    import dataxu.etl.spark.SparkModule.sparkSessionProvider.sparkSession
    //sparkSession.sql("select object_uid, brand_safety_level,
brand_safety_level_description,'CAMP' from dx_campaign limit 10
").collect().foreach(println)

    //sparkSession.sql("select campaign_cost_model,count(*)  from
dx_campaign group by 1 order by 2 desc limit 20
").collect().foreach(println)
   // sparkSession.sql("select * from rpt_flight_dimensions limit 20
").collect().foreach(println)

    sparkSession.sql("select * from
dx_flight_content_channel").collect().foreach(println)

    sparkSession.sql("select * from FAIL").collect().foreach(println)



    new CICRefreshETLWorkflow(
      new SparkETL(new SparkModule {
        override lazy val sparkSessionProvider: SparkSessionProvider = new SparkSessionProvider {
          override def sparkSession: SparkSession = session
        }
      }),
      CICRefreshETLOutputPath = CICRefreshETLOutputPath,
      queryLimit = Some(20000),
      numParallelProcesses = 1,
      addResultsToMetastore = true,
      properties = ETLProperties())
      .runETLWorkflow()
  }


  /** Builds a local session wired to the shared RDS-backed Hive metastore and S3; used by main above. */
  private def buildSparkSession2(): SparkSession = {
    val conf = new SparkConf(true)
    conf.setMaster("local[4]")
    conf.setAppName("Local Impressions ETL")

    conf.set("spark.ui.enabled", "false")
    conf.set(
      "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version",
      "2")
    conf.set("spark.sql.catalogImplementation", "hive")
    conf.set(  "spark.hadoop.javax.jdo.option.ConnectionURL",
      "jdbc:mariadb://warehouse-etl-metastore-t-rdshivemetastorecluster-1d18t23yjdjch."
+
        "cluster-cvyprmjkfqcg.us-east-1.rds.amazonaws.com"
        + ":3306/hive")
    conf.set("spark.hadoop.javax.jdo.option.ConnectionDriverName",
"org.mariadb.jdbc.Driver")

    conf.set("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive")
    conf.set("spark.hadoop.javax.jdo.option.ConnectionPassword",
"thaes(oo3pheiRio,nei")

    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    conf.set("fs.s3.awsAccessKeyId", "AKIAI7OGCK3Y2QEOQ2CA")
    conf.set("fs.s3.awsSecretAccessKey",
"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")
    val sessionBuilder = SparkSession.builder()
    sessionBuilder.enableHiveSupport()
    sessionBuilder.config(conf).getOrCreate()
  }

  /** Simpler local session with Hive support only; no remote metastore or S3 settings (not called by main). */
  private def buildSparkSession(): SparkSession = {
    val conf = new SparkConf(true)
    conf.setMaster("local[4]")
    conf.setAppName("Local Impressions ETL")

    conf.set("spark.ui.enabled", "false")
    conf.set(
      "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version",
      "2")
    val sessionBuilder = SparkSession.builder()
    sessionBuilder.enableHiveSupport()
    sessionBuilder.config(conf).getOrCreate()
  }
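
  // A minimal sketch (not part of the original post) of a fully local variant: Hive support
  // backed by the default embedded Derby metastore and a warehouse directory under the
  // user's home, so no RDS endpoint, password, or AWS keys are required. The method name
  // and warehouse path below are assumed examples for illustration.
  private def buildLocalOnlySparkSession(): SparkSession = {
    val conf = new SparkConf(true)
    conf.setMaster("local[4]")
    conf.setAppName("Local CIC Refresh ETL (embedded metastore)")

    conf.set("spark.ui.enabled", "false")
    conf.set("spark.sql.catalogImplementation", "hive")
    // Keep all table data in a local warehouse directory instead of S3.
    conf.set(
      "spark.sql.warehouse.dir",
      s"file://${System.getProperty("user.home")}/test/rwh-etl/warehouse")

    val sessionBuilder = SparkSession.builder()
    sessionBuilder.enableHiveSupport()
    sessionBuilder.config(conf).getOrCreate()
  }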



}

