相关文章推荐
爱旅游的小虾米  ·  Installation from ...·  1 周前    · 
酷酷的金鱼  ·  How to Effectively ...·  5 月前    · 
愤怒的伤疤  ·  QRadar: How to use ...·  1 年前    · 

在Airflow中做for循环时出现NoneType错误

1 人关注

我正在使用Airflow 2.3.3。我创建了第一个dyamic dag,其中有两个任务:第一个任务是向一个API发出请求,返回一个包含我需要的数据的JSON列表;第二个任务是将这个列表转换为Dataframe类型,这样我就可以用pandas处理它,只提取有用的数据(第一个任务是多次运行的)。问题是,在我的IDLE(pycharm)中,脚本工作得很好,但当我运行dag时,我得到了一个问题,我实现的用于迭代数据框架的for循环,在应该是行的情况下,返回给我 "NoneType "值。我试着将JSON作为dict类型进行迭代,我得到了同样的结果:for循环给我返回NoneType值。

这是我做的for循环,用于从数据框架中获取行,然后获取我需要的数据。

for dic in df_consulta:
    for n in dic:
        if n.lower() in datos:
            datos[n.lower()] = dic.get(n)
    n_fiscal = dic['domicilioFiscal']
    for f in n_fiscal:
        if f.lower() in datos:
            datos[f.lower()] = n_fiscal.get(f)
    d_f.loc[len(d_f.index)] = datos

"dic" is a row that contains a dict -> this, in Airflow, returns me a "NoneType" variable and in consequence:

TypeError: 'NoneType' object is not iterable

"datos "是一个空的dict,与dic的名字键相同,所以我可以提取值

"domicilioFiscal "是 "dic "中的另一个dict。

"d_f "是我建立的最终数据框架,用于收集具有各自寄存器的数据。

下面是完整的dag代码。

import teradatasql
import pandas as pd
from airflow.decorators import task, dag
from airflow.hooks.base_hook import BaseHook
DEFAULT_ARGS = {'owner': 'airflow',
                'depends_on_past': False,
                'retries': 1,
                'retry_delay': timedelta(minutes=5)}
@dag(dag_id='Actualization_Teradata',
     default_args=DEFAULT_ARGS,
     start_date=datetime(2022, 9, 12),
     catchup=False,
     schedule_interval='@daily',
     tags=['AFIP', 'Teradata', 'Actualizacion'])
def my_dag():
    @task
    def consulta_afip():
        conn = BaseHook.get_connection(f'Teradata SYSDBA')
        con_tera = teradatasql.connect(None,
                                       host=f"111.11.11.11",
                                       user=f"{conn.login}",
                                       password=f"{conn.password}",
                                       column_name='false')
        query = ("SELECT top 3 * FROM TABLE WHERE COLUMN_1 = 'DATA'")
        global tsql
        tsql = con_tera.cursor()
        tsql.execute(query)
        col_headers = [i[0] for i in tsql.description]
        rows = [list(i) for i in tsql.fetchall()]
        data = pd.DataFrame(rows, columns=col_headers)
        cuit_lista = []
        for d in data['CUIT']:
            cuit_lista.append(d)
        results = perform_web_requests(cuit_lista, 8)
        tsql.close()
        return results
    @task
    def update_teradata(consultas):
        df = pd.DataFrame(consultas)
        df_consulta = df['Contribuyente']
        print(df_consulta)
        datos = {
            'idpersona': '',
            'tipopersona': '',
            'estadoclave': '',
            'nombre': '',
            'nombreprovincia': '',
            'localidad': '',
            'codpostal': '',
            'direccion': ''
        d_f = pd.DataFrame(columns=['idpersona', 'tipopersona', 'estadoclave', 'nombre', 'nombreprovincia', 'localidad',
                                    'codpostal', 'direccion'])
        for dic in df_consulta:
            for n in dic:
                if n.lower() in datos:
                    datos[n.lower()] = dic.get(n)
            n_fiscal = dic['domicilioFiscal']
            for f in n_fiscal:
                if f.lower() in datos:
                    datos[f.lower()] = n_fiscal.get(f)
            d_f.loc[len(d_f.index)] = datos
        columnas_teradata = ['CUIT', 'SITUACION_JURIDICA', 'ESTADO', 'NOMBRE_COMPLETO', 'PROVINCIA', 'LOCALIDAD',
                             'CODIGO_POSTAL', 'DIRECCION']
        d_f.columns = columnas_teradata
        return d_f
    update_teradata.expand(consultas=consulta_afip())