What happened
I am trying to execute a few queries using the DatabricksSqlOperator in an Airflow DAG. It appears that the output returned by the operator cannot be handled by XCom, so the task fails with the following error log:
```
[2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744
[2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)
```
What you think should happen instead
XCom should be able to JSON-serialize the operator's output, or the Databricks provider should standardize on a return type that XCom can serialize, so the generic case works out of the box.
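Until the provider guarantees a serializable result type, one possible workaround (a sketch only, not the provider's documented behavior) is to run the query through DatabricksSqlHook inside a plain Python task and coerce each returned Row into a list before pushing it to XCom. The connection id, warehouse name, and query below are taken from the reproduction DAG; the task name `select_data_serializable` is hypothetical, `get_records` is inherited from the common DB-API hook base, and the `list(row)` coercion assumes the Row type is iterable like a tuple, which the traceback suggests it is.

```python
from airflow.decorators import task
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook


# Hypothetical workaround task: fetch rows via the hook and return plain lists,
# which the default JSON-based XCom backend can serialize.
@task
def select_data_serializable():
    hook = DatabricksSqlHook(
        databricks_conn_id='databricks',   # same connection as the repro DAG
        sql_endpoint_name='Serverless',    # same SQL warehouse as the repro DAG
    )
    records = hook.get_records(
        "select table_name from system.information_schema.tables "
        "where table_catalog = 'rsg_prod'"
    )
    # Row is a tuple-like object, so list(row) yields JSON-serializable values
    # (assuming the selected columns are primitive types such as strings).
    return [list(row) for row in records]
```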
How to reproduce
1. Create an Airflow DAG with a DatabricksSqlOperator task.
2. Set do_xcom_push=True on the operator.
3. Add a separate downstream task that pulls the SQL query result from XCom and parses it, as in the DAG below.
```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

env = Variable.get('AIRFLOW_VAR_ENV_NAME')


def downstream_task(**kwargs):
    # Pull by the operator's task_id, not the Python variable name.
    result = kwargs['task_instance'].xcom_pull(task_ids='create_and_populate_from_file')
    print("Received result from XCom:", result)


with DAG('ct_sql_xcom',
         start_date=datetime(2024, 1, 30),
         schedule_interval=None) as dag:

    create_file = DatabricksSqlOperator(
        task_id="create_and_populate_from_file",
        databricks_conn_id='databricks',
        sql_endpoint_name='Serverless',
        sql="select table_name from system.information_schema.tables where table_catalog = 'rsg_prod'",
        do_xcom_push=True,
    )

    # Wrap the callable in an operator so it can be wired into the DAG.
    select_data = PythonOperator(
        task_id='select_data',
        python_callable=downstream_task,
    )

    create_file >> select_data
```
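For completeness, the hint in the error message about pickle refers to the legacy XCom pickling switch. Enabling it would likely make the push succeed, but pickled XCom values are a known security trade-off. A sketch of the config change, assuming the standard [core] option:

```ini
# airflow.cfg -- switch XCom serialization from JSON to pickle.
# Caution: unpickling untrusted data can execute arbitrary code.
[core]
enable_xcom_pickling = True
```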
Anything else
```
[2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file, execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357
[2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)
```
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
apache-airflow-providers-databricks==6.3.0
Apache Airflow version
2.7.3
Operating System
Debian GNU/Linux 11 (bullseye)
Deployment
Other Docker-based deployment
Deployment details
No response
What happened
I am trying to execute a few queries using DatabricksSQLOperator in an airflow dag. Seems like the output received from the Operator cannot be handled by Xcom.
Hence receiving error log -
[2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
return serialize(o)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
return encode(classname, version, serialize(data, depth + 1))
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
XCom.set(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
value = cls.serialize_value(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
File "/usr/local/lib/python3.8/json/init.py", line 234, in dumps
return cls(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
o = self.default(o)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
return super().default(o)
File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.class.name} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744
[2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)
What you think should happen instead
Xcom should be able to handle the JSON serialization or have set a standard with DatabricksSQL providers on acceptable response type to handle any generic case.
How to reproduce
`from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator
from airflow.models import Variable
env = Variable.get('AIRFLOW_VAR_ENV_NAME')
with DAG('ct_sql_xcom',
start_date = datetime(2024, 1, 30),
schedule_interval = None
) as dag:
create_file >> downstream_task
`
Anything else
`[2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
return serialize(o)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
return encode(classname, version, serialize(data, depth + 1))
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in
return [serialize(d, depth + 1) for d in o]
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
XCom.set(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
value = cls.serialize_value(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
File "/usr/local/lib/python3.8/json/init.py", line 234, in dumps
return cls(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
o = self.default(o)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
return super().default(o)
File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.class.name} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file , execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357
[2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)`
Are you willing to submit PR?
Code of Conduct