Skip to content

Commit 061b8f9

Browse files
some refactor
1 parent 43d5f01 commit 061b8f9

1 file changed

Lines changed: 46 additions & 34 deletions

File tree

lighter/ws_client.py

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ def __init__(
3838

3939
self.ws = None
4040
self.ping_interval = ping_interval
41-
self.stop_event = threading.Event()
4241

4342
def on_message(self, ws, message):
4443
if isinstance(message, str):
@@ -60,7 +59,6 @@ def on_message(self, ws, message):
6059
# Respond to ping with pong
6160
ws.send(json.dumps({"type": "pong"}))
6261
elif message_type == "pong":
63-
# Noop
6462
pass
6563
else:
6664
self.handle_unhandled_message(message)
@@ -160,57 +158,71 @@ def handle_unhandled_message(self, message):
160158
raise Exception(f"Unhandled message: {message}")
161159

162160
def on_error(self, ws, error):
163-
self.stop_event.set()
164161
raise Exception(f"Error: {error}")
165162

166163
def on_close(self, ws, close_status_code, close_msg):
167-
self.stop_event.set()
168164
raise Exception(f"Closed: {close_status_code} {close_msg}")
169165

170-
def _ping_loop(self):
171-
while not self.stop_event.is_set():
172-
time.sleep(self.ping_interval)
173-
if self.ws and not self.stop_event.is_set():
166+
def _ping_loop(self, stop_event):
167+
while not stop_event.is_set():
168+
stop_event.wait(self.ping_interval)
169+
if self.ws and not stop_event.is_set():
174170
try:
175171
self.ws.send(json.dumps({"type": "ping"}))
176-
except Exception:
172+
except Exception as e:
173+
print(f"Ping failed: {e}")
177174
break
178175

179-
async def _ping_loop_async(self):
180-
while not self.stop_event.is_set():
181-
await asyncio.sleep(self.ping_interval)
182-
if self.ws and not self.stop_event.is_set():
183-
try:
176+
async def _ping_loop_async(self, stop_event):
177+
while not stop_event.is_set():
178+
try:
179+
await asyncio.sleep(self.ping_interval)
180+
if self.ws and not stop_event.is_set():
184181
await self.ws.send(json.dumps({"type": "ping"}))
185-
except Exception:
186-
break
182+
except asyncio.CancelledError:
183+
break
184+
except Exception as e:
185+
print(f"Async ping failed: {e}")
186+
break
187187

188188
def run(self):
189-
self.stop_event.clear()
190-
ws = connect(self.base_url)
191-
self.ws = ws
189+
stop_event = threading.Event()
190+
ping_thread = None
191+
try:
192+
with connect(self.base_url) as ws:
193+
self.ws = ws
194+
ping_thread = threading.Thread(target=self._ping_loop, args=(stop_event,), daemon=True)
195+
ping_thread.start()
192196

193-
ping_thread = threading.Thread(target=self._ping_loop, daemon=True)
194-
ping_thread.start()
197+
for message in ws:
198+
self.on_message(ws, message)
195199

196-
try:
197-
for message in ws:
198-
self.on_message(ws, message)
200+
except Exception as e:
201+
print(f"Connection terminated unexpectedly: {e}")
199202
finally:
200-
self.stop_event.set()
203+
stop_event.set()
201204
self.ws = None
205+
if ping_thread:
206+
ping_thread.join(timeout=1)
202207

203208
async def run_async(self):
204-
self.stop_event.clear()
205-
ws = await connect_async(self.base_url)
206-
self.ws = ws
209+
stop_event = asyncio.Event()
210+
ping_task = None
211+
try:
212+
async with connect_async(self.base_url) as ws:
213+
self.ws = ws
214+
ping_task = asyncio.create_task(self._ping_loop_async(stop_event))
207215

208-
ping_task = asyncio.create_task(self._ping_loop_async())
216+
async for message in ws:
217+
await self.on_message_async(ws, message)
209218

210-
try:
211-
async for message in ws:
212-
await self.on_message_async(ws, message)
219+
except Exception as e:
220+
print(f"Connection terminated unexpectedly: {e}")
213221
finally:
214-
self.stop_event.set()
215-
ping_task.cancel()
222+
stop_event.set()
223+
if ping_task:
224+
ping_task.cancel()
225+
# Wait for the task to acknowledge cancellation
226+
await asyncio.gather(ping_task, return_exceptions=True)
216227
self.ws = None
228+

0 commit comments

Comments
 (0)