使用 IBM IOP 4.3 中的 Apache Spark SQL 处理 Solr 数据

Apache Spark 的一个关键优势是它在处理各种不同类型的数据源和格式方面的能力和灵活性,这些数据包括从文本和 CSV 等非结构化数据到关系数据库等结构明确的数据。另一方面,Apache Solr 是一个构建于 Apache Lucene 之上的快速企业搜索平台。Apache Spark 和 Apache Solr 都是 IBM Open Platform (IOP) 的关键部分。

Spark 的最大挑战之一,也是 Spark 的最大用例之一,是使用无数不同的数据源和格式来构建可靠且可扩展的 ETL 管道。在这篇博客中,我们将讨论如何使用开源的 LucidWorks Spark-Solr Connector 将 Solr 数据读入 Spark Dataframe 中,然后可以使用 Spark SQL 或 Dataframe API 对这些数据进行查询、转换和处理。

前提条件:
– 已安装 IBM Open Platform 4.3(目前为技术预览版)且所有组件都运行正常。
– 从 Github 将开源的 LucidWorks Spark-Solr Connector v3.0.0 beta jar 文件下载到您将运行 Spark Shell 的主机。

wget http://repo1.maven.org/maven2/com/lucidworks/spark/spark-solr/3.0.0-beta/spark-solr-3.0.0-beta-shaded.jar

创建一个 Solr 集合:
Apache Solr 附带了一个电影数据集样本,我们将使用它填充一个样本集合。有关该数据集的一些信息:

此数据包含以下字段:
* “id” – 电影的唯一标识符
* “name” – 电影的名称
* “directed_by” – 电影的导演
* “initial_release_date” – 电影在任何国家的最早官方上映日期
* “genre” – 电影所属的流派

要使用 data_driven_schema_configs 配置集创建一个名为“movies”且包含 1 个分片的 Solr 集合,并将复制系数设置为 1,请导航到 /usr/iop/current/solr-server 目录并以用户“solr”(Solr 的默认用户)的身份运行以下命令:

[solr@yourhost solr-server]$ bin/solr create -c movies

Connecting to ZooKeeper at yourhostname1.com:2181, yourhostname2.com:2181, yourhostname3.com:2181/solr ...
Uploading /usr/iop/current/solr-server/server/solr/configsets/data_driven_schema_configs/conf for config films to ZooKeeper at yourhostname1.com:2181, yourhostname2.com:2181, yourhostname3.com:2181/solr

Creating new collection 'movies' using command:
http://localhost:8983/solr/admin/collections?action=CREATE&name=movies&numShards=1&replicationFactor=1&maxShardsPerNode=1&collection.configName=movies

{
"responseHeader":{
"status":0,
"QTime":6247},
"success":{"172.16.167.243:8983_solr":{
"responseHeader":{
"status":0,
"QTime":1793},
"core":"movies_shard1_replica1"}}}

需要注意的是,有时,在 Solr 尝试根据数据来推断模式时,可能并不总是能够正确解释该数据。在我们的例子中,需要发出以下命令来更新模式:

curl http://localhost:8983/solr/films/schema -X POST -H 'Content-type:application/json' --data-binary '{
"add-field" : {
"name":"name",
"type":"text_general",
"multiValued":false,
"stored":true
},
"add-field" : {
"name":"initial_release_date",
"type":"tdate",
"stored":true
}
}'

要为结果建立索引并将它们存储在集合“movies”中,请导航到 /usr/iop/current/solr-server 目录,并以用户“solr”的身份发出 post 命令:

[solr@yourhost solr-server]$ bin/post -c movies example/films/films.json

使用 LucidWorks Spark-Solr Connector v3.0.0 beta jar 启动 Spark Shell:

spark-shell --jars spark-solr-3.0.0-beta-shaded.jar
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

运行以下 scala 代码行,以便连接 Solr 并从“movies”集合加载数据:

scala> val options = Map("collection" -> "movies", "zkhost" -> "your-solr-host:2181/solr")

scala> val df = spark.read.format("solr").options(options).load

验证我们已从 Solr 成功加载整个数据集:

scala> df.count()
res0: Long = 1100

scala> df.show(5)
+--------------+---------------+-------------------+--------------------+----------------+
|   directed_by|          genre|                 id|initial_release_date|            name|
+--------------+---------------+-------------------+--------------------+----------------+
|    John Lafia|  Disaster Film|/en/10_5_apocalypse|2006-03-17 16:00:...|10.5: Apocalypse|
|Robert Moresco|        Mystery|      /en/10th_wolf|2006-08-17 17:00:...| 10th & Wolf    |
|   Gary Winick|Romantic comedy| /en/13_going_on_30|2004-04-13 17:00:...|  13 Going on 30|
| John Herzfeld|       Thriller|     /en/15_minutes|2001-02-28 16:00:...|      15 Minutes|
|    Aparna Sen|       Art film| /en/15_park_avenue|2005-10-26 17:00:...|  15 Park Avenue|
+--------------+---------------+-------------------+--------------------+----------------+
only showing top 5 rows

scala> df.printSchema()
root
 |-- directed_by: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- id: string (nullable = false)
 |-- initial_release_date: timestamp (nullable = true)
 |-- name: string (nullable = true)

现在,您可以使用 Spark SQL 来查询数据:

scala> df.createOrReplaceTempView("movies")

scala> spark.sql("SELECT name FROM movies WHERE directed_by = 'Ridley Scott'").show()
+-----------------+
|             name|
+-----------------+
|  Black Hawk Down|
|        Gladiator|
|         Hannibal|
|      A Good Year|
|American Gangster|
+-----------------+

也可以使用 Dataframe API 来获得相同的结果:

scala> df.filter($"directed_by".like("Ridley Scott")).select("name").show()
+-----------------+
|             name|
+-----------------+
|  Black Hawk Down|
|        Gladiator|
|         Hannibal|
|      A Good Year|
|American Gangster|
+-----------------+

结束语

Apache Spark 提供了一个强大的、多用途的框架来处理大量不同的数据源和数据格式,我希望我已在这篇文章中向您展示了如何从 Solr 摄取/加载数据。像 Spark Dataframe 一样,Solr 是一个关键的开源企业级搜索引擎。它们使您能结合使用来自其他企业系统(从 DB2 到 Hive,再到实时流数据)的数据来执行大规模高级数据分析。在 Solr 数据上使用 Spark 的另一个优势是,能够使用 Spark 支持的任何语言。例如,Solr 没有原生的 R 接口,但借助 Spark,数据科学家能无缝地在 R 作业中利用 Solr 数据。

本文翻译自 Processing Solr data with Apache Spark SQL in IBM IOP 4.3(2017-03-21)

The post 使用 IBM IOP 4.3 中的 Apache Spark SQL 处理 Solr 数据 appeared first on developerWorks Developer Center -- 中国(Beta).