在本教程中,你将通过添加 BlobTrigger 函数(在文件上传到 Blob 存储容器时触发)继续在 Python 中开发本地 Azure 函数。 Azure 函数使用各种 Python 库清理和规范化存储在
Azure Blob 存储
中的 JSON 文件中的新闻文章结果数据。
必须已完成以下各项的所有步骤:
使用 Python Azure 函数获取必应新闻
1. 为 Python Functions 应用创建本地 BlobTrigger
步骤 1。
在Visual Studio Code工作区中创建新的本地 Azure 函数。
选择
“活动”栏中
的
“Azure”图标
。
在
“工作区 (本地) ”区域中
,选择“Azure 函数”图标 (
+
+ lightening) 以添加另一个 API 函数。
为函数选择模板
:选择
Azure blob storage trigger
。
提供函数名称
:输入
api_blob_trigger
。
从“local.settings.json”中选择设置
:选择
BLOB_STORAGE_CONNECTION_STRING
。 虽然此值与在本地运行时相同
AzureWebJobsStorage
,但部署函数应用时,两个连接字符串将指向不同的存储帐户。
触发器将监视的存储帐户中的路径
:输入
msdocs-python-cloud-etl-news-source/{name}
。 这表示
msdocs-python-cloud-etl-news-source
容器以及降落在该处的任何文件。
2.为 Azure Data Lake Gen 2 创建资源
Azure Data Lake Storage (ADLS) 基于 Azure Blob 文件系统 (ABFS) 通过 TLS/SSL 进行加密,并使用经过优化的驱动程序进行大数据工作负载。 其他功能(如 Blob 存储的存储层选项和高可用性 & 灾难恢复选项)使 ADLS 成为大数据分析的理想存储解决方案。
对于 Azure Data Lake Gen,存储帐户的创建与创建Azure Blob 存储相同。 唯一的区别在于,必须启用分层命名空间 (HNS) 属性。 分层命名空间是Azure Data Lake Storage的基础部分。 利用此功能,可以将对象/文件组织到目录层次结构中,以实现高效的数据访问。
按照以下步骤创建和配置Azure Data Lake Storage资源。
步骤 1。
在Azure 门户中创建 Azure 存储帐户资源。
打开浏览器窗口并导航到
Azure 门户
。
在搜索框中输入
存储
。
导航到搜索结果中
“服务
”下的
“存储帐户
”。
在
“存储帐户
”对话框中选择“
+ 创建
”按钮。
在
“存储帐户
资源”对话框窗口的左侧面板中选择“
访问控制 (IAM)
”。
在
“授予对此资源的访问权限
”部分中,选择“
添加角色分配
”按钮。
在
“添加角色分配
”对话框中,搜索并选择“
存储 Blob 数据参与者
”,然后选择“
下一步
”。
分配的访问权限
:选择
“用户、组或服务主体
”。
成员
:选择“
+ 选择成员
”。
从
“所选成员”
中,搜索并查找 Azure 帐户。
选择标识以将其添加为所选成员。
使用
“选择”
按钮添加标识。
查看所选值,然后选择“
查看 + 分配
”。
3.为 Azure Data Lake 创建容器和目录
容器充当文件系统目录,用于组织 Azure Data Lake Store 中的数据文件。 容器可以存储无限数量的 Blob,存储帐户可以有无限数量的容器。
将数据加载到数据湖时,必须考虑简化安全性、高效处理和分区工作。 Azure Data Lake Storage Gen 2 使用目录而不是 Blob 存储中的虚拟文件夹。 目录可实现更精确的安全性、控制访问和目录级文件系统操作。
步骤 1。
在
左侧
面板中的
“数据存储
”部分,选择“
容器
”,然后在“容器”窗格中选择“
+
容器
”。
将数据湖转换为数据沼泽非常简单。 因此,管理数据湖中的数据非常重要。
Azure Purview
是一项统一的数据治理服务,可帮助你管理和治理本地、多云和服务型软件 (SaaS) 数据。 通过自动化的数据发现、敏感数据分类和端到端数据世系,轻松创建数据领域的整体最新映射。
4. 使用 Python SDK 为 Data Lake 创建代码
将数据转换为适合分析的格式后,将数据加载到分析数据存储中。 数据存储可以是数据库系统、数据仓库、数据湖或 Hadoop。 每个目标都有不同的加载数据可靠性和优化性能的方法。 现在可以将数据用于分析和商业智能。 本文将转换后的数据加载到 Azure Data Lake Storage (ADLS) ,因为各种计算和分析 Azure 服务可以轻松连接到Azure Data Lake Storage。
打开
local.settings.json
文件,该文件保存了本地环境设置。
编辑文件以更新以下内容:
# HTTP request for URL contents
def get_content_from_url(url):
return requests.get(url).content
# clean metadata
def clean_metadata(jsonitem):
# get news article URL
article_url = jsonitem["url"]
# remove html tags in article name
article_title = remove_html_tags(jsonitem["name"])
# remove html tags in article description
article_descr = remove_html_tags(jsonitem["description"])
# get article contents
article_text = get_html_text(requests.get(article_url).content)
return article_url, article_title, article_descr, article_text
# strip HTML tags from a string.
def remove_html_tags(html_text):
return html.escape(re.compile(r"<[^>]+>").sub("", str(html_text)))
# get all text of a news article
# assume heavy use of `<p>`` (paragraph) HTML tag
def get_html_text(page_html):
soup = BeautifulSoup(page_html, "html.parser")
text = soup.find_all("p", text=True)
text = remove_html_tags(str(text))
return text
# normalize text
def normalize_text(text_string):
# Lowercase text
lower_string = text_string.lower()
# remove numbers
no_number_string = re.sub(r"\d+", "", lower_string)
# remove punctuation except words and space
no_punc_string = re.sub(
r"(@\[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?",
no_number_string,
# remove white space
no_wspace_string = no_punc_string.strip()
# decode unicode to handle special characters
json_bytes = no_wspace_string.encode()
json_str_decoded = json.dumps(json_bytes.decode("utf-8", errors="ignore"))
return json_str_decoded
# Loop through and process each search result
def clean_documents(data_dictionary):
for item in data_dictionary:
# get news article URL.
article_url = item["url"]
logging.info("article_url: %s", article_url)
# get and remove any html tags in the name of the news article.
item["name"] = remove_html_tags(item["name"])
# get and remove any html tags in the short description of the news article.
item["description"] = remove_html_tags(item["description"])
# get the new article contents and store text.
article_text = get_html_text(requests.get(article_url).content)
# remove any html tags in the news article's text.
article_text = remove_html_tags(article_text)
# preprocess/normalize new article's text to make it easier to
# consume by analytic applications.
article_text_norm = normalize_text(article_text)
# add new data to dictionary
item["article_text"] = article_text_norm
return data_dictionary
5.使用 Python 为数据湖创建代码
在
共享文件夹
中创建名为
data_lake.py
的文件。
将以下 Python 代码复制到其中。
# ./shared/data_lake.py
import logging
import os
from azure.storage.filedatalake import DataLakeServiceClient
# Upload the data to Azure Data Lake
# Required RBAC role - Storage Blob Data Owner
def upload_to_data_lake(
azure_credential,
data_lake_account_name,
data_lake_container_name,
data_lake_directory_name,
file_name,
data_str,
# Get the client
service_client = DataLakeServiceClient(
account_url=f"https://{data_lake_account_name}.dfs.core.windows.net",
credential=azure_credential,
# Get the file system client
file_system_client = service_client.get_file_system_client(file_system=data_lake_container_name)
# Get the directory client
directory_client = file_system_client.get_directory_client(data_lake_directory_name)
# Get the file client
file_client = directory_client.get_file_client(file_name)
# Upload the data
file_client.upload_data(data_str, overwrite=True)
return file_name
7. 使用 Python 为 BlobTrigger 函数创建代码
打开
api_blob_trigger
文件夹中的
init.py
文件。
将文件的内容替换为以下 Python 代码。
# ./api_blob_trigger/__init__.py
import os
import json
import logging
import azure.functions as func
from shared.azure_credential import get_azure_default_credential
from shared.data_lake import upload_to_data_lake
from shared.transform import clean_documents
def main(myblob: func.InputStream):
logging.info("Python blob trigger function processed blob \nName: %s \nBlob Size: %s bytes", myblob.name, myblob.length)
# read the blob content as a string.
search_results_blob_str = myblob.read()
# decode the string to Unicode
blob_json = search_results_blob_str.decode("utf-8")
# parse a valid JSON string and convert it into a Python dict
# Get environment variables
data_lake_account_name = os.environ.get("DATALAKE_GEN_2_RESOURCE_NAME")
data_lake_container_name = os.environ.get("DATALAKE_GEN_2_CONTAINER_NAME")
data_lake_directory_name = os.environ.get("DATALAKE_GEN_2_DIRECTORY_NAME")
# Get Data
data = json.loads(blob_json)
# Clean Data
new_data_dictionary = clean_documents(data)
# Prepare to upload
json_str = json.dumps(new_data_dictionary)
file_name = myblob.name.split("/")[1]
new_file_name = f"processed_{file_name}"
# Get authentication to Azure
azure_default_credential = get_azure_default_credential()
# Upload to Data Lake
upload_to_data_lake(azure_default_credential, data_lake_account_name, data_lake_container_name, data_lake_directory_name, new_file_name, json_str)
logging.info(
"Successfully uploaded to data lake, old: %s, new: %s", myblob.name, new_file_name
except ValueError as err:
logging.info("Error converting %s to python dictionary: %s", myblob.name, err)
8.测试 Azure Blob 存储触发器函数
若要正确测试本地 Azure 存储 Blob 触发器函数,必须先执行 Azure HTTP 触发器函数。 由于 Azure HTTP 触发器函数创建结果文件并将其上传到 Azure Blob 存储,Blob 触发器函数会自动执行。
在本地运行函数。
func start
在本地测试函数。 使用 Web 浏览器测试 搜索 API:
http://localhost:7071/api/search?search_term=azure&count=5
验证 Blob 存储 msdocs-python-cloud-etl-news-source 容器是否有名为 这样的search_results_azure_yar6q2P80Lm4FG7.json
文件。
验证 Data Lake msdocs-python-cloud-etl-processed 容器和 news-data 目录是否具有名为 这样的processed_search_results_azure_yar6q2P80Lm4FG7.json
文件。
你完成了哪些工作
如果 Azure 函数成功,则表明已正确配置并运行以下内容:
本地 API
Azure 资源:必应搜索、密钥保管库、Blob 存储
身份验证:用户凭据可以访问资源
下一步是将代码部署到 Azure 函数资源并正确配置该资源。
排查 Azure 函数问题
如果已达到此点,并且已处理的文件不在 Data Lake 容器和目录中,请使用以下信息调试应用程序。
打开本地日志记录:
- 停止应用程序。
- 打开
./host.json
文件。
- 将 logging.logLevel.default 属性设置为
"Information"
。
- 如果 Blob 存储中有任何文件,请下载该文件并检查内容。 如果是新闻信息的 JSON 数组,则你知道 HTTP 触发器
api_search
已成功工作。
- 删除 Blob 存储中的文件。
- 再次启动应用程序,并使用 HTTP API 终结点搜索新闻。
- 查看调试日志。 它包括发生的任何错误。
- Data Lake Storage
- 必须在 local.settings.json 的 属性中
DATALAKE_GEN_2_RESOURCE_NAME
设置资源名称。
- 容器名称(如)
msdocs-python-cloud-etl-news-source
必须与 local.settings.json 中的 属性匹配BLOB_STORAGE_CONTAINER_NAME
。
- Blob 存储的用户帐户需要 存储 Blob 数据参与者 角色才能添加和读取 Blob。
部署解决方案 >>