熟悉 Kafka 主题的创建。 有关详细信息,请参阅
Apache Kafka on HDInsight 快速入门
文档。
本文档中的步骤需要一个包含 Spark on HDInsight 和 Kafka on HDInsight 群集的 Azure 资源组。 这些群集都位于一个 Azure 虚拟网络中,这样 Spark 群集便可与 Kafka 群集直接通信。
为方便起见,本文档链接到了一个模板,该模板可创建所有所需 Azure 资源。
有关在虚拟网络中使用 HDInsight 的详细信息,请参阅
为 HDInsight 规划虚拟网络
文档。
将结构化流式处理与 Apache Kafka 配合使用
Spark 结构化流式处理是建立在 Spark SQL 引擎上的流处理引擎。 使用结构化流式处理时,可以使用与编写批处理查询相同的方式来编写流式处理查询。
以下代码片段演示了从 Kafka 读取数据并存储到文件。 第一个为批处理操作,而第二个为流式处理操作:
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
.write
.format("parquet")
.option("path","/example/batchtripdata")
.option("checkpointLocation", "/batchcheckpoint")
.save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
.writeStream
.format("parquet")
.option("path","/example/streamingtripdata")
.option("checkpointLocation", "/streamcheckpoint")
.start.awaitTermination(30000)
在这两个代码片段中,从 Kafka 读取数据并写入文件。 示例之间的区别如下:
流式处理操作还使用 awaitTermination(30000)
,这会在 30,000 毫秒后停止流。
若要将结构化流式处理与 Kafka 配合使用,项目必须具有针对 org.apache.spark : spark-sql-kafka-0-10_2.11
包的依赖项。 此包的版本应与 Spark on HDInsight 的版本相匹配。 对于 Spark 2.4(在 HDInsight 4.0 中可用),可以在 https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar 中找到各种项目类型的依赖项信息。
对于在本教程中使用的 Jupyter Notebook,以下单元格会加载此包依赖项:
%%configure -f
"conf": {
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
Apache Kafka on HDInsight 不提供通过公共 Internet 访问 Kafka 中转站的权限。 使用 Kafka 的任何项都必须位于同一 Azure 虚拟网络中。 在本教程中,Kafka 和 Spark 群集位于同一 Azure 虚拟网络。
下图显示通信在 Spark 和 Kafka 之间的流动方式:
Kafka 服务仅限于虚拟网络内的通信。 通过 Internet 可访问群集上的其他服务,例如 SSH 和 Ambari。 有关可用于 HDInsight 的公共端口的详细信息,请参阅 HDInsight 使用的端口和 URI。
若要创建 Azure 虚拟网络,然后在其中创建 Kafka 和 Spark 群集,请使用以下步骤:
使用以下按钮登录到 Azure,并在 Azure 门户中打开模板。
Azure 资源管理器模板位于 https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json 。
此模板可创建以下资源:
HDInsight 4.0 或 5.0 群集上的 Kafka。
HDInsight 4.0 或 5.0 群集上的 Spark 2.4 或 3.1。
包含 HDInsight 群集的 Azure 虚拟网络。
本教程使用的结构化流式处理笔记本需要 HDInsight 4.0 或 5.0 上的 Spark 2.4 或 3.1。 如果使用早期版本的 Spark on HDInsight,则使用笔记本时会收到错误消息。
使用 Spark 结构化流式处理
本示例演示了如何将 Spark 结构化流式处理与 Kafka on HDInsight 配合使用。 它使用纽约市提供的出租车行程数据。 此笔记本使用的数据集来自 2016 绿色出租车行程数据。
收集主机信息。 使用下面的 curl 和 jq 命令获取 Kafka ZooKeeper 主机和代理主机信息。 这些命令设计用于 Windows 命令提示符,在其他环境中需要进行细微的更改。 将 KafkaCluster
替换为 Kafka 群集的名称,并将 KafkaPassword
替换为群集登录密码。 另外,将 C:\HDI\jq-win64.exe
替换为 jq 安装的实际路径。 在 Windows 命令提示符中输入命令,然后保存输出,以便在后续步骤中使用。
REM Enter cluster name in lowercase
set CLUSTERNAME=KafkaCluster
set PASSWORD=KafkaPassword
curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
在 Web 浏览器中,导航到 https://CLUSTERNAME.azurehdinsight.net/jupyter
,其中 CLUSTERNAME
是群集的名称。 出现提示时,输入创建群集时使用的群集登录名(管理员)和密码。
选择“新建”>“Spark”,创建一个笔记本。
Spark 流式处理具有微型批处理,这意味着数据是成批传入的,而执行程序则对这批数据运行。 如果执行程序的空闲超时少于处理批处理所需的时间,则将不断添加和删除执行程序。 如果执行程序的空闲超时大于批处理持续时间,则不会删除执行程序。 因此,我们建议你在运行流式处理应用程序时通过将 spark.dynamicAllocation.enabled 设置为 false 来禁用动态分配。
加载供 Notebook 使用的包,方法是在 Notebook 单元格中输入以下信息。 使用 CTRL + ENTER 运行该命令。
%%configure -f
"conf": {
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
"spark.dynamicAllocation.enabled": false
创建 Kafka 主题。 编辑以下命令,将 YOUR_ZOOKEEPER_HOSTS
替换为在第一步提取的 Zookeeper 主机信息。 在 Jupyter Notebook 中输入编辑的命令,创建 tripdata
主题。
%%bash
export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
检索出租车行程数据。 在下一单元格中输入此命令,加载纽约市出租车行程数据。 先将数据加载到数据帧中,然后将数据帧作为单元格输出显示。
import spark.implicits._
// Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
val result = scala.io.Source.fromURL(url).mkString
// Create a dataframe from the JSON data
val taxiDF = spark.read.json(Seq(result).toDS)
// Display the dataframe containing trip data
taxiDF.show()
设置 Kafka 代理主机信息。 将 YOUR_KAFKA_BROKER_HOSTS
替换为在步骤 1 中提取的代理主机信息。 在下一 Jupyter Notebook 单元格中输入编辑的命令。
// The Kafka broker hosts and topic used to write to Kafka
val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
val kafkaTopic="tripdata"
println("Finished setting Kafka broker and topic configuration.")
将数据发送到 Kafka。 在以下命令中,vendorid
字段用作 Kafka 消息的键值。 将数据分区时,此键供 Kafka 使用。 所有字段都作为 JSON 字符串值存储在 Kafka 消息中。 在 Jupyter 中输入以下命令,使用批量查询将数据保存到 Kafka。
// Select the vendorid as the key and save the JSON string as the value.
val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
println("Data sent to Kafka")
声明一个架构。 以下命令演示了从 Kafka 读取 JSON 数据时如何使用架构。 在下一 Jupyter 单元格中输入此命令。
// Import bits useed for declaring schemas and working with JSON data
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Define a schema for the data
val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
// Reproduced here for readability
//val schema = (new StructType)
// .add("dropoff_latitude", StringType)
// .add("dropoff_longitude", StringType)
// .add("extra", StringType)
// .add("fare_amount", StringType)
// .add("improvement_surcharge", StringType)
// .add("lpep_dropoff_datetime", StringType)
// .add("lpep_pickup_datetime", StringType)
// .add("mta_tax", StringType)
// .add("passenger_count", StringType)
// .add("payment_type", StringType)
// .add("pickup_latitude", StringType)
// .add("pickup_longitude", StringType)
// .add("ratecodeid", StringType)
// .add("store_and_fwd_flag", StringType)
// .add("tip_amount", StringType)
// .add("tolls_amount", StringType)
// .add("total_amount", StringType)
// .add("trip_distance", StringType)
// .add("trip_type", StringType)
// .add("vendorid", StringType)
println("Schema declared")
选择数据并启动流。 以下命令演示了如何使用批处理查询从 Kafka 检索数据。 然后,将输出结果写入到 Spark 群集上的 HDFS 中。 在此示例中,select
从 Kafka 检索消息(值字段),然后为其应用架构。 然后,将数据以 parquet 格式写入 HDFS(WASB 或 ADL)。 在下一 Jupyter 单元格中输入此命令。
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
// Select data and write to file
val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
println("Wrote data to file")
可以在下一 Jupyter 单元格中输入此命令,验证这些文件是否已创建。 它会在 /example/batchtripdata
目录中列出文件。
%%bash
hdfs dfs -ls /example/batchtripdata
上一示例使用了批量查询,而以下命令则演示如何使用流式处理查询执行相同的操作。 在下一 Jupyter 单元格中输入此命令。
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
println("Wrote data to file")
运行以下单元格,验证是否已通过流式处理查询写入这些文件。
%%bash
hdfs dfs -ls /example/streamingtripdata
若要清理本教程创建的资源,可以删除资源组。 删除资源组还会删除关联的 HDInsight 群集, 以及与该资源组关联的任何其他资源。
若要使用 Azure 门户删除资源组,请执行以下操作:
在 Azure 门户中,展开左侧的菜单以打开服务菜单,然后选择“资源组”以显示资源组列表。
找到要删除的资源组,然后右键单击列表右侧的“更多”按钮 (...)。
选择“删除资源组”,然后进行确认。
HDInsight 群集计费在创建群集之后便会开始,删除群集后才会停止。 HDInsight 群集按分钟收费,因此不再需要使用群集时,应将其删除。
删除 Kafka on HDInsight 群集会删除存储在 Kafka 中的任何数据。