Azure Databricks 支持使用 JDBC 连接到外部数据库。 本文提供了用于配置和使用这些连接的基本语法以及 Python、SQL 和 Scala 示例。
Partner Connect 提供优化的集成,用于与很多外部数据源同步数据。 请参阅
什么是 Databricks Partner Connect?
。
本文中的示例不包括 JDBC URL 中的用户名和密码。 Databricks 建议使用
机密
来存储数据库凭据。 例如: 。
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
若要使用 SQL 引用 Databricks 机密,必须在群集初始化过程中配置 Spark 配置属性。
若要查看机密管理的完整示例,请参阅机密工作流示例。
使用 JDBC 读取数据
必须配置一些设置才能使用 JDBC 读取数据。 请注意,每个数据库对 <jdbc_url> 使用不同的格式。
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>'
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark 会自动从数据库表中读取架构,并将其类型映射回 Spark SQL 类型。
Python
employees_table.printSchema
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
可针对此 JDBC 表运行查询:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
使用 JDBC 写入数据
通过 JDBC 将数据保存到表的操作使用与读取操作类似的配置。 请参阅以下示例:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>'
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
默认行为会尝试创建一个新表,如果同名的表已存在,则会引发错误。
可以使用以下语法将数据追加到现有表:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>'
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
可以使用以下语法覆盖现有表:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>'
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
控制 JDBC 查询的并行度
默认情况下,JDBC 驱动程序仅以单个线程查询源数据库。 若要提高读取性能,需要指定多个选项来控制 Azure Databricks 对数据库进行的并行查询数。 对于小型群集,请将 numPartitions 选项设置为群集中的执行程序核心数,这样可确保所有节点并行查询数据。
在大型群集上将 numPartitions 设置为较高的值可能会导致远程数据库的性能下降,因为同时进行过多的查询可能会使服务不堪重负。 这对于应用数据库尤其麻烦。 在将该值设置为 50 以上时需要非常谨慎。
通过为 partitionColumn 选择在源数据库中计算索引的列来加快查询速度。
以下代码示例演示如何为具有 8 个核心的群集配置并行度:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition_key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min_value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max_value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>',
partitionColumn "<partition_key>",
lowerBound "<min_value>",
upperBound "<max_value>",
numPartitions 8
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition_key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min_value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max_value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Azure Databricks 支持所有用于配置 JDBC 的 Apache Spark 选项。
使用 JDBC 写入数据库时,Apache Spark 使用内存中的分区数来控制并行度。 可以在进行写入之前重新分区数据以控制并行度。 请避免在大型群集上设置数量过多的分区,以防远程数据库负担过重。 以下示例演示如何在进行写入之前重新分区(8 个分区):
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>'
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<new_table_name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
将查询向下推送到数据库引擎
可将整个查询向下推送到数据库,且只返回结果。 table 参数标识要读取的 JDBC 表。 可使用 SQL 查询 FROM 子句中有效的任何内容。
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
控制每个查询提取的行数
JDBC 驱动程序有一个 fetchSize 参数,它控制一次从远程数据库中提取的行数。
每列中的字符串返回要多长时间?
系统的默认值可能很小,可以通过调整来进行优化。 例如:Oracle 的默认值 fetchSize 为 10。 将其增加到 100 可将需要执行的总查询数减少至 10 分之一。 JDBC 结果是网络流量,因此请避免使用非常大的数字,不过对于许多数据集,最佳值可能在数千左右。
使用 fetchSize 选项,如以下示例所示:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc_url>",
dbtable "<table_name>",
user '<username>',
password '<password>'.
fetchSize 100
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc_url>")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()