相关文章推荐
腼腆的伤疤  ·  Azure AI 视频处理指南 - ...·  23 分钟前    · 
知识渊博的闹钟  ·  Linux ...·  2 年前    · 
豪气的青蛙  ·  Kotlin之应用Gson - ...·  2 年前    · 
要出家的台灯  ·  MATLAB 之 ...·  2 年前    · 

在本教程中,你将通过添加 BlobTrigger 函数(在文件上传到 Blob 存储容器时触发)继续在 Python 中开发本地 Azure 函数。 Azure 函数使用各种 Python 库清理和规范化存储在 Azure Blob 存储 中的 JSON 文件中的新闻文章结果数据。

必须已完成以下各项的所有步骤:

  • 使用 Python Azure 函数获取必应新闻
  • 1. 为 Python Functions 应用创建本地 BlobTrigger

    步骤 1。 在Visual Studio Code工作区中创建新的本地 Azure 函数。

  • 选择 “活动”栏中 “Azure”图标
  • “工作区 (本地) ”区域中 ,选择“Azure 函数”图标 ( + + lightening) 以添加另一个 API 函数。
  • 为函数选择模板 :选择 Azure blob storage trigger
  • 提供函数名称 :输入 api_blob_trigger
  • 从“local.settings.json”中选择设置 :选择 BLOB_STORAGE_CONNECTION_STRING 。 虽然此值与在本地运行时相同 AzureWebJobsStorage ,但部署函数应用时,两个连接字符串将指向不同的存储帐户。
  • 触发器将监视的存储帐户中的路径 :输入 msdocs-python-cloud-etl-news-source/{name} 。 这表示 msdocs-python-cloud-etl-news-source 容器以及降落在该处的任何文件。
  • 2.为 Azure Data Lake Gen 2 创建资源

    Azure Data Lake Storage (ADLS) 基于 Azure Blob 文件系统 (ABFS) 通过 TLS/SSL 进行加密,并使用经过优化的驱动程序进行大数据工作负载。 其他功能(如 Blob 存储的存储层选项和高可用性 & 灾难恢复选项)使 ADLS 成为大数据分析的理想存储解决方案。

    对于 Azure Data Lake Gen,存储帐户的创建与创建Azure Blob 存储相同。 唯一的区别在于,必须启用分层命名空间 (HNS) 属性。 分层命名空间是Azure Data Lake Storage的基础部分。 利用此功能,可以将对象/文件组织到目录层次结构中,以实现高效的数据访问。

    按照以下步骤创建和配置Azure Data Lake Storage资源。

    步骤 1。 在Azure 门户中创建 Azure 存储帐户资源。

  • 打开浏览器窗口并导航到 Azure 门户
  • 在搜索框中输入 存储
  • 导航到搜索结果中 “服务 ”下的 “存储帐户 ”。
  • “存储帐户 ”对话框中选择“ + 创建 ”按钮。
  • “存储帐户 资源”对话框窗口的左侧面板中选择“ 访问控制 (IAM) ”。
  • “授予对此资源的访问权限 ”部分中,选择“ 添加角色分配 ”按钮。
  • “添加角色分配 ”对话框中,搜索并选择“ 存储 Blob 数据参与者 ”,然后选择“ 下一步 ”。
  • 分配的访问权限 :选择 “用户、组或服务主体 ”。
  • 成员 :选择“ + 选择成员 ”。
  • “所选成员” 中,搜索并查找 Azure 帐户。
  • 选择标识以将其添加为所选成员。
  • 使用 “选择” 按钮添加标识。
  • 查看所选值,然后选择“ 查看 + 分配 ”。
  • 3.为 Azure Data Lake 创建容器和目录

    容器充当文件系统目录,用于组织 Azure Data Lake Store 中的数据文件。 容器可以存储无限数量的 Blob,存储帐户可以有无限数量的容器。

    将数据加载到数据湖时,必须考虑简化安全性、高效处理和分区工作。 Azure Data Lake Storage Gen 2 使用目录而不是 Blob 存储中的虚拟文件夹。 目录可实现更精确的安全性、控制访问和目录级文件系统操作。

    步骤 1。 左侧 面板中的 “数据存储 ”部分,选择“ 容器 ”,然后在“容器”窗格中选择“ + 容器 ”。

    将数据湖转换为数据沼泽非常简单。 因此,管理数据湖中的数据非常重要。

    Azure Purview 是一项统一的数据治理服务,可帮助你管理和治理本地、多云和服务型软件 (SaaS) 数据。 通过自动化的数据发现、敏感数据分类和端到端数据世系,轻松创建数据领域的整体最新映射。

    4. 使用 Python SDK 为 Data Lake 创建代码

    将数据转换为适合分析的格式后,将数据加载到分析数据存储中。 数据存储可以是数据库系统、数据仓库、数据湖或 Hadoop。 每个目标都有不同的加载数据可靠性和优化性能的方法。 现在可以将数据用于分析和商业智能。 本文将转换后的数据加载到 Azure Data Lake Storage (ADLS) ,因为各种计算和分析 Azure 服务可以轻松连接到Azure Data Lake Storage。

  • 打开 local.settings.json 文件,该文件保存了本地环境设置。

  • 编辑文件以更新以下内容:

    # HTTP request for URL contents def get_content_from_url(url): return requests.get(url).content # clean metadata def clean_metadata(jsonitem): # get news article URL article_url = jsonitem["url"] # remove html tags in article name article_title = remove_html_tags(jsonitem["name"]) # remove html tags in article description article_descr = remove_html_tags(jsonitem["description"]) # get article contents article_text = get_html_text(requests.get(article_url).content) return article_url, article_title, article_descr, article_text # strip HTML tags from a string. def remove_html_tags(html_text): return html.escape(re.compile(r"<[^>]+>").sub("", str(html_text))) # get all text of a news article # assume heavy use of `<p>`` (paragraph) HTML tag def get_html_text(page_html): soup = BeautifulSoup(page_html, "html.parser") text = soup.find_all("p", text=True) text = remove_html_tags(str(text)) return text # normalize text def normalize_text(text_string): # Lowercase text lower_string = text_string.lower() # remove numbers no_number_string = re.sub(r"\d+", "", lower_string) # remove punctuation except words and space no_punc_string = re.sub( r"(@\[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?", no_number_string, # remove white space no_wspace_string = no_punc_string.strip() # decode unicode to handle special characters json_bytes = no_wspace_string.encode() json_str_decoded = json.dumps(json_bytes.decode("utf-8", errors="ignore")) return json_str_decoded # Loop through and process each search result def clean_documents(data_dictionary): for item in data_dictionary: # get news article URL. article_url = item["url"] logging.info("article_url: %s", article_url) # get and remove any html tags in the name of the news article. item["name"] = remove_html_tags(item["name"]) # get and remove any html tags in the short description of the news article. item["description"] = remove_html_tags(item["description"]) # get the new article contents and store text. article_text = get_html_text(requests.get(article_url).content) # remove any html tags in the news article's text. article_text = remove_html_tags(article_text) # preprocess/normalize new article's text to make it easier to # consume by analytic applications. article_text_norm = normalize_text(article_text) # add new data to dictionary item["article_text"] = article_text_norm return data_dictionary

    5.使用 Python 为数据湖创建代码

  • 共享文件夹 中创建名为 data_lake.py 的文件。

  • 将以下 Python 代码复制到其中。

    # ./shared/data_lake.py import logging import os from azure.storage.filedatalake import DataLakeServiceClient # Upload the data to Azure Data Lake # Required RBAC role - Storage Blob Data Owner def upload_to_data_lake( azure_credential, data_lake_account_name, data_lake_container_name, data_lake_directory_name, file_name, data_str, # Get the client service_client = DataLakeServiceClient( account_url=f"https://{data_lake_account_name}.dfs.core.windows.net", credential=azure_credential, # Get the file system client file_system_client = service_client.get_file_system_client(file_system=data_lake_container_name) # Get the directory client directory_client = file_system_client.get_directory_client(data_lake_directory_name) # Get the file client file_client = directory_client.get_file_client(file_name) # Upload the data file_client.upload_data(data_str, overwrite=True) return file_name

    7. 使用 Python 为 BlobTrigger 函数创建代码

  • 打开 api_blob_trigger 文件夹中的 init.py 文件。

  • 将文件的内容替换为以下 Python 代码。

    # ./api_blob_trigger/__init__.py import os import json import logging import azure.functions as func from shared.azure_credential import get_azure_default_credential from shared.data_lake import upload_to_data_lake from shared.transform import clean_documents def main(myblob: func.InputStream): logging.info("Python blob trigger function processed blob \nName: %s \nBlob Size: %s bytes", myblob.name, myblob.length) # read the blob content as a string. search_results_blob_str = myblob.read() # decode the string to Unicode blob_json = search_results_blob_str.decode("utf-8") # parse a valid JSON string and convert it into a Python dict # Get environment variables data_lake_account_name = os.environ.get("DATALAKE_GEN_2_RESOURCE_NAME") data_lake_container_name = os.environ.get("DATALAKE_GEN_2_CONTAINER_NAME") data_lake_directory_name = os.environ.get("DATALAKE_GEN_2_DIRECTORY_NAME") # Get Data data = json.loads(blob_json) # Clean Data new_data_dictionary = clean_documents(data) # Prepare to upload json_str = json.dumps(new_data_dictionary) file_name = myblob.name.split("/")[1] new_file_name = f"processed_{file_name}" # Get authentication to Azure azure_default_credential = get_azure_default_credential() # Upload to Data Lake upload_to_data_lake(azure_default_credential, data_lake_account_name, data_lake_container_name, data_lake_directory_name, new_file_name, json_str) logging.info( "Successfully uploaded to data lake, old: %s, new: %s", myblob.name, new_file_name except ValueError as err: logging.info("Error converting %s to python dictionary: %s", myblob.name, err)

    8.测试 Azure Blob 存储触发器函数

    若要正确测试本地 Azure 存储 Blob 触发器函数,必须先执行 Azure HTTP 触发器函数。 由于 Azure HTTP 触发器函数创建结果文件并将其上传到 Azure Blob 存储,Blob 触发器函数会自动执行。

  • 在本地运行函数。

    func start
    
  • 在本地测试函数。 使用 Web 浏览器测试 搜索 API:

    http://localhost:7071/api/search?search_term=azure&count=5
    
  • 验证 Blob 存储 msdocs-python-cloud-etl-news-source 容器是否有名为 这样的search_results_azure_yar6q2P80Lm4FG7.json文件。

  • 验证 Data Lake msdocs-python-cloud-etl-processed 容器和 news-data 目录是否具有名为 这样的processed_search_results_azure_yar6q2P80Lm4FG7.json文件。

    你完成了哪些工作

    如果 Azure 函数成功,则表明已正确配置并运行以下内容:

  • 本地 API
  • Azure 资源:必应搜索、密钥保管库、Blob 存储
  • 身份验证:用户凭据可以访问资源
  • 下一步是将代码部署到 Azure 函数资源并正确配置该资源。

    排查 Azure 函数问题

    如果已达到此点,并且已处理的文件不在 Data Lake 容器和目录中,请使用以下信息调试应用程序。

  • 打开本地日志记录
    1. 停止应用程序。
    2. 打开 ./host.json 文件。
    3. logging.logLevel.default 属性设置为 "Information"
    4. 如果 Blob 存储中有任何文件,请下载该文件并检查内容。 如果是新闻信息的 JSON 数组,则你知道 HTTP 触发器 api_search已成功工作。
    5. 删除 Blob 存储中的文件。
    6. 再次启动应用程序,并使用 HTTP API 终结点搜索新闻。
    7. 查看调试日志。 它包括发生的任何错误。
    8. Data Lake Storage
      • 必须在 local.settings.json 的 属性中DATALAKE_GEN_2_RESOURCE_NAME设置资源名称。
      • 容器名称(如)msdocs-python-cloud-etl-news-source必须与 local.settings.json 中的 属性匹配BLOB_STORAGE_CONTAINER_NAME
      • Blob 存储的用户帐户需要 存储 Blob 数据参与者 角色才能添加和读取 Blob。
      • 部署解决方案 >>

  •