我正在使用Airflow 2.3.3。我创建了第一个dyamic dag,其中有两个任务:第一个任务是向一个API发出请求,返回一个包含我需要的数据的JSON列表;第二个任务是将这个列表转换为Dataframe类型,这样我就可以用pandas处理它,只提取有用的数据(第一个任务是多次运行的)。问题是,在我的IDLE(pycharm)中,脚本工作得很好,但当我运行dag时,我得到了一个问题,我实现的用于迭代数据框架的for循环,在应该是行的情况下,返回给我 "NoneType "值。我试着将JSON作为dict类型进行迭代,我得到了同样的结果:for循环给我返回NoneType值。
这是我做的for循环,用于从数据框架中获取行,然后获取我需要的数据。
for dic in df_consulta:
for n in dic:
if n.lower() in datos:
datos[n.lower()] = dic.get(n)
n_fiscal = dic['domicilioFiscal']
for f in n_fiscal:
if f.lower() in datos:
datos[f.lower()] = n_fiscal.get(f)
d_f.loc[len(d_f.index)] = datos
"dic" is a row that contains a dict -> this, in Airflow, returns me a "NoneType" variable and in consequence:
TypeError: 'NoneType' object is not iterable
"datos "是一个空的dict,与dic的名字键相同,所以我可以提取值
"domicilioFiscal "是 "dic "中的另一个dict。
"d_f "是我建立的最终数据框架,用于收集具有各自寄存器的数据。
下面是完整的dag代码。
import teradatasql
import pandas as pd
from airflow.decorators import task, dag
from airflow.hooks.base_hook import BaseHook
DEFAULT_ARGS = {'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)}
@dag(dag_id='Actualization_Teradata',
default_args=DEFAULT_ARGS,
start_date=datetime(2022, 9, 12),
catchup=False,
schedule_interval='@daily',
tags=['AFIP', 'Teradata', 'Actualizacion'])
def my_dag():
@task
def consulta_afip():
conn = BaseHook.get_connection(f'Teradata SYSDBA')
con_tera = teradatasql.connect(None,
host=f"111.11.11.11",
user=f"{conn.login}",
password=f"{conn.password}",
column_name='false')
query = ("SELECT top 3 * FROM TABLE WHERE COLUMN_1 = 'DATA'")
global tsql
tsql = con_tera.cursor()
tsql.execute(query)
col_headers = [i[0] for i in tsql.description]
rows = [list(i) for i in tsql.fetchall()]
data = pd.DataFrame(rows, columns=col_headers)
cuit_lista = []
for d in data['CUIT']:
cuit_lista.append(d)
results = perform_web_requests(cuit_lista, 8)
tsql.close()
return results
@task
def update_teradata(consultas):
df = pd.DataFrame(consultas)
df_consulta = df['Contribuyente']
print(df_consulta)
datos = {
'idpersona': '',
'tipopersona': '',
'estadoclave': '',
'nombre': '',
'nombreprovincia': '',
'localidad': '',
'codpostal': '',
'direccion': ''
d_f = pd.DataFrame(columns=['idpersona', 'tipopersona', 'estadoclave', 'nombre', 'nombreprovincia', 'localidad',
'codpostal', 'direccion'])
for dic in df_consulta:
for n in dic:
if n.lower() in datos:
datos[n.lower()] = dic.get(n)
n_fiscal = dic['domicilioFiscal']
for f in n_fiscal:
if f.lower() in datos:
datos[f.lower()] = n_fiscal.get(f)
d_f.loc[len(d_f.index)] = datos
columnas_teradata = ['CUIT', 'SITUACION_JURIDICA', 'ESTADO', 'NOMBRE_COMPLETO', 'PROVINCIA', 'LOCALIDAD',
'CODIGO_POSTAL', 'DIRECCION']
d_f.columns = columnas_teradata
return d_f
update_teradata.expand(consultas=consulta_afip())