將筆記本附加至 Lakehouse。 在左側,選取
[新增
] 以新增現有的 Lakehouse 或建立 Lakehouse。
將sparklyr 連線至 Synapse Spark 叢集
在中使用
spark_connect()
下列連接方法來建立
sparklyr
連接。 我們支持稱為
synapse
的新連接方法,可讓您連線到現有的Spark會話。 這可大幅減少
sparklyr
會話開始時間。 此外,我們已將此連線方法貢獻給
開放原始碼 sparklyr 專案
。 透過
method = "synapse"
,您可以在相同的會話中使用
sparklyr
和
SparkR
,並輕鬆地
在它們
之間共享數據。
# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)
使用sparklyr讀取數據
新的Spark會話不包含任何數據。 第一個步驟是將數據載入 Spark 工作階段的記憶體,或將 Spark 指向資料的位置,以便依需求存取數據。
# load the sparklyr package
library(sparklyr)
# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)
head(mtcars_tbl)
使用 sparklyr
,您也可以 write
使用ABFS路徑,從Lakehouse檔案取得 read
數據。 若要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。
若要尋找 ABFS 路徑,請以滑鼠右鍵按兩下 Lakehouse 中的 [檔案 ] 資料夾,然後選取 [ 複製ABFS 路徑]。 貼上您的路徑以在此程式代碼中取代 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files
:
temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"
# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')
# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv)
head(mtcarsDF)
使用sparklyr操作數據
sparklyr
使用下列方法提供多個方法來處理 Spark 內的數據:
dplyr
命令
SparkSQL
Spark 的功能轉換器
使用 dplyr
您可以使用熟悉 dplyr
的命令在 Spark 內準備數據。 命令會在 Spark 內執行,因此 R 與 Spark 之間沒有不必要的資料傳輸。
按兩下 操作數據 搭配dplyr
,以查看搭配Spark使用 dplyr的額外檔。
# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)
cargroup <- group_by(mtcars_tbl, cyl) %>%
count() %>%
arrange(desc(n))
cargroup
sparklyr
並將 dplyr
R 命令轉譯為適用於我們的 Spark SQL。 若要檢視產生的查詢,請使用 show_query()
:
# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)
使用 SQL
您也可以直接對 Spark 叢集中的數據表執行 SQL 查詢。 物件 spark_connection()
會實作 Spark 的 DBI 介面,因此您可以使用 dbGetQuery()
來執行 SQL,並以 R 數據框架傳回結果:
library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")
上述兩種方法都依賴 SQL 語句。 Spark 提供命令,讓某些資料轉換更方便,而且不需要使用 SQL。
例如, ft_binarizer()
命令可簡化新數據行的建立,指出另一個數據行的值是否高於特定臨界值。
您可以從參考 -FT 找到可用的 sparklyr
Spark 功能轉換器完整清單。
mtcars_tbl %>%
ft_binarizer("mpg", "over_20", threshold = 20) %>%
select(mpg, over_20) %>%
head(5)
在和 之間 sparklyr
共享數據 SparkR
當您使用method = "synapse"
連線sparklyr
到 synapse spark 叢集時,可以在相同的工作階段中使用 sparklyr
和 SparkR
,並輕鬆地在它們之間共用數據。 您可以在 中 sparklyr
建立 Spark 數據表,並從 讀取它 SparkR
。
# load the sparklyr package
library(sparklyr)
# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)
# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")
head(mtcars_sparklr)
以下是我們用來 ml_linear_regression()
配合線性回歸模型的範例。 我們使用內mtcars
建數據集,並查看我們是否可以根據汽車重量()預測汽車的油耗(mpg
wt
),以及發動機包含的汽缸數目(cyl
)。 我們假設在每個案例中,和每個特徵之間的 mpg
關聯性都是線性的。
產生測試和定型數據集
使用分割,70% 用於定型,30% 用於測試模型。 玩這個比例會導致不同的模型。
# split the dataframe into test and training dataframes
partitions <- mtcars_tbl %>%
select(mpg, wt, cyl) %>%
sdf_random_split(training = 0.7, test = 0.3, seed = 2023)
將羅吉斯回歸模型定型。
fit <- partitions$training %>%
ml_linear_regression(mpg ~ .)
現在,使用 summary()
來深入瞭解模型的品質,以及每個預測值的統計意義。
summary(fit)
您可以呼叫 ml_predict()
,在測試數據集上套用模型。
pred <- ml_predict(fit, partitions$test)
head(pred)
如需可透過sparklyr取得的SparkML模型清單,請造訪 參考 - ML
中斷與Spark叢集的連線
您可以呼叫 spark_disconnect()
或選取 筆記本功能區頂端的 [停止會話 ] 按鈕,結束 Spark 會話。
spark_disconnect(sc)
深入瞭解 R 功能:
如何使用SparkR
如何使用 Tidyverse
R 連結庫管理
建立 R 視覺效果
教學課程:鱷梨價格預測
教學課程:航班延誤預測
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:https://aka.ms/ContentUserFeedback。
提交並檢視相關的意見反應