Fallback to default value if [aws] cloudwatch_task_handler_json_serializer not set#36851
Merged
eladkal merged 1 commit intoapache:mainfrom Jan 18, 2024
Merged
Conversation
f3e5198 to
9d44a6e
Compare
hussein-awala
approved these changes
Jan 17, 2024
Member
hussein-awala
left a comment
There was a problem hiding this comment.
LGTM
However, we should investigate to know why it doesn't load the default value
9d44a6e to
c325f8b
Compare
Contributor
Author
|
Update tests, just figure out that |
Contributor
Agree, why the condition here is not satisfied? |
Contributor
Author
|
That is a good question, and unfortunetly I do not have an answer yet. I've just create simple snippet for reproduce it locally and debug purpouse import tempfile
from unittest import mock
from pathlib import Path
def get_airflow_config_defaults():
with tempfile.TemporaryDirectory(prefix="airflow_home_") as td:
temp_dir = Path(td)
airflow_config = temp_dir / "airflow.cfg"
airflow_config.touch()
with mock.patch.dict("os.environ", {"AIRFLOW_HOME": str(temp_dir), "AIRFLOW_CONFIG": str(airflow_config)}):
from airflow.configuration import conf
return conf._default_values._sections
for section, options in get_airflow_config_defaults().items():
print(f" {section} ".center(70, "-"))
for option, value in options.items():
print(f"{option}: {value!r}")And it printed only default values from the Airflow, but not Airflow Providers Huge output under the spoiler-------------------------------- core --------------------------------
dags_folder: '/var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/dags'
hostname_callable: 'airflow.utils.net.getfqdn'
might_contain_dag_callable: 'airflow.utils.file.might_contain_dag_via_default_heuristic'
default_timezone: 'utc'
executor: 'SequentialExecutor'
auth_manager: 'airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager'
parallelism: '32'
max_active_tasks_per_dag: '16'
dags_are_paused_at_creation: 'True'
max_active_runs_per_dag: '16'
load_examples: 'True'
plugins_folder: '/var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/plugins'
execute_tasks_new_python_interpreter: 'False'
fernet_key: ''
donot_pickle: 'True'
dagbag_import_timeout: '30.0'
dagbag_import_error_tracebacks: 'True'
dagbag_import_error_traceback_depth: '2'
dag_file_processor_timeout: '50'
task_runner: 'StandardTaskRunner'
default_impersonation: ''
security: ''
unit_test_mode: 'False'
enable_xcom_pickling: 'False'
allowed_deserialization_classes: 'airflow\\..*'
killed_task_cleanup_time: '60'
dag_run_conf_overrides_params: 'True'
dag_discovery_safe_mode: 'True'
dag_ignore_file_syntax: 'regexp'
default_task_retries: '0'
default_task_retry_delay: '300'
max_task_retry_delay: '86400'
default_task_weight_rule: 'downstream'
default_task_execution_timeout: ''
min_serialized_dag_update_interval: '30'
compress_serialized_dags: 'False'
min_serialized_dag_fetch_interval: '10'
max_num_rendered_ti_fields_per_task: '30'
check_slas: 'True'
xcom_backend: 'airflow.models.xcom.BaseXCom'
lazy_load_plugins: 'True'
lazy_discover_providers: 'True'
hide_sensitive_var_conn_fields: 'True'
sensitive_var_conn_names: ''
default_pool_task_slot_count: '128'
max_map_length: '1024'
daemon_umask: '0o077'
database_access_isolation: 'False'
test_connection: 'Disabled'
------------------------------ database ------------------------------
alembic_ini_file_path: 'alembic.ini'
sql_alchemy_conn: 'sqlite:////var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/airflow.db'
sql_engine_encoding: 'utf-8'
sql_alchemy_pool_enabled: 'True'
sql_alchemy_pool_size: '5'
sql_alchemy_max_overflow: '10'
sql_alchemy_pool_recycle: '1800'
sql_alchemy_pool_pre_ping: 'True'
sql_alchemy_schema: ''
load_default_connections: 'True'
max_db_retries: '3'
check_migrations: 'True'
------------------------------ logging -------------------------------
base_log_folder: '/var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/logs'
remote_logging: 'False'
remote_log_conn_id: ''
delete_local_logs: 'False'
google_key_path: ''
remote_base_log_folder: ''
remote_task_handler_kwargs: ''
encrypt_s3_logs: 'False'
logging_level: 'INFO'
celery_logging_level: ''
fab_logging_level: 'WARNING'
logging_config_class: ''
colored_console_log: 'True'
colored_log_format: '[%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s'
colored_formatter_class: 'airflow.utils.log.colored_log.CustomTTYColoredFormatter'
log_format: '[%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s'
simple_log_format: '%%(asctime)s %%(levelname)s - %%(message)s'
dag_processor_log_target: 'file'
dag_processor_log_format: '[%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s'
log_formatter_class: 'airflow.utils.log.timezone_aware.TimezoneAware'
secret_mask_adapter: ''
task_log_prefix_template: ''
log_filename_template: 'dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log'
log_processor_filename_template: '{{ filename }}.log'
dag_processor_manager_log_location: '/var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/logs/dag_processor_manager/dag_processor_manager.log'
task_log_reader: 'task'
extra_logger_names: ''
worker_log_server_port: '8793'
trigger_log_server_port: '8794'
file_task_handler_new_folder_permissions: '0o775'
file_task_handler_new_file_permissions: '0o664'
celery_stdout_stderr_separation: 'False'
enable_task_context_logger: 'True'
------------------------------ metrics -------------------------------
metrics_use_pattern_match: 'False'
metrics_allow_list: ''
metrics_block_list: ''
statsd_on: 'False'
statsd_host: 'localhost'
statsd_port: '8125'
statsd_prefix: 'airflow'
stat_name_handler: ''
statsd_datadog_enabled: 'False'
statsd_datadog_tags: ''
statsd_datadog_metrics_tags: 'True'
statsd_disabled_tags: 'job_id,run_id'
statsd_influxdb_enabled: 'False'
otel_on: 'False'
otel_host: 'localhost'
otel_port: '8889'
otel_prefix: 'airflow'
otel_interval_milliseconds: '60000'
otel_debugging_on: 'False'
otel_ssl_active: 'False'
------------------------------ secrets -------------------------------
backend: ''
backend_kwargs: ''
use_cache: 'False'
cache_ttl_seconds: '900'
-------------------------------- cli ---------------------------------
api_client: 'airflow.api.client.local_client'
endpoint_url: 'http://localhost:8080'
------------------------------- debug --------------------------------
fail_fast: 'False'
-------------------------------- api ---------------------------------
enable_experimental_api: 'False'
auth_backends: 'airflow.api.auth.backend.session'
maximum_page_limit: '100'
fallback_page_limit: '100'
google_oauth2_audience: ''
google_key_path: ''
access_control_allow_headers: ''
access_control_allow_methods: ''
access_control_allow_origins: ''
enable_xcom_deserialize_support: 'False'
------------------------------ lineage -------------------------------
backend: ''
----------------------------- operators ------------------------------
default_owner: 'airflow'
default_deferrable: 'false'
default_cpus: '1'
default_ram: '512'
default_disk: '512'
default_gpus: '0'
default_queue: 'default'
allow_illegal_arguments: 'False'
----------------------------- webserver ------------------------------
access_denied_message: 'Access is Denied'
config_file: '/var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/webserver_config.py'
base_url: 'http://localhost:8080'
default_ui_timezone: 'UTC'
web_server_host: '0.0.0.0'
web_server_port: '8080'
web_server_ssl_cert: ''
web_server_ssl_key: ''
session_backend: 'database'
web_server_master_timeout: '120'
web_server_worker_timeout: '120'
worker_refresh_batch_size: '1'
worker_refresh_interval: '6000'
reload_on_plugin_change: 'False'
secret_key: 'tO0W77E9bkT90WjDG6XozA=='
workers: '4'
worker_class: 'sync'
access_logfile: '-'
error_logfile: '-'
access_logformat: ''
expose_config: 'False'
expose_hostname: 'False'
expose_stacktrace: 'False'
dag_default_view: 'grid'
dag_orientation: 'LR'
grid_view_sorting_order: 'topological'
log_fetch_timeout_sec: '5'
log_fetch_delay_sec: '2'
log_auto_tailing_offset: '30'
log_animation_speed: '1000'
hide_paused_dags_by_default: 'False'
page_size: '100'
navbar_color: '#fff'
navbar_text_color: '#51504f'
navbar_hover_color: '#eee'
navbar_text_hover_color: '#51504f'
navbar_logo_text_color: '#51504f'
default_dag_run_display_number: '25'
enable_proxy_fix: 'False'
proxy_fix_x_for: '1'
proxy_fix_x_proto: '1'
proxy_fix_x_host: '1'
proxy_fix_x_port: '1'
proxy_fix_x_prefix: '1'
cookie_secure: 'False'
cookie_samesite: 'Lax'
default_wrap: 'False'
x_frame_enabled: 'True'
show_recent_stats_for_completed_runs: 'True'
update_fab_perms: 'True'
session_lifetime_minutes: '43200'
instance_name_has_markup: 'False'
auto_refresh_interval: '3'
warn_deployment_exposure: 'True'
audit_view_excluded_events: 'gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data'
enable_swagger_ui: 'True'
run_internal_api: 'False'
auth_rate_limited: 'True'
auth_rate_limit: '5 per 40 second'
caching_hash_method: 'md5'
show_trigger_form_if_no_params: 'False'
allow_raw_html_descriptions: 'False'
allowed_payload_size: '1.0'
------------------------------- email --------------------------------
email_backend: 'airflow.utils.email.send_email_smtp'
email_conn_id: 'smtp_default'
default_email_on_retry: 'True'
default_email_on_failure: 'True'
ssl_context: 'default'
-------------------------------- smtp --------------------------------
smtp_host: 'localhost'
smtp_starttls: 'True'
smtp_ssl: 'False'
smtp_port: '25'
smtp_mail_from: 'airflow@example.com'
smtp_timeout: '30'
smtp_retry_limit: '5'
------------------------------- sentry -------------------------------
sentry_on: 'false'
sentry_dsn: ''
----------------------------- scheduler ------------------------------
job_heartbeat_sec: '5'
scheduler_heartbeat_sec: '5'
local_task_job_heartbeat_sec: '0'
num_runs: '-1'
scheduler_idle_sleep_time: '1'
min_file_process_interval: '30'
parsing_cleanup_interval: '60'
stale_dag_threshold: '50'
dag_dir_list_interval: '300'
print_stats_interval: '30'
pool_metrics_interval: '5.0'
scheduler_health_check_threshold: '30'
enable_health_check: 'False'
scheduler_health_check_server_host: '0.0.0.0'
scheduler_health_check_server_port: '8974'
orphaned_tasks_check_interval: '300.0'
child_process_log_directory: '/var/folders/g7/vy97v5m51_v9hdpf_qxqzqvr0000gn/T/airflow_home_9n0ggwff/logs/scheduler'
scheduler_zombie_task_threshold: '300'
zombie_detection_interval: '10.0'
catchup_by_default: 'True'
ignore_first_depends_on_past_by_default: 'True'
max_tis_per_query: '16'
use_row_level_locking: 'True'
max_dagruns_to_create_per_loop: '10'
max_dagruns_per_loop_to_schedule: '20'
schedule_after_task_execution: 'True'
parsing_pre_import_modules: 'True'
parsing_processes: '2'
file_parsing_sort_mode: 'modified_time'
standalone_dag_processor: 'False'
max_callbacks_per_loop: '20'
dag_stale_not_seen_duration: '600'
use_job_schedule: 'True'
allow_trigger_in_future: 'False'
trigger_timeout_check_interval: '15'
task_queued_timeout: '600.0'
task_queued_timeout_check_interval: '120.0'
allowed_run_id_pattern: '^[A-Za-z0-9_.~:+-]+$'
create_cron_data_intervals: 'True'
----------------------------- triggerer ------------------------------
default_capacity: '1000'
job_heartbeat_sec: '5'
triggerer_health_check_threshold: '30'
------------------------------ kerberos ------------------------------
ccache: '/tmp/airflow_krb5_ccache'
principal: 'airflow'
reinit_frequency: '3600'
kinit_path: 'kinit'
keytab: 'airflow.keytab'
forwardable: 'True'
include_ip: 'True'
------------------------------ sensors -------------------------------
default_timeout: '604800' |
Contributor
Author
|
This one load default providers config with conf.make_sure_configuration_loaded(True):
return conf._default_values._sectionsAnyway fix in this PR also required for Airflow 2.6 where no Providers configurations exists. |
vincbeck
approved these changes
Jan 17, 2024
dirrao
approved these changes
Jan 18, 2024
eladkal
approved these changes
Jan 18, 2024
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Pretty interesting bug initially reported in Slack
I was able to reproduce it locally if for some reason
[aws] cloudwatch_task_handler_json_serializerconfiguration not set into theairflow.cfgor in environment variable, it doesn't fallback to default value into the provider configuration as result worker failed with^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.