From 7ab68fd7eb6ddf8b8d2ffc08fe72d5781ccc6f2c Mon Sep 17 00:00:00 2001 From: Dilip Raghunathan Date: Tue, 4 Sep 2018 13:08:52 -0700 Subject: [PATCH] Revert "Dilipr/fluentd config updates" --- installer/conf/container.conf | 32 ++ installer/datafiles/base_container.data | 12 + .../code/plugin/containerlogtailfilereader.rb | 396 ++++++++++++++++++ source/code/plugin/filter_container_log.rb | 42 ++ 4 files changed, 482 insertions(+) create mode 100644 source/code/plugin/containerlogtailfilereader.rb create mode 100644 source/code/plugin/filter_container_log.rb diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 9eaed9b47..a20fdbe5a 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -50,6 +50,18 @@ ] +# Container log +# Example line which matches the format: +# {"log"=>"Test 9th January\n", "stream"=>"stdout", "time"=>"2018-01-09T23:14:39.273429353Z", "ContainerID"=>"ee1ec26aa974af81b21fff24cef8ec78bf7ac1558b5de6f1eb1a5b28ecd6d559", "Image"=>"ubuntu", "Name"=>"determined_wilson", "SourceSystem"=>"Containers"} +# NOTE: The LogEntryTimeStamp is just being appended in the begining of the LogEntry field. This is the actual time the log was generated and the TimeGenerated field in Kusto is different + + type containerlog_sudo_tail + pos_file /var/opt/microsoft/docker-cimprov/state/ContainerLogFile.pos.log + tag oms.container.log + format /\"log\"=>\"(?.*)", \"stream\"=>\"(?.*)", \"time\"=>\"(?.*)", \"ContainerID\"=>\"(?.*)", \"Image\"=>\"(?.*)", \"Name\"=>\"(?.*)", \"SourceSystem\"=>\"(?.*)"}/ + run_interval 60s + + # Container host inventory type omi @@ -83,6 +95,11 @@ type filter_container +# Seperate filter for container log + + type filter_container_log + + type out_oms_api log_level debug @@ -135,6 +152,21 @@ max_retry_wait 9m + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_log*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 15s + max_retry_wait 9m + + type out_oms log_level info diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index ec0728c01..c49a8d1d0 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -23,11 +23,14 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/filter_docker_log.rb; source/code/plugin/filter_docker_log.rb; 644; root; root /opt/microsoft/omsagent/plugin/filter_container.rb; source/code/plugin/filter_container.rb; 644; root; root +/opt/microsoft/omsagent/plugin/filter_container_log.rb; source/code/plugin/filter_container_log.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_podinventory.rb; source/code/plugin/in_kube_podinventory.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_events.rb; source/code/plugin/in_kube_events.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_logs.rb; source/code/plugin/in_kube_logs.rb; 644; root; root /opt/microsoft/omsagent/plugin/KubernetesApiClient.rb; source/code/plugin/KubernetesApiClient.rb; 644; root; root +/opt/microsoft/omsagent/plugin/in_containerlog_sudo_tail.rb; source/code/plugin/in_containerlog_sudo_tail.rb; 644; root; root +/opt/microsoft/omsagent/plugin/containerlogtailfilereader.rb; source/code/plugin/containerlogtailfilereader.rb; 744; root; root /etc/opt/microsoft/docker-cimprov/container.conf; installer/conf/container.conf; 644; root; root @@ -85,6 +88,15 @@ WriteInstallInfo() { } WriteInstallInfo +#Setup sudo permission for containerlogtailfilereader +if [ -z $(cat /etc/sudoers.d/omsagent | grep /containerlogtailfilereader.rb) ] +then + chmod +w /etc/sudoers.d/omsagent + echo "#run containerlogtailfilereader.rb for docker-provider" >> /etc/sudoers.d/omsagent + echo "omsagent ALL=(ALL) NOPASSWD: /opt/microsoft/omsagent/ruby/bin/ruby /opt/microsoft/omsagent/plugin/containerlogtailfilereader.rb *" >> /etc/sudoers.d/omsagent + chmod 440 /etc/sudoers.d/omsagent +fi + # Get the state file in place with proper permissions touch /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt chmod 644 /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt diff --git a/source/code/plugin/containerlogtailfilereader.rb b/source/code/plugin/containerlogtailfilereader.rb new file mode 100644 index 000000000..2d55b1d73 --- /dev/null +++ b/source/code/plugin/containerlogtailfilereader.rb @@ -0,0 +1,396 @@ + +require 'optparse' +require 'json' +require 'logger' +require_relative 'omslog' +require 'fluent/filter' + +module ContainerLogTailscript + + class ContainerLogNewTail + def initialize(paths) + @paths = paths + @tails = {} + @pos_file = $options[:pos_file] + @read_from_head = $options[:read_from_head] + @pf = nil + @pf_file = nil + + @log = Logger.new(STDERR) + @log.formatter = proc do |severity, time, progname, msg| + "#{severity} #{msg}\n" + end + end + + attr_reader :paths + + def start + start_watchers(@paths) unless @paths.empty? + end + + def shutdown + @pf_file.close if @pf_file + end + + def setup_watcher(path, pe) + tw = TailWatcher.new(path, pe, @read_from_head, @log, &method(:receive_lines)) + tw.on_notify + tw + end + + def start_watchers(paths) + if @pos_file + @pf_file = File.open(@pos_file, File::RDWR|File::CREAT) + @pf_file.sync = true + @pf = PositionFile.parse(@pf_file) + end + paths.each { |path| + pe = nil + if @pf + pe = @pf[path] #pe is FilePositionEntry instance + if pe.read_inode.zero? + begin + pe.update(File::Stat.new(path).ino, 0) + rescue Errno::ENOENT + @log.warn "#{path} not found. Continuing without tailing it." + end + end + end + + @tails[path] = setup_watcher(path, pe) + } + end + + def receive_lines(lines, tail_watcher) + unless lines.empty? + puts lines + end + return true + end + + class TailWatcher + def initialize(path, pe, read_from_head, log, &receive_lines) + @path = path + @pe = pe || MemoryPositionEntry.new + @read_from_head = read_from_head + @log = log + @receive_lines = receive_lines + @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate)) + @io_handler = nil + @containerIDFilePath = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/" + end + + attr_reader :path + + def wrap_receive_lines(lines) + newLines = [] + containerID = @path.split('/').last.chomp('-json.log') + containerInspectInformation = @containerIDFilePath + containerID + tempContainerInfo = {} + begin + File.open(containerInspectInformation) { |f| tempContainerInfo = JSON.parse(f.readline)} + lines.each { |line| + unless line.empty? + newLine = {} + newLine = JSON.parse(line) + newLine["ContainerID"] = containerID + newLine["Image"] = tempContainerInfo["Image"] + newLine["Name"] = tempContainerInfo["ElementName"] + newLine["SourceSystem"] = "Containers" + newLines.push(newLine) + end + } + rescue Exception => e + #File doesn't exist or error in reading the data + @log.error "Caught exception when opening file -> #{e}" + end + @receive_lines.call(newLines, self) + end + + def on_notify + @rotate_handler.on_notify if @rotate_handler + return unless @io_handler + @io_handler.on_notify + end + + def on_rotate(io) + if io + # first time + stat = io.stat + fsize = stat.size + inode = stat.ino + + last_inode = @pe.read_inode + if @read_from_head + pos = 0 + @pe.update(inode, pos) + elsif inode == last_inode + # rotated file has the same inode number as the pos_file. + # seek to the saved position + pos = @pe.read_pos + elsif last_inode != 0 + # read data from the head of the rotated file. + pos = 0 + @pe.update(inode, pos) + else + # this is the first MemoryPositionEntry for the first time fluentd started. + # seeks to the end of the file to know where to start tailing + pos = fsize + @pe.update(inode, pos) + end + io.seek(pos) + @io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines)) + else + @io_handler = NullIOHandler.new + end + end + + class IOHandler + def initialize(io, pe, log, &receive_lines) + @log = log + @io = io + @pe = pe + @log = log + @read_lines_limit = 100 + @receive_lines = receive_lines + @buffer = ''.force_encoding('ASCII-8BIT') + @iobuf = ''.force_encoding('ASCII-8BIT') + @lines = [] + end + + attr_reader :io + + def on_notify + begin + read_more = false + if @lines.empty? + begin + while true + if @buffer.empty? + @io.readpartial(512, @buffer) + else + @buffer << @io.readpartial(512, @iobuf) + end + while line = @buffer.slice!(/.*?\n/m) + @lines << line + end + if @lines.size >= @read_lines_limit + # not to use too much memory in case the file is very large + read_more = true + break + end + end + rescue EOFError + end + end + + unless @lines.empty? + if @receive_lines.call(@lines) + @pe.update_pos(@io.pos - @buffer.bytesize) + @lines.clear + else + read_more = false + end + end + end while read_more + + rescue + @log.error "#{$!.to_s}" + close + end + + def close + @io.close unless @io.closed? + end + end + + class NullIOHandler + def initialize + end + + def io + end + + def on_notify + end + + def close + end + end + + class RotateHandler + def initialize(path, log, &on_rotate) + @path = path + @inode = nil + @fsize = -1 # first + @on_rotate = on_rotate + @log = log + end + + def on_notify + begin + stat = File.stat(@path) #returns a File::Stat object for the file named @path + inode = stat.ino + fsize = stat.size + rescue Errno::ENOENT + # moved or deleted + inode = nil + fsize = 0 + end + + begin + if @inode != inode || fsize < @fsize + # rotated or truncated + begin + io = File.open(@path) + rescue Errno::ENOENT + end + @on_rotate.call(io) + end + @inode = inode + @fsize = fsize + end + + rescue + @log.error "#{$!.to_s}" + end + end + end + + + class PositionFile + UNWATCHED_POSITION = 0xffffffffffffffff + + def initialize(file, map, last_pos) + @file = file + @map = map + @last_pos = last_pos + end + + def [](path) + if m = @map[path] + return m + end + + @file.pos = @last_pos + @file.write path + @file.write "\t" + seek = @file.pos + @file.write "0000000000000000\t0000000000000000\n" + @last_pos = @file.pos + + @map[path] = FilePositionEntry.new(@file, seek) + end + + def self.parse(file) + compact(file) + + map = {} + file.pos = 0 + file.each_line {|line| + m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) + next unless m + path = m[1] + seek = file.pos - line.bytesize + path.bytesize + 1 + map[path] = FilePositionEntry.new(file, seek) + } + new(file, map, file.pos) + end + + # Clean up unwatched file entries + def self.compact(file) + file.pos = 0 + existent_entries = file.each_line.map { |line| + m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) + next unless m + path = m[1] + pos = m[2].to_i(16) + ino = m[3].to_i(16) + # 32bit inode converted to 64bit at this phase + pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\n" % [path, pos, ino]) + }.compact + + file.pos = 0 + file.truncate(0) + file.write(existent_entries.join) + end + end + + # pos inode + # ffffffffffffffff\tffffffffffffffff\n + class FilePositionEntry + POS_SIZE = 16 + INO_OFFSET = 17 + INO_SIZE = 16 + LN_OFFSET = 33 + SIZE = 34 + + def initialize(file, seek) + @file = file + @seek = seek + end + + def update(ino, pos) + @file.pos = @seek + @file.write "%016x\t%016x" % [pos, ino] + end + + def update_pos(pos) + @file.pos = @seek + @file.write "%016x" % pos + end + + def read_inode + @file.pos = @seek + INO_OFFSET + raw = @file.read(INO_SIZE) + raw ? raw.to_i(16) : 0 + end + + def read_pos + @file.pos = @seek + raw = @file.read(POS_SIZE) + raw ? raw.to_i(16) : 0 + end + end + + class MemoryPositionEntry + def initialize + @pos = 0 + @inode = 0 + end + + def update(ino, pos) + @inode = ino + @pos = pos + end + + def update_pos(pos) + @pos = pos + end + + def read_pos + @pos + end + + def read_inode + @inode + end + end + end +end + +if __FILE__ == $0 + $options = {:read_from_head => false} + OptionParser.new do |opts| + opts.on("-p", "--posfile [POSFILE]") do |p| + $options[:pos_file] = p + end + opts.on("-h", "--[no-]readfromhead") do |h| + $options[:read_from_head] = h + end + end.parse! + a = ContainerLogTailscript::ContainerLogNewTail.new(ARGV) + a.start + a.shutdown +end + diff --git a/source/code/plugin/filter_container_log.rb b/source/code/plugin/filter_container_log.rb new file mode 100644 index 000000000..21e146a35 --- /dev/null +++ b/source/code/plugin/filter_container_log.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require 'fluent/filter' + +module Fluent + require 'logger' + class PassThruFilter < Filter + Fluent::Plugin.register_filter('filter_container_log', self) + + def configure(conf) + super + end + + def start + super + @hostname = OMS::Common.get_hostname or "Unknown host" + end + + def shutdown + super + end + + def filter(tag, time, record) + begin + #Try to force utf-8 encoding on the string so that all characters can flow through to + #$log.info "before : #{record['LogEntry']}" + record['LogEntry'].force_encoding('UTF-8') + rescue + $log.error "Failed to convert record['LogEntry'] : '#{record['LogEntry']}' to UTF-8 using force_encoding." + $log.error "Current string encoding for record['LogEntry'] is #{record['LogEntry'].encoding}" + end + + record['Computer'] = @hostname + wrapper = { + "DataType"=>"CONTAINER_LOG_BLOB", + "IPName"=>"Containers", + "DataItems"=>[record.each{|k,v| record[k]=v}] + } + wrapper + end + end +end