使用Databricks 读写SQL Server数据
-
DDI集群与SQL Server实例网络打通。
-
登录RDS管理控制台
RDS管理控制台
-
点击右侧导航栏
实例列表
选择实例所在
region
-
点击实例ID进入实例详情页面
-
点击实例详情右侧导航栏
数据库连接
-
如图所示查看RDS实例所在的
VPC
和
VSwitch
-
登录到Databricks数据洞察集群
阿里云Databricks控制台
-
选择集群所在region进入
集群
列表
-
点击集群实例进入集群详情页面
-
点击详情页面上方
数据源
页签进入数据源页面点击
添加
-
选择通用网络,选择对应的VPC和VSwith点击
下一步
点击
确认
等待创建成功
-
将数据源ENI IP添加至RDS白名单
-
等待1-j步骤的数据源实例创建好以后找到ENI IP
-
进入RDS管控实例点击数据库连接,选择白名单
-
选择创建好的白名单组,点击
修改
-
添加白名单选配项,点击
确认
3.登录Databricks数据洞察集群进入Notebook,代码实现SQL Server数据读写。
示例文本下载:
The_Sorrows_of_Young_Werther.txt
在Notebook中使用%spark读取OSS文件,并执行WordCount代码实现。
%spark
// 从oss读取数据到spark的rdd
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
// 从oss地址读取文本文档(注意oss文件在账号下上传到对应目录)
val text = sc.textFile("oss://your bucket/demo/The_Sorrows_of_Young_Werther.txt")
// 使用Scala做WordCount处理
val counts = text.flatMap(_.split("\\s+")).map(s => s.replaceAll("""[\.";,!]""", "")).map(word => (word, 1L)).reduceByKey(_+_).map(e => Row.apply(e._1, e._2))
lazy val schema = StructType(
StructField("word", StringType) ::
StructField("count", LongType) :: Nil)
val count_df = sqlContext.createDataFrame(counts, schema)
count_df.show()
在Notebook中使用%spark读写SQL Server数据
%spark
// ddi读写sqlserver数据
val sqlServer = "your server.rds.aliyuncs.com"
val sqlServerDb = "ddi_test"
val sqlServerUrl = s"jdbc:sqlserver://$sqlServer:1433;databaseName=$sqlServerDb"
//write
count_df.write.mode("overwrite").format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").save()
//read
val sqlServer_read_df = spark.read.format("jdbc").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", sqlServerUrl).option("user","uname").option("password", "your ps").option("dbtable", "ddi_count").load
sqlServer_read_df.show()