closes: https://github.com/apache/airflow/issues/57792
### Problem
When `retries=0` was explicitly set in DAG `default_args`, the value was ignored during serialization, causing tasks to fall back to the `default_task_retries` configuration value instead of the explicitly set `0`.
### Example
```ini
[core]
...
default_task_retries = 3
...
```
```python
from datetime import datetime

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(
    'test_dag',
    default_args=default_args,
    description='Test DAG that echoes a message and fails',
    schedule=None,
    catchup=False,
    tags=['test'],
)

test_task = BashOperator(
    task_id='echo_and_fail',
    bash_command='echo "This is a test message" && exit 1',
    dag=dag,
)
```
### Cause
The serialization logic excluded values that matched schema defaults (e.g., `retries=0`) from the serialized JSON even when they differed from `client_defaults` (e.g., `retries=3` from config). During deserialization, when `retries` was missing from the serialized task data, it fell back to `client_defaults.tasks.retries` (3), losing the explicitly set value (0).
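The faulty round trip can be sketched as follows (a minimal illustration with made-up names; the real logic lives in Airflow's serialization code and is more involved):

```python
# Hypothetical sketch of the buggy round trip -- names are illustrative,
# not the actual Airflow implementation.
SCHEMA_DEFAULTS = {"retries": 0}   # defaults baked into the schema
CLIENT_DEFAULTS = {"retries": 3}   # e.g. [core] default_task_retries = 3

def serialize_buggy(task: dict) -> dict:
    # Bug: any value equal to the schema default is dropped, even when it
    # differs from the client default -- the explicit retries=0 is lost here.
    return {k: v for k, v in task.items() if SCHEMA_DEFAULTS.get(k) != v}

def deserialize(data: dict) -> dict:
    # Missing keys fall back to client_defaults, not the schema default.
    return {**CLIENT_DEFAULTS, **data}

task = {"retries": 0}  # explicitly set via default_args
# The explicit 0 is dropped on serialize and resurfaces as 3 on deserialize.
assert deserialize(serialize_buggy(task))["retries"] == 3
```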
### Fix
Updated the serialization/deserialization logic to exclude only values that match both the schema default AND `client_defaults`. If a value matches the schema default but differs from `client_defaults`, it is kept in the serialized output, because that means it was explicitly set.
Additionally, added a generic method `get_operator_const_fields()` to detect fields with `"const": true` in the schema (like `_is_sensor` and `_is_mapped`), which should always be excluded when `False`, regardless of `client_defaults`.
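The corrected exclusion rule can be sketched like this (again with illustrative names, not the actual implementation):

```python
# Hypothetical sketch of the fixed exclusion rule.
SCHEMA_DEFAULTS = {"retries": 0, "_is_sensor": False}
CLIENT_DEFAULTS = {"retries": 3}
CONST_FIELDS = {"_is_sensor", "_is_mapped"}  # schema fields with "const": true

def serialize_fixed(task: dict) -> dict:
    out = {}
    for k, v in task.items():
        if k in CONST_FIELDS and v is False:
            continue  # const fields are always excluded when False
        # Exclude only when the value matches BOTH the schema default and
        # the client default; otherwise it was explicitly set and must be kept.
        schema_default = SCHEMA_DEFAULTS.get(k)
        client_default = CLIENT_DEFAULTS.get(k, schema_default)
        if v == schema_default and v == client_default:
            continue
        out[k] = v
    return out

def deserialize(data: dict) -> dict:
    # Missing keys still fall back to client_defaults.
    return {**CLIENT_DEFAULTS, **data}

# The explicit retries=0 now survives the round trip.
assert deserialize(serialize_fixed({"retries": 0}))["retries"] == 0
# Const fields set to False are still excluded from the payload.
assert "_is_sensor" not in serialize_fixed({"_is_sensor": False})
```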
### Testing
Before:
```python
import os

os.environ['AIRFLOW__CORE__DEFAULT_TASK_RETRIES'] = '3'

def return_deserialized_task_after_serializng(retries):
    from datetime import datetime

    from airflow.sdk import DAG
    from airflow.providers.standard.operators.bash import BashOperator
    from airflow.serialization.serialized_objects import SerializedDAG

    dag_0 = DAG(f'test_{retries}', default_args={'retries': retries}, start_date=datetime(2024, 1, 1))
    task_0 = BashOperator(task_id=f't{retries}', bash_command='echo', dag=dag_0)
    serialized_0 = SerializedDAG.to_dict(dag_0)
    # Captured for inspection of the serialized payload:
    task_data_0 = serialized_0["dag"]["tasks"][0]["__var"]
    client_defaults = serialized_0.get("client_defaults", {}).get("tasks", {})
    deserialized_dag_0 = SerializedDAG.from_dict(serialized_0)
    deserialized_task_0 = deserialized_dag_0.get_task(f't{retries}')
    return deserialized_task_0
```
```python
ds_task_0 = return_deserialized_task_after_serializng(0)
ds_task_0.retries  # Out: 3  (explicit 0 was lost)
ds_task_1 = return_deserialized_task_after_serializng(1)
ds_task_1.retries  # Out: 1
```
After:
```python
ds_task_0 = return_deserialized_task_after_serializng(0)
ds_task_0.retries  # Out: 0  (explicit 0 preserved)
ds_task_1 = return_deserialized_task_after_serializng(1)
ds_task_1.retries  # Out: 1
```
Also verified end-to-end with the example DAG shown in the **Example** section above.
Result with no retries:
<img width="1727" height="937" alt="image" src="https://github.com/user-attachments/assets/f8f0a13e-9dd2-4fb2-ba43-8b0553a16384" />