diff --git a/installer/conf/container.conf b/installer/conf/container.conf
index 9eaed9b47..a20fdbe5a 100755
--- a/installer/conf/container.conf
+++ b/installer/conf/container.conf
@@ -50,6 +50,18 @@
]
+# Container log
+# Example line which matches the format:
+# {"log"=>"Test 9th January\n", "stream"=>"stdout", "time"=>"2018-01-09T23:14:39.273429353Z", "ContainerID"=>"ee1ec26aa974af81b21fff24cef8ec78bf7ac1558b5de6f1eb1a5b28ecd6d559", "Image"=>"ubuntu", "Name"=>"determined_wilson", "SourceSystem"=>"Containers"}
+# NOTE: The LogEntryTimeStamp is just being appended in the begining of the LogEntry field. This is the actual time the log was generated and the TimeGenerated field in Kusto is different
+
+ type containerlog_sudo_tail
+ pos_file /var/opt/microsoft/docker-cimprov/state/ContainerLogFile.pos.log
+ tag oms.container.log
+ format /\"log\"=>\"(?.*)", \"stream\"=>\"(?.*)", \"time\"=>\"(?.*)", \"ContainerID\"=>\"(?.*)", \"Image\"=>\"(?.*)", \"Name\"=>\"(?.*)", \"SourceSystem\"=>\"(?.*)"}/
+ run_interval 60s
+
+
# Container host inventory
type omi
@@ -83,6 +95,11 @@
type filter_container
+# Seperate filter for container log
+
+ type filter_container_log
+
+
type out_oms_api
log_level debug
@@ -135,6 +152,21 @@
max_retry_wait 9m
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 20m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_log*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 15s
+ max_retry_wait 9m
+
+
type out_oms
log_level info
diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data
index ec0728c01..c49a8d1d0 100644
--- a/installer/datafiles/base_container.data
+++ b/installer/datafiles/base_container.data
@@ -23,11 +23,14 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/filter_docker_log.rb; source/code/plugin/filter_docker_log.rb; 644; root; root
/opt/microsoft/omsagent/plugin/filter_container.rb; source/code/plugin/filter_container.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/filter_container_log.rb; source/code/plugin/filter_container_log.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_podinventory.rb; source/code/plugin/in_kube_podinventory.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_events.rb; source/code/plugin/in_kube_events.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_logs.rb; source/code/plugin/in_kube_logs.rb; 644; root; root
/opt/microsoft/omsagent/plugin/KubernetesApiClient.rb; source/code/plugin/KubernetesApiClient.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/in_containerlog_sudo_tail.rb; source/code/plugin/in_containerlog_sudo_tail.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/containerlogtailfilereader.rb; source/code/plugin/containerlogtailfilereader.rb; 744; root; root
/etc/opt/microsoft/docker-cimprov/container.conf; installer/conf/container.conf; 644; root; root
@@ -85,6 +88,15 @@ WriteInstallInfo() {
}
WriteInstallInfo
+#Setup sudo permission for containerlogtailfilereader
+if [ -z $(cat /etc/sudoers.d/omsagent | grep /containerlogtailfilereader.rb) ]
+then
+ chmod +w /etc/sudoers.d/omsagent
+ echo "#run containerlogtailfilereader.rb for docker-provider" >> /etc/sudoers.d/omsagent
+ echo "omsagent ALL=(ALL) NOPASSWD: /opt/microsoft/omsagent/ruby/bin/ruby /opt/microsoft/omsagent/plugin/containerlogtailfilereader.rb *" >> /etc/sudoers.d/omsagent
+ chmod 440 /etc/sudoers.d/omsagent
+fi
+
# Get the state file in place with proper permissions
touch /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt
chmod 644 /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt
diff --git a/source/code/plugin/containerlogtailfilereader.rb b/source/code/plugin/containerlogtailfilereader.rb
new file mode 100644
index 000000000..2d55b1d73
--- /dev/null
+++ b/source/code/plugin/containerlogtailfilereader.rb
@@ -0,0 +1,396 @@
+
+require 'optparse'
+require 'json'
+require 'logger'
+require_relative 'omslog'
+require 'fluent/filter'
+
+module ContainerLogTailscript
+
+ class ContainerLogNewTail
+ def initialize(paths)
+ @paths = paths
+ @tails = {}
+ @pos_file = $options[:pos_file]
+ @read_from_head = $options[:read_from_head]
+ @pf = nil
+ @pf_file = nil
+
+ @log = Logger.new(STDERR)
+ @log.formatter = proc do |severity, time, progname, msg|
+ "#{severity} #{msg}\n"
+ end
+ end
+
+ attr_reader :paths
+
+ def start
+ start_watchers(@paths) unless @paths.empty?
+ end
+
+ def shutdown
+ @pf_file.close if @pf_file
+ end
+
+ def setup_watcher(path, pe)
+ tw = TailWatcher.new(path, pe, @read_from_head, @log, &method(:receive_lines))
+ tw.on_notify
+ tw
+ end
+
+ def start_watchers(paths)
+ if @pos_file
+ @pf_file = File.open(@pos_file, File::RDWR|File::CREAT)
+ @pf_file.sync = true
+ @pf = PositionFile.parse(@pf_file)
+ end
+ paths.each { |path|
+ pe = nil
+ if @pf
+ pe = @pf[path] #pe is FilePositionEntry instance
+ if pe.read_inode.zero?
+ begin
+ pe.update(File::Stat.new(path).ino, 0)
+ rescue Errno::ENOENT
+ @log.warn "#{path} not found. Continuing without tailing it."
+ end
+ end
+ end
+
+ @tails[path] = setup_watcher(path, pe)
+ }
+ end
+
+ def receive_lines(lines, tail_watcher)
+ unless lines.empty?
+ puts lines
+ end
+ return true
+ end
+
+ class TailWatcher
+ def initialize(path, pe, read_from_head, log, &receive_lines)
+ @path = path
+ @pe = pe || MemoryPositionEntry.new
+ @read_from_head = read_from_head
+ @log = log
+ @receive_lines = receive_lines
+ @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate))
+ @io_handler = nil
+ @containerIDFilePath = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/"
+ end
+
+ attr_reader :path
+
+ def wrap_receive_lines(lines)
+ newLines = []
+ containerID = @path.split('/').last.chomp('-json.log')
+ containerInspectInformation = @containerIDFilePath + containerID
+ tempContainerInfo = {}
+ begin
+ File.open(containerInspectInformation) { |f| tempContainerInfo = JSON.parse(f.readline)}
+ lines.each { |line|
+ unless line.empty?
+ newLine = {}
+ newLine = JSON.parse(line)
+ newLine["ContainerID"] = containerID
+ newLine["Image"] = tempContainerInfo["Image"]
+ newLine["Name"] = tempContainerInfo["ElementName"]
+ newLine["SourceSystem"] = "Containers"
+ newLines.push(newLine)
+ end
+ }
+ rescue Exception => e
+ #File doesn't exist or error in reading the data
+ @log.error "Caught exception when opening file -> #{e}"
+ end
+ @receive_lines.call(newLines, self)
+ end
+
+ def on_notify
+ @rotate_handler.on_notify if @rotate_handler
+ return unless @io_handler
+ @io_handler.on_notify
+ end
+
+ def on_rotate(io)
+ if io
+ # first time
+ stat = io.stat
+ fsize = stat.size
+ inode = stat.ino
+
+ last_inode = @pe.read_inode
+ if @read_from_head
+ pos = 0
+ @pe.update(inode, pos)
+ elsif inode == last_inode
+ # rotated file has the same inode number as the pos_file.
+ # seek to the saved position
+ pos = @pe.read_pos
+ elsif last_inode != 0
+ # read data from the head of the rotated file.
+ pos = 0
+ @pe.update(inode, pos)
+ else
+ # this is the first MemoryPositionEntry for the first time fluentd started.
+ # seeks to the end of the file to know where to start tailing
+ pos = fsize
+ @pe.update(inode, pos)
+ end
+ io.seek(pos)
+ @io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines))
+ else
+ @io_handler = NullIOHandler.new
+ end
+ end
+
+ class IOHandler
+ def initialize(io, pe, log, &receive_lines)
+ @log = log
+ @io = io
+ @pe = pe
+ @log = log
+ @read_lines_limit = 100
+ @receive_lines = receive_lines
+ @buffer = ''.force_encoding('ASCII-8BIT')
+ @iobuf = ''.force_encoding('ASCII-8BIT')
+ @lines = []
+ end
+
+ attr_reader :io
+
+ def on_notify
+ begin
+ read_more = false
+ if @lines.empty?
+ begin
+ while true
+ if @buffer.empty?
+ @io.readpartial(512, @buffer)
+ else
+ @buffer << @io.readpartial(512, @iobuf)
+ end
+ while line = @buffer.slice!(/.*?\n/m)
+ @lines << line
+ end
+ if @lines.size >= @read_lines_limit
+ # not to use too much memory in case the file is very large
+ read_more = true
+ break
+ end
+ end
+ rescue EOFError
+ end
+ end
+
+ unless @lines.empty?
+ if @receive_lines.call(@lines)
+ @pe.update_pos(@io.pos - @buffer.bytesize)
+ @lines.clear
+ else
+ read_more = false
+ end
+ end
+ end while read_more
+
+ rescue
+ @log.error "#{$!.to_s}"
+ close
+ end
+
+ def close
+ @io.close unless @io.closed?
+ end
+ end
+
+ class NullIOHandler
+ def initialize
+ end
+
+ def io
+ end
+
+ def on_notify
+ end
+
+ def close
+ end
+ end
+
+ class RotateHandler
+ def initialize(path, log, &on_rotate)
+ @path = path
+ @inode = nil
+ @fsize = -1 # first
+ @on_rotate = on_rotate
+ @log = log
+ end
+
+ def on_notify
+ begin
+ stat = File.stat(@path) #returns a File::Stat object for the file named @path
+ inode = stat.ino
+ fsize = stat.size
+ rescue Errno::ENOENT
+ # moved or deleted
+ inode = nil
+ fsize = 0
+ end
+
+ begin
+ if @inode != inode || fsize < @fsize
+ # rotated or truncated
+ begin
+ io = File.open(@path)
+ rescue Errno::ENOENT
+ end
+ @on_rotate.call(io)
+ end
+ @inode = inode
+ @fsize = fsize
+ end
+
+ rescue
+ @log.error "#{$!.to_s}"
+ end
+ end
+ end
+
+
+ class PositionFile
+ UNWATCHED_POSITION = 0xffffffffffffffff
+
+ def initialize(file, map, last_pos)
+ @file = file
+ @map = map
+ @last_pos = last_pos
+ end
+
+ def [](path)
+ if m = @map[path]
+ return m
+ end
+
+ @file.pos = @last_pos
+ @file.write path
+ @file.write "\t"
+ seek = @file.pos
+ @file.write "0000000000000000\t0000000000000000\n"
+ @last_pos = @file.pos
+
+ @map[path] = FilePositionEntry.new(@file, seek)
+ end
+
+ def self.parse(file)
+ compact(file)
+
+ map = {}
+ file.pos = 0
+ file.each_line {|line|
+ m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
+ next unless m
+ path = m[1]
+ seek = file.pos - line.bytesize + path.bytesize + 1
+ map[path] = FilePositionEntry.new(file, seek)
+ }
+ new(file, map, file.pos)
+ end
+
+ # Clean up unwatched file entries
+ def self.compact(file)
+ file.pos = 0
+ existent_entries = file.each_line.map { |line|
+ m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line)
+ next unless m
+ path = m[1]
+ pos = m[2].to_i(16)
+ ino = m[3].to_i(16)
+ # 32bit inode converted to 64bit at this phase
+ pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\n" % [path, pos, ino])
+ }.compact
+
+ file.pos = 0
+ file.truncate(0)
+ file.write(existent_entries.join)
+ end
+ end
+
+ # pos inode
+ # ffffffffffffffff\tffffffffffffffff\n
+ class FilePositionEntry
+ POS_SIZE = 16
+ INO_OFFSET = 17
+ INO_SIZE = 16
+ LN_OFFSET = 33
+ SIZE = 34
+
+ def initialize(file, seek)
+ @file = file
+ @seek = seek
+ end
+
+ def update(ino, pos)
+ @file.pos = @seek
+ @file.write "%016x\t%016x" % [pos, ino]
+ end
+
+ def update_pos(pos)
+ @file.pos = @seek
+ @file.write "%016x" % pos
+ end
+
+ def read_inode
+ @file.pos = @seek + INO_OFFSET
+ raw = @file.read(INO_SIZE)
+ raw ? raw.to_i(16) : 0
+ end
+
+ def read_pos
+ @file.pos = @seek
+ raw = @file.read(POS_SIZE)
+ raw ? raw.to_i(16) : 0
+ end
+ end
+
+ class MemoryPositionEntry
+ def initialize
+ @pos = 0
+ @inode = 0
+ end
+
+ def update(ino, pos)
+ @inode = ino
+ @pos = pos
+ end
+
+ def update_pos(pos)
+ @pos = pos
+ end
+
+ def read_pos
+ @pos
+ end
+
+ def read_inode
+ @inode
+ end
+ end
+ end
+end
+
+if __FILE__ == $0
+ $options = {:read_from_head => false}
+ OptionParser.new do |opts|
+ opts.on("-p", "--posfile [POSFILE]") do |p|
+ $options[:pos_file] = p
+ end
+ opts.on("-h", "--[no-]readfromhead") do |h|
+ $options[:read_from_head] = h
+ end
+ end.parse!
+ a = ContainerLogTailscript::ContainerLogNewTail.new(ARGV)
+ a.start
+ a.shutdown
+end
+
diff --git a/source/code/plugin/filter_container_log.rb b/source/code/plugin/filter_container_log.rb
new file mode 100644
index 000000000..21e146a35
--- /dev/null
+++ b/source/code/plugin/filter_container_log.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+require 'fluent/filter'
+
+module Fluent
+ require 'logger'
+ class PassThruFilter < Filter
+ Fluent::Plugin.register_filter('filter_container_log', self)
+
+ def configure(conf)
+ super
+ end
+
+ def start
+ super
+ @hostname = OMS::Common.get_hostname or "Unknown host"
+ end
+
+ def shutdown
+ super
+ end
+
+ def filter(tag, time, record)
+ begin
+ #Try to force utf-8 encoding on the string so that all characters can flow through to
+ #$log.info "before : #{record['LogEntry']}"
+ record['LogEntry'].force_encoding('UTF-8')
+ rescue
+ $log.error "Failed to convert record['LogEntry'] : '#{record['LogEntry']}' to UTF-8 using force_encoding."
+ $log.error "Current string encoding for record['LogEntry'] is #{record['LogEntry'].encoding}"
+ end
+
+ record['Computer'] = @hostname
+ wrapper = {
+ "DataType"=>"CONTAINER_LOG_BLOB",
+ "IPName"=>"Containers",
+ "DataItems"=>[record.each{|k,v| record[k]=v}]
+ }
+ wrapper
+ end
+ end
+end