1414# limitations under the License.
1515
1616import os
17-
1817import eventlet
1918
2019from logshipper .tail import Tail
@@ -27,44 +26,46 @@ def __init__(self, sensor_service, config=None):
2726 super (FileWatchSensor , self ).__init__ (
2827 sensor_service = sensor_service , config = config
2928 )
30- self ._trigger = None
31- self ._logger = self . _sensor_service . get_logger ( __name__ )
32- self ._tail = None
29+ self .log = self . _sensor_service . get_logger ( __name__ )
30+ self .tail = None
31+ self .file_ref = {}
3332
3433 def setup (self ):
35- self ._tail = Tail (filenames = [])
36- self ._tail .handler = self ._handle_line
37- self ._tail .should_run = True
34+ self .tail = Tail (filenames = [])
35+ self .tail .handler = self ._handle_line
36+ self .tail .should_run = True
3837
3938 def run (self ):
40- self ._tail .run ()
39+ self .tail .run ()
4140
4241 def cleanup (self ):
43- if self ._tail :
44- self ._tail .should_run = False
42+ if self .tail :
43+ self .tail .should_run = False
4544
4645 try :
47- self ._tail .notifier .stop ()
46+ self .tail .notifier .stop ()
4847 except Exception :
49- self ._logger .exception ("Unable to stop the tail notifier" )
48+ self .log .exception ("Unable to stop the tail notifier" )
5049
5150 def add_trigger (self , trigger ):
5251 file_path = trigger ["parameters" ].get ("file_path" , None )
5352
5453 if not file_path :
55- self ._logger .error ('Received trigger type without "file_path" field.' )
54+ self .log .error ('Received trigger type without "file_path" field.' )
5655 return
5756
58- self . _trigger = trigger .get ("ref" , None )
57+ trigger = trigger .get ("ref" , None )
5958
60- if not self . _trigger :
61- raise Exception ("Trigger %s did not contain a ref." % trigger )
59+ if not trigger :
60+ raise Exception (f "Trigger { trigger } did not contain a ref." )
6261
6362 # Wait a bit to avoid initialization race in logshipper library
6463 eventlet .sleep (1.0 )
6564
66- self ._tail .add_file (filename = file_path )
67- self ._logger .info ('Added file "%s"' % (file_path ))
65+ self .tail .add_file (filename = file_path )
66+ self .file_ref [file_path ] = trigger
67+
68+ self .log .info (f"Added file '{ file_path } ' ({ trigger } ) to watch list." )
6869
6970 def update_trigger (self , trigger ):
7071 pass
@@ -73,22 +74,28 @@ def remove_trigger(self, trigger):
7374 file_path = trigger ["parameters" ].get ("file_path" , None )
7475
7576 if not file_path :
76- self ._logger .error (' Received trigger type without " file_path" field.' )
77+ self .log .error (" Received trigger type without ' file_path' field." )
7778 return
7879
79- self ._tail .remove_file (filename = file_path )
80- self ._trigger = None
80+ self .tail .remove_file (filename = file_path )
81+ self .file_ref . pop ( file_path )
8182
82- self ._logger .info (' Removed file "%s"' % ( file_path ) )
83+ self .log .info (f" Removed file ' { file_path } ' ( { trigger } ) from watch list." )
8384
8485 def _handle_line (self , file_path , line ):
85- trigger = self ._trigger
86+ if file_path not in self .file_ref :
87+ self .log .error (
88+ f"No reference found for { file_path } , unable to emit trigger!"
89+ )
90+ return
91+
92+ trigger = self .file_ref [file_path ]
8693 payload = {
8794 "file_path" : file_path ,
8895 "file_name" : os .path .basename (file_path ),
8996 "line" : line ,
9097 }
91- self ._logger .debug (
92- "Sending payload %s for trigger %s to sensor_service." , payload , trigger
98+ self .log .debug (
99+ f "Sending payload { payload } for trigger { trigger } to sensor_service."
93100 )
94101 self .sensor_service .dispatch (trigger = trigger , payload = payload )
0 commit comments