Databricks 建议对包含数千个文件的数据源使用
COPY INTO
命令进行增量和批量数据加载。 Databricks 建议在高级用例中使用
自动加载程序
。
在本教程中,你将使用
COPY INTO
命令将数据从云对象存储加载到 Azure Databricks 工作区的表中。
一个 Azure 订阅、该订阅中的一个 Azure Databricks 工作区以及该工作区中的一个群集。 要创建这些内容,请参阅
快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业
。 如果按照此快速入门中的说明操作,则无需按照“运行 Spark SQL 作业”部分中的说明操作。
工作区中运行 Databricks Runtime 11.0 或更高版本的全用途
群集
。 若要创建通用群集,请参阅
创建群集
。
熟悉 Azure Databricks 工作区用户界面。 请参阅
浏览工作区
。
熟悉如何使用
Databricks 笔记本
。
可将数据写入到此位置;此演示使用 DBFS 根作为示例,但 Databricks 建议使用 Unity 目录配置的外部存储位置。
本教程假定基本熟悉 Azure Databricks 和默认工作区配置。 如果无法运行提供的代码,请联系工作区管理员,确保有权访问计算资源和可以写入数据的位置。
请注意,提供的代码使用参数
source
来指定将配置为
COPY INTO
数据源的位置。 编写后,此代码指向 DBFS 根目录中的位置。 如果对外部对象存储位置具有写入权限,请将源字符串
dbfs:/
部分替换为对象存储的路径。 由于此代码块还会执行递归删除来重置此演示,因此请确保不要将此数据指向生产数据,并且保留
/user/{username}/copy-into-demo
嵌套目录以避免覆盖或删除现有数据。
创建一个新的 SQL 笔记本
并
将其附加到运行 Databricks Runtime 11.0 或更高版本的群集
。
复制并运行以下代码以重置本教程中使用的存储位置和数据库:
%python
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"copyinto_{username}_db"
source = f"dbfs:/user/{username}/copy-into-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
dbutils.fs.rm(source, True)
复制并运行以下代码以配置一些用于随机生成数据的表和函数:
-- Configure random data generator
CREATE TABLE user_ping_raw
(user_id STRING, ping INTEGER, time TIMESTAMP)
USING json
LOCATION ${c.source};
CREATE TABLE user_ids (user_id STRING);
INSERT INTO user_ids VALUES
("potato_luver"),
("beanbag_lyfe"),
("default_username"),
("the_king"),
("n00b"),
("frodo"),
("data_the_kid"),
("el_matador"),
("the_wiz");
CREATE FUNCTION get_ping()
RETURNS INT
RETURN int(rand() * 250);
CREATE FUNCTION is_active()
RETURNS BOOLEAN
RETURN CASE
WHEN rand() > .25 THEN true
ELSE false
步骤 2:将示例数据写入云存储
Azure Databricks 上很少写入 Delta Lake 以外的数据格式。 此处提供的代码将写入 JSON,模拟外部系统,该系统可能会将另一个系统的结果转储到对象存储中。
复制并运行以下代码以编写一批原始 JSON 数据:
-- Write a new batch of data to the data source
INSERT INTO user_ping_raw
SELECT *,
get_ping() ping,
current_timestamp() time
FROM user_ids
WHERE is_active()=true;
步骤 3:使用 COPY INTO 以幂等形式加载 JSON 数据
必须先创建目标 Delta Lake 表,然后才能使用 COPY INTO。 在 Databricks Runtime 11.0 及更高版本中,无需在 CREATE TABLE 语句中提供表名以外的任何内容。 对于以前版本的 Databricks Runtime,必须在创建空表时提供架构。
复制并运行以下代码以创建目标 Delta 表并从源加载数据:
-- Create target table and load data
CREATE TABLE IF NOT EXISTS user_ping_target;
COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ("mergeSchema" = "true")
COPY_OPTIONS ("mergeSchema" = "true")
由于此操作是幂等的,因此可以多次运行该操作,但数据只会加载一次。
步骤 4:预览表的内容
可以运行简单的 SQL 查询来手动查看此表的内容。
复制并执行以下代码以预览表:
-- Review updated table
SELECT * FROM user_ping_target
步骤 5:加载更多数据和预览结果
可以多次重新运行步骤 2-4,以将新批随机原始 JSON 数据加载到源中,并用 COPY INTO 以幂等方式将其加载到 Delta Lake,并预览结果。 尝试无序或多次运行这些步骤,以模拟正在写入多批原始数据或在没有新数据到达的情况下多次执行 COPY INTO。
步骤 6:清理教程
完成本教程后,如果不再需要保留相关资源,可以在工作区中清理这些资源。
复制并运行以下代码以删除数据库、表和删除所有数据:
%python
# Drop database and tables and remove data
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)
若要停止计算资源,请转到“群集”选项卡并终止群集。
COPY INTO 参考文章