如何使用airflow来协调简单的pandas etl python脚本?

3 人关注

我喜欢airflow的理念,但我被卡在了基础知识上。从昨天开始,我在一个vm ubuntu-postgres解决方案上运行airflow。我可以看到仪表板和示例数据:))我现在想要的是迁移一个示例脚本,我用它来处理原始数据和准备数据。

想象一下,你有一个包含csv文件的文件夹。今天,我的脚本对它进行迭代,把每个文件传到一个列表中,这个列表将被转换为df文件。之后,我准备好它们的列名,做一些数据清理工作,并把它写成不同的格式。

1: pd.read_csv for files in directory

2: 创建一个DF

3:清洁列名

4:清洁值(与STP 3平行)。

5:将结果写到数据库中

我得如何根据气流来组织我的文件?脚本应该是什么样子的?我是传递一个方法,一个文件,还是要为每个部分创建几个文件?在这一点上,我缺乏基本的概念:(我读到的关于气流的所有内容都比我的简单案例复杂得多。我正在考虑放弃气流,转而使用Bonobo, Mara, Luigi,但我认为气流是值得的。

1 个评论
到目前为止,你已经创造了什么?我们可以看看吗?
python
pandas
etl
airflow
airflow-scheduler
Christian
Christian
发布于 2018-06-04
2 个回答
tobi6
tobi6
发布于 2021-09-08
已采纳
0 人赞同

我会使用 PythonOperator ,把整个代码放到一个Python函数中,创建一个Airflow任务,就可以了。

如果有必要将这些步骤分开,也可以将csv文件的加载放在一个函数中,将数据库的写入也放在一个函数中。所有这些都将被放在一个单一的DAG中。

因此,你的一个DAG将有三个任务,如。

loadCSV (PythonOperator)
parseDF (PythonOperator)
pushToDB (PythonOperator)

如果你使用几个任务,你need to use 气流的XCom.在开始时,只使用一项任务比较容易。

这里有几个代码例子,在标签airflow下。当你创建了一些东西后,再问吧。

d_-
作为补充说明,这里有另一个S.O.线程,它展示了类似的东西。 stackoverflow.com/questions/57861233/...
与通过外部 .csv 文件交换数据相比,葡萄园XCom后端能够在DAG中的任务之间实现更有效的零拷贝数据共享,请参见。 v6d.io/notes/airflow.html
sighingnow
sighingnow
发布于 2021-09-08
0 人赞同

对于还停留在这个问题上的人来说,我们最近为气流实现了一个定制的XCom后端,其支持是 葡萄园 在这种情况下,要支持这种情况。

The provider is opensource there: https://github.com/v6d-io/v6d/tree/main/python/葡萄园/contrib/airflow

有了Vineyard XCom后端,用户可以直接拥有生产和消费 pandas.DataFrame 的dag,而不需要任何 "to_csv "+"from_csv "的黑客。

import numpy as np
import pandas as pd
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'airflow',
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def taskflow_etl_pandas():
    @task()
    def extract():
        order_data_dict = pd.DataFrame({
            'a': np.random.rand(100000),
            'b': np.random.rand(100000),
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        return {"total_order_value": order_data_dict["a"].sum()}
    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)