Skip to content

Commit ea1dc0a

Browse files
50937, add debug logging for airbyte (#51503)
* 50937, init of debug logging * add more debug logs. * 50937, fix broken tests. * Update .gitignore --------- Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
1 parent e67b741 commit ea1dc0a

5 files changed

Lines changed: 31 additions & 1 deletion

File tree

providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ def __init__(
5959
def get_conn_params(self, conn_id: str) -> Any:
6060
conn = self.get_connection(conn_id)
6161

62+
# Intentionally left the password out, you can modify the log to print it out if you are doing testing.
63+
self.log.debug(
64+
"Connection attributes are: host - %s, url - %s, description - %s",
65+
conn.host,
66+
conn.schema,
67+
conn.description,
68+
)
6269
conn_params: dict = {}
6370
conn_params["host"] = conn.host
6471
conn_params["client_id"] = conn.login
@@ -78,6 +85,7 @@ def create_api_session(self) -> AirbyteAPI:
7885

7986
client = None
8087
if self.conn["proxies"]:
88+
self.log.debug("Creating client proxy...")
8189
client = Session()
8290
client.proxies = self.conn["proxies"]
8391

@@ -111,6 +119,7 @@ def get_job_details(self, job_id: int) -> Any:
111119
job_id=job_id,
112120
)
113121
)
122+
self.log.debug("Job details are: %s", get_job_res.job_response)
114123
return get_job_res.job_response
115124
except Exception as e:
116125
raise AirflowException(e)
@@ -138,12 +147,14 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
138147
start = time.monotonic()
139148
while True:
140149
if timeout and start + timeout < time.monotonic():
150+
self.log.debug("Canceling job...")
141151
self.cancel_job(job_id=(int(job_id)))
142152
raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
143153
time.sleep(wait_seconds)
144154
try:
145155
job = self.get_job_details(job_id=(int(job_id)))
146156
state = job.status
157+
self.log.debug("Job State: %s. Job Details: %s", state, job)
147158

148159
except AirflowException as err:
149160
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
@@ -161,12 +172,14 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
161172

162173
def submit_sync_connection(self, connection_id: str) -> Any:
163174
try:
175+
self.log.debug("Creating job request..")
164176
res = self.airbyte_api.jobs.create_job(
165177
request=JobCreateRequest(
166178
connection_id=connection_id,
167179
job_type=JobTypeEnum.SYNC,
168180
)
169181
)
182+
self.log.debug("Job request successful, response: %s", res.job_response)
170183
return res.job_response
171184
except Exception as e:
172185
raise AirflowException(e)
@@ -191,6 +204,7 @@ def test_connection(self):
191204
"""Tests the Airbyte connection by hitting the health API."""
192205
try:
193206
health_check = self.airbyte_api.health.get_health_check()
207+
self.log.debug("Health check details: %s", health_check)
194208
if health_check.status_code == 200:
195209
return True, "Connection successfully tested"
196210
return False, str(health_check.raw_response)

providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,10 @@ def execute(self, context: Context) -> None:
9393
return self.job_id
9494

9595
if not self.deferrable:
96+
self.log.debug("Running in non-deferrable mode...")
9697
hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
9798
else:
99+
self.log.debug("Running in defferable mode in job state %s...", state)
98100
if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
99101
self.defer(
100102
timeout=self.execution_timeout,
@@ -126,6 +128,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
126128
successful.
127129
"""
128130
if event["status"] == "error":
131+
self.log.debug("Error occurred with context: %s", context)
129132
raise AirflowException(event["message"])
130133

131134
self.log.info("%s completed successfully.", self.task_id)
@@ -134,6 +137,11 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
134137
def on_kill(self):
135138
"""Cancel the job if task is cancelled."""
136139
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
140+
self.log.debug(
141+
"Job status for job_id %s prior to canceling is: %s",
142+
self.job_id,
143+
hook.get_job_status(self.job_id),
144+
)
137145
if self.job_id:
138146
self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
139147
hook.cancel_job(self.job_id)

providers/airbyte/src/airflow/providers/airbyte/sensors/airbyte.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def poke(self, context: Context) -> bool:
7979

8080
if status == JobStatusEnum.FAILED:
8181
message = f"Job failed: \n{job}"
82+
self.log.debug("Failed with context: %s", context)
8283
raise AirflowException(message)
8384
if status == JobStatusEnum.CANCELLED:
8485
message = f"Job was cancelled: \n{job}"
@@ -117,6 +118,7 @@ def execute(self, context: Context) -> Any:
117118
self.log.info("%s completed successfully.", self.task_id)
118119
return
119120
elif state == JobStatusEnum.FAILED:
121+
self.log.debug("Failed with context: %s", context)
120122
raise AirflowException(f"Job failed:\n{job}")
121123
elif state == JobStatusEnum.CANCELLED:
122124
raise AirflowException(f"Job was cancelled:\n{job}")
@@ -133,6 +135,7 @@ def execute_complete(self, context: Context, event: Any = None) -> None:
133135
successful.
134136
"""
135137
if event["status"] == "error":
138+
self.log.debug("An error occurred with context: %s", context)
136139
raise AirflowException(event["message"])
137140

138141
self.log.info("%s completed successfully.", self.task_id)

providers/airbyte/src/airflow/providers/airbyte/triggers/airbyte.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,8 @@ async def is_still_running(self, hook: AirbyteHook) -> bool:
117117
"""
118118
job_run_status = hook.get_job_status(self.job_id)
119119
if job_run_status in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
120+
self.log.debug(
121+
"Job run status is: %s with context: %s", job_run_status, hook.get_job_details(self.job_id)
122+
)
120123
return True
121124
return False

providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_co
6767
job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
6868
)
6969

70+
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status")
7071
@mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
71-
def test_on_kill(self, mock_cancel_job, create_connection_without_db):
72+
def test_on_kill(self, mock_cancel_job, mock_get_job_status, create_connection_without_db):
7273
conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
7374
create_connection_without_db(conn)
7475

@@ -83,3 +84,4 @@ def test_on_kill(self, mock_cancel_job, create_connection_without_db):
8384
op.on_kill()
8485

8586
mock_cancel_job.assert_called_once_with(self.job_id)
87+
mock_get_job_status.assert_called_once_with(self.job_id)

0 commit comments

Comments
 (0)