在 YARN 上使用 Oozie 调度用 Java 或 Scala 编写的 SparkSQL 或 SparkML 作业

Apache Oozie 是一个用于管理 Apache Hadoop 作业的工作流调度程序。Oozie 将多个作业作为一个有向非循环操作图 (DAG),按顺序组合到一个逻辑工作单元中。Oozie 可靠、可伸缩、可扩展且与 Hadoop 堆栈紧密集成,并使用 YARN 作为其架构中心。它开箱即用地提供了多种类型的 Hadoop 作业,比如 Java map-reduce、Pig、Hive、Sqoop、SSH 和 DistCp,还提供了特定于系统的作业,比如 Java 程序和 shell 脚本。

Apache Spark 是一个快速的通用集群计算系统。它在 Java、Scala、Python 和 R 中提供了高级 API,还提供了一个支持一般执行图的经过优化的引擎。它还支持大量更高级的工具,其中包括用于结构化数据处理的 Spark SQL for SQL、用于机器学习的 MLlib、用于图形处理的 GraphX,以及 Spark Streaming。基于 Hadoop YARN 的架构为 Spark 共享通用的集群和数据集提供了基础。

本文是系列文章的第 2 部分,该系列文章将展示如何使用 Oozie 在 YARN 上调度各种 Spark 应用程序(用 Python、SparkR、SystemML、Scala 和 SparkSQL 编写)。第 2 部分将重点介绍 SparkSQL 和 SparkML 与 Oozie 的结合使用。Spark 版本为 2.1.0,Oozie 版本为 4.3.0。

考虑一个使用 Spark Scala API 编写的简单的 SpakrSQL 应用程序。下面的示例使用 SparkSQL 来查询存储在某个文件中的结构化数据。本文的结尾部分会给出一个完整的程序清单。

请注意,以下两个数据文件(此应用程序使用了它们)必须放在 HDFS 中的 /user/${wf:user()}/examples/src/main/resources/ 下。

# vi people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

# vi people.txt 
Michael, 29
Andy, 30
Justin, 19

1.创建一个工作流定义 (workflow.xml)。下面这个简单的工作流定义执行了一个 Spark 作业:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkSQL'>
    <start to='spark-node' />
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <name>Spark-SQL</name>
              <class>com.ibm.biginsights.oozie.examples.SparkSQLExample</class>
           <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/sparksql/lib/spark-examples.jar</jar>
           <spark-opts>--conf spark.driver.extraJavaOptions=-Diop.version=4.3.0.0 --conf spark.yarn.archive=hdfs://nn:8020/iop/apps/4.3.0.0-0000/spark2/spark2-iop-yarn-archive.tar.gz</spark-opts>
            <arg>2</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>

Oozie Spark workflow.xml 中的一些元素的定义如下:

master 元素指定了 Spark Master 的 URL;例如 spark://host:port、mesos://host:port、yarn-cluster、yarn-master 或 local。对于 Spark on YARN 模式,可以在 master 元素中指定 yarn-client 或 yarn-cluster。在这个例子中,master=yarn-cluster。 name 元素指定了 Spark 应用程序的名称。 jar 元素指定了一个逗号分隔的 JAR 文件列表。 spark-opts 元素(如果存在)包含一个可通过指定“-conf key=value”传递给 Spark 驱动程序的 Spark 配置选项列表。 arg 元素包含可传递给 Spark 应用程序的参数。

要获得关于 Oozie 中的 Spark XML 模式的详细信息,请参阅 https://oozie.apache.org/docs/4.3.0/DG_SparkActionExtension.html

2.创建一个 Oozie 作业配置 (job.properties):

nameNode=hdfs://nn:8020
jobTracker=rm:8050
master=yarn-cluster
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=/user/oozie/examples/apps/sparksql

3.创建一个 Oozie 应用程序目录。创建一个包含工作流定义和资源的应用程序目录结构,如下面的示例所示:

+-~/sparksql/
  +-job.properties
  +-workflow.xml
  +-lib/
    +-spark-example.jar

spark-example.jar 文件包含 Spark 应用程序。

4.将该应用程序复制到 HDFS。将 sparksql/ 目录复制到 HDFS 中的用户主目录。确保 HDFS 中的 sparksql 位置与 job.properties 中的 oozie.wf.application.path 值相匹配。

$ hadoop fs -put sparksql/ /user/oozie/examples/apps

5.运行示例作业。

1.运行以下命令来提交 Oozie 作业:

$ cd ~/sparksql
$ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties –run
job: 0000005-170413230214295-oozie-oozi-W


2.检查工作流作业状态:

$ oozie job -oozie http://oozie-host:11000/oozie -info 0000005-170413230214295-oozie-oozi-W

Job ID : 0000005-170413230214295-oozie-oozi-W
-------------------------------------------------------------------------------
Workflow Name : SparkSQL
App Path      : /user/oozie/examples/apps/sparksql
Status        : SUCCEEDED
Run           : 0
User          : oozie
Group         : -
Created       : 2017-04-25 02:28 GMT
Started       : 2017-04-25 02:28 GMT
Last Modified : 2017-04-25 02:29 GMT
Ended         : 2017-04-25 02:29 GMT
CoordAction ID: -

Actions
------------------------------------------------------------------------------------------------------------------------
ID                                                                            Status    Ext ID                 Ext Status Err Code  
------------------------------------------------------------------------------------------------------------------------
0000005-170413230214295-oozie-oozi-W@:start:                                  OK        -                      OK         -         
------------------------------------------------------------------------------------------------------------------------
0000005-170413230214295-oozie-oozi-W@spark-node                               OK        job_1492070316231_0015 SUCCEEDED  -         
------------------------------------------------------------------------------------------------------------------------
0000005-170413230214295-oozie-oozi-W@end                                      OK        -                      OK         -         
------------------------------------------------------------------------------------------------------------------------

完整的 Scala 程序

object SparkSQLExample {
  case class Person(name: String, age: Long)
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    import spark.implicits._
    runBasicDataFrameExample(spark)
    runDatasetCreationExample(spark)
    runInferSchemaExample(spark)
    runProgrammaticSchemaExample(spark)
    spark.stop()
  }
  private def runBasicDataFrameExample(spark: SparkSession): Unit = {
    val df = spark.read.json("examples/src/main/resources/people.json")
    df.show()
    import spark.implicits._
    df.printSchema()
    df.select("name").show()
    df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
    df.groupBy("age").count().show()
    df.createOrReplaceTempView("people")
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    df.createGlobalTempView("people")
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
  }
  private def runDatasetCreationExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    val path = "examples/src/main/resources/people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
  }
  private def runInferSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
    teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
  }
  private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
    val schemaString = "name age"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    val results = spark.sql("SELECT name FROM people")
    results.map(attributes => "Name: " + attributes(0)).show()
  }
}

SparkML 作业

也可以使用 Oozie Spark 操作来调度 SparkML 作业。执行此过程的步骤与调度 SparkSQL 作业的步骤类似,但该程序使用了一个 SparkML 库。也可以使用 Java 来编写该程序,将 Java 类名称放在 Oozie 工作流定义中。

更多资源:
Scheduling a Spark job written in PySpark or SparkR on YARN with Oozie

本文翻译自 Scheduling a SparkSQL or SparkML job written in Java or Scala on YARN with Oozie(2017-07-03)

The post 在 YARN 上使用 Oozie 调度用 Java 或 Scala 编写的 SparkSQL 或 SparkML 作业 appeared first on developerWorks Developer Center -- 中国(Beta).