本文介绍如何使用Databricks 读写阿里云RDS(SQL Server)数据源数据。

前提条件

使用Databricks 读写SQL Server数据

  1. DDI集群与SQL Server实例网络打通。

    1. 登录RDS管理控制台 RDS管理控制台

    2. 点击右侧导航栏 实例列表 选择实例所在 region

    3. 点击实例ID进入实例详情页面

    4. 点击实例详情右侧导航栏 数据库连接

    5. 如图所示查看RDS实例所在的 VPC VSwitch data

    6. 登录到Databricks数据洞察集群 阿里云Databricks控制台

    7. 选择集群所在region进入 集群 列表

    8. 点击集群实例进入集群详情页面

    9. 点击详情页面上方 数据源 页签进入数据源页面点击 添加 打他

    10. 选择通用网络,选择对应的VPC和VSwith点击 下一步 点击 确认 等待创建成功 data

  2. 将数据源ENI IP添加至RDS白名单

    1. 等待1-j步骤的数据源实例创建好以后找到ENI IP 打他

    2. 进入RDS管控实例点击数据库连接,选择白名单 data

    3. 选择创建好的白名单组,点击 修改 data

    4. 添加白名单选配项,点击 确认 data

3.登录Databricks数据洞察集群进入Notebook,代码实现SQL Server数据读写。

示例文本下载: The_Sorrows_of_Young_Werther.txt

在Notebook中使用%spark读取OSS文件,并执行WordCount代码实现。

%spark
// 从oss读取数据到spark的rdd
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
// 从oss地址读取文本文档(注意oss文件在账号下上传到对应目录)
val text = sc.textFile("oss://your bucket/demo/The_Sorrows_of_Young_Werther.txt")
// 使用Scala做WordCount处理
val counts = text.flatMap(_.split("\\s+")).map(s => s.replaceAll("""[\.";,!]""", "")).map(word => (word, 1L)).reduceByKey(_+_).map(e => Row.apply(e._1, e._2))
lazy val schema = StructType(
    StructField("word", StringType) ::
    StructField("count", LongType) :: Nil)
val count_df = sqlContext.createDataFrame(counts, schema)
count_df.show()
data

在Notebook中使用%spark读写SQL Server数据

%spark
// ddi读写sqlserver数据
val sqlServer = "your server.rds.aliyuncs.com"
val sqlServerDb = "ddi_test"
val sqlServerUrl = s"jdbc:sqlserver://$sqlServer:1433;databaseName=$sqlServerDb"
//write
count_df.write.mode("overwrite").format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").save()
//read
val sqlServer_read_df = spark.read.format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").load
sqlServer_read_df.show()
data