@@ -76,24 +76,17 @@ def __init__(self, key, options={}):
7676 self .setLevel (logging .DEBUG )
7777 self .lock = threading .RLock ()
7878
79- # Start the flusher
80- self .flusher_stopped = threading .Event ()
81- self .flusher = threading .Thread (
82- target = self .flush_timer_worker , daemon = True )
83- self .flusher .start ()
84-
85- def flush_timer_worker (self ):
86- while not self .flusher_stopped .wait (self .flush_interval_secs ):
87- try :
88- self .flush ()
89- except Exception as e :
90- self .internalLogger .exception (
91- f'Error in flush_timer_worker: { e } ' )
79+ self .flusher = None
80+
81+ def start_flusher (self ):
82+ if not self .flusher :
83+ self .flusher = threading .Timer (
84+ self .flush_interval_secs , self .flush )
85+ self .flusher .start ()
9286
9387 def close_flusher (self ):
9488 if self .flusher :
95- self .flusher_stopped .set ()
96- self .flusher .join ()
89+ self .flusher .cancel ()
9790 self .flusher = None
9891
9992 def buffer_log (self , message ):
@@ -119,7 +112,10 @@ def buffer_log_sync(self, message):
119112 self .buf_retention_limit )
120113
121114 if self .buf_size >= self .flush_limit :
115+ self .close_flusher ()
122116 self .flush ()
117+ else :
118+ self .start_flusher ()
123119 except Exception as e :
124120 self .internalLogger .exception (f'Error in buffer_log_sync: { e } ' )
125121 finally :
@@ -143,13 +139,15 @@ def try_lock_and_do_flush_request(self, should_block=False):
143139 local_buf = []
144140 if self .lock .acquire (blocking = should_block ):
145141 if not self .buf :
142+ self .close_flusher ()
146143 self .lock .release ()
147144 return
148145
149- if self .buf :
150- local_buf = self .buf .copy ()
151- self .buf .clear ()
152- self .buf_size = 0
146+ local_buf = self .buf .copy ()
147+ self .buf .clear ()
148+ self .buf_size = 0
149+ if local_buf :
150+ self .close_flusher ()
153151 self .lock .release ()
154152
155153 if local_buf :
@@ -301,10 +299,10 @@ def emit(self, record):
301299 'line' : msg ,
302300 'level' : record ['levelname' ] or self .loglevel ,
303301 'app' : self .app or record ['module' ],
304- 'env' : self .env
302+ 'env' : self .env ,
303+ 'meta' : {}
305304 }
306305
307- message ['meta' ] = {}
308306 for key in self .custom_fields :
309307 if key in record :
310308 if isinstance (record [key ], tuple ):
@@ -325,6 +323,9 @@ def emit(self, record):
325323 self .buffer_log (message )
326324
327325 def close (self ):
326+ # Close the flusher
327+ self .close_flusher ()
328+
328329 # First gracefully shut down any threads that are still attempting
329330 # to add log messages to the buffer. This ensures that we don't lose
330331 # any log messages that are in the process of being added to the
@@ -333,10 +334,6 @@ def close(self):
333334 self .worker_thread_pool .shutdown (wait = True )
334335 self .worker_thread_pool = None
335336
336- # Now that we've shut down the worker threads, we can safely close
337- # the flusher thread.
338- self .close_flusher ()
339-
340337 # Manually force a flush of any remaining log messages in the buffer.
341338 # We block here to ensure that the flush completes prior to the
342339 # application exiting and because the probability of this
0 commit comments