diff --git a/installer/conf/container.conf b/installer/conf/container.conf
index 091753230..f41bd6f98 100755
--- a/installer/conf/container.conf
+++ b/installer/conf/container.conf
@@ -23,6 +23,14 @@
log_level debug
+#custom_metrics_mdm filter plugin
+
+ type filter_cadvisor2mdm
+ custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope
+ metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes
+ log_level info
+
+
type out_oms
log_level debug
@@ -52,3 +60,19 @@
retry_wait 30s
max_retry_wait 9m
+
+
+ type out_mdm
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 20m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 30s
+ max_retry_wait 9m
+ retry_mdm_post_wait_minutes 60
+
diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf
index d0ef0517d..50a88295e 100644
--- a/installer/conf/kube.conf
+++ b/installer/conf/kube.conf
@@ -47,6 +47,12 @@
log_level debug
+
+ type filter_inventory2mdm
+ custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope
+ log_level info
+
+
type out_oms
log_level debug
@@ -145,4 +151,21 @@
retry_limit 10
retry_wait 30s
max_retry_wait 9m
-
\ No newline at end of file
+
+
+
+ type out_mdm
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 20m
+ buffer_type file
+ buffer_path /var/opt/microsoft/omsagent/6bb1e963-b08c-43a8-b708-1628305e964a/state/out_mdm_*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 30s
+ max_retry_wait 9m
+ retry_mdm_post_wait_minutes 60
+
+
diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data
index 7181929e2..c263aa505 100644
--- a/installer/datafiles/base_container.data
+++ b/installer/datafiles/base_container.data
@@ -36,6 +36,9 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/in_cadvisor_perf.rb; source/code/plugin/in_cadvisor_perf.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root
+
/opt/microsoft/omsagent/plugin/ApplicationInsightsUtility.rb; source/code/plugin/ApplicationInsightsUtility.rb; 644; root; root
/opt/microsoft/omsagent/plugin/ContainerInventoryState.rb; source/code/plugin/ContainerInventoryState.rb; 644; root; root
@@ -43,6 +46,9 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/DockerApiRestHelper.rb; source/code/plugin/DockerApiRestHelper.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_containerinventory.rb; source/code/plugin/in_containerinventory.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/out_mdm.rb; source/code/plugin/out_mdm.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/filter_cadvisor2mdm.rb; source/code/plugin/filter_cadvisor2mdm.rb; 644; root; root
+
/opt/microsoft/omsagent/plugin/lib/application_insights/version.rb; source/code/plugin/lib/application_insights/version.rb; 644; root; root
/opt/microsoft/omsagent/plugin/lib/application_insights/rack/track_request.rb; source/code/plugin/lib/application_insights/rack/track_request.rb; 644; root; root
/opt/microsoft/omsagent/plugin/lib/application_insights/unhandled_exception.rb; source/code/plugin/lib/application_insights/unhandled_exception.rb; 644; root; root
@@ -170,6 +176,14 @@ touch /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt
chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt
chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt
+touch /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log
+chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log
+chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log
+
+touch /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log
+chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log
+chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log
+
mv /etc/opt/microsoft/docker-cimprov/container.conf /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf
chown omsagent:omsagent /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf
diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go
index 49e91f87f..27ae6df5c 100644
--- a/source/code/go/src/plugins/oms.go
+++ b/source/code/go/src/plugins/oms.go
@@ -140,7 +140,7 @@ func updateContainerImageNameMaps() {
listOptions := metav1.ListOptions{}
listOptions.FieldSelector = fmt.Sprintf("spec.nodeName=%s", Computer)
pods, err := ClientSet.CoreV1().Pods("").List(listOptions)
-
+
if err != nil {
message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
Log(message)
diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb
index 683be0db4..5c5e92a6c 100644
--- a/source/code/plugin/ApplicationInsightsUtility.rb
+++ b/source/code/plugin/ApplicationInsightsUtility.rb
@@ -98,7 +98,7 @@ def sendHeartBeatEvent(pluginName)
end
end
- def sendCustomMetric(pluginName, properties)
+ def sendLastProcessedContainerInventoryCountMetric(pluginName, properties)
begin
if !(@@Tc.nil?)
@@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'],
@@ -112,6 +112,21 @@ def sendCustomMetric(pluginName, properties)
end
end
+ def sendCustomEvent(eventName, properties)
+ begin
+ if @@CustomProperties.empty? || @@CustomProperties.nil?
+ initializeUtility()
+ end
+ if !(@@Tc.nil?)
+ @@Tc.track_event eventName, :properties => @@CustomProperties
+ @@Tc.flush
+ $log.info("AppInsights Custom Event #{eventName} sent successfully")
+ end
+ rescue => errorStr
+ $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}")
+ end
+ end
+
def sendExceptionTelemetry(errorStr)
begin
if @@CustomProperties.empty? || @@CustomProperties.nil?
@@ -139,7 +154,7 @@ def sendTelemetry(pluginName, properties)
end
@@CustomProperties['Computer'] = properties['Computer']
sendHeartBeatEvent(pluginName)
- sendCustomMetric(pluginName, properties)
+ sendLastProcessedContainerInventoryCountMetric(pluginName, properties)
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}")
end
diff --git a/source/code/plugin/CustomMetricsUtils.rb b/source/code/plugin/CustomMetricsUtils.rb
new file mode 100644
index 000000000..d06c9ad91
--- /dev/null
+++ b/source/code/plugin/CustomMetricsUtils.rb
@@ -0,0 +1,26 @@
+#!/usr/local/bin/ruby
+# frozen_string_literal: true
+
+class CustomMetricsUtils
+ def initialize
+ end
+
+ class << self
+ def check_custom_metrics_availability(custom_metric_regions)
+ aks_region = ENV['AKS_REGION']
+ aks_resource_id = ENV['AKS_RESOURCE_ID']
+ if aks_region.to_s.empty? && aks_resource_id.to_s.empty?
+ false # This will also take care of AKS-Engine Scenario. AKS_REGION/AKS_RESOURCE_ID is not set for AKS-Engine. Only ACS_RESOURCE_NAME is set
+ end
+
+ custom_metrics_regions_arr = custom_metric_regions.split(',')
+ custom_metrics_regions_hash = custom_metrics_regions_arr.map {|x| [x.downcase,true]}.to_h
+
+ if custom_metrics_regions_hash.key?(aks_region.downcase)
+ true
+ else
+ false
+ end
+ end
+ end
+end
\ No newline at end of file
diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb
new file mode 100644
index 000000000..85f9f688e
--- /dev/null
+++ b/source/code/plugin/filter_cadvisor2mdm.rb
@@ -0,0 +1,215 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+
+# frozen_string_literal: true
+
+module Fluent
+ require 'logger'
+ require 'json'
+ require_relative 'oms_common'
+ require_relative 'CustomMetricsUtils'
+
+ class CAdvisor2MdmFilter < Filter
+ Fluent::Plugin.register_filter('filter_cadvisor2mdm', self)
+
+ config_param :enable_log, :integer, :default => 0
+ config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log'
+ config_param :custom_metrics_azure_regions, :string
+ config_param :metrics_to_collect, :string, :default => 'cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes'
+
+ @@cpu_usage_milli_cores = 'cpuUsageMillicores'
+ @@cpu_usage_nano_cores = 'cpuusagenanocores'
+ @@object_name_k8s_node = 'K8SNode'
+ @@hostName = (OMS::Common.get_hostname)
+ @@custom_metrics_template = '
+ {
+ "time": "%{timestamp}",
+ "data": {
+ "baseData": {
+ "metric": "%{metricName}",
+ "namespace": "Insights.Container/nodes",
+ "dimNames": [
+ "host"
+ ],
+ "series": [
+ {
+ "dimValues": [
+ "%{hostvalue}"
+ ],
+ "min": %{metricminvalue},
+ "max": %{metricmaxvalue},
+ "sum": %{metricsumvalue},
+ "count": 1
+ }
+ ]
+ }
+ }
+ }'
+
+ @@metric_name_metric_percentage_name_hash = {
+ @@cpu_usage_milli_cores => "cpuUsagePercentage",
+ "memoryRssBytes" => "memoryRssPercentage",
+ "memoryWorkingSetBytes" => "memoryWorkingSetPercentage"
+ }
+
+ @process_incoming_stream = true
+ @metrics_to_collect_hash = {}
+
+ def initialize
+ super
+ end
+
+ def configure(conf)
+ super
+ @log = nil
+
+ if @enable_log
+ @log = Logger.new(@log_path, 'weekly')
+ @log.debug {'Starting filter_cadvisor2mdm plugin'}
+ end
+ end
+
+ def start
+ super
+ @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions)
+ @metrics_to_collect_hash = build_metrics_hash
+ @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}"
+
+ # initialize cpu and memory limit
+ if @process_incoming_stream
+ @cpu_capacity = 0.0
+ @memory_capacity = 0.0
+ ensure_cpu_memory_capacity_set
+ end
+ end
+
+ def build_metrics_hash
+ @log.debug "Building Hash of Metrics to Collect"
+ metrics_to_collect_arr = @metrics_to_collect.split(',').map(&:strip)
+ metrics_hash = metrics_to_collect_arr.map {|x| [x.downcase,true]}.to_h
+ @log.info "Metrics Collected : #{metrics_hash}"
+ return metrics_hash
+ end
+
+ def shutdown
+ super
+ end
+
+ def filter(tag, time, record)
+ begin
+ if @process_incoming_stream
+ object_name = record['DataItems'][0]['ObjectName']
+ counter_name = record['DataItems'][0]['Collections'][0]['CounterName']
+ if object_name == @@object_name_k8s_node && @metrics_to_collect_hash.key?(counter_name.downcase)
+ percentage_metric_value = 0.0
+
+ # Compute and send % CPU and Memory
+ metric_value = record['DataItems'][0]['Collections'][0]['Value']
+ if counter_name.downcase == @@cpu_usage_nano_cores
+ metric_name = @@cpu_usage_milli_cores
+ metric_value = metric_value/1000000
+ if @cpu_capacity != 0.0
+ percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity
+ end
+ end
+
+ if counter_name.start_with?("memory")
+ metric_name = counter_name
+ if @memory_capacity != 0.0
+ percentage_metric_value = metric_value*100/@memory_capacity
+ end
+ end
+ return get_metric_records(record, metric_name, metric_value, percentage_metric_value)
+ else
+ return []
+ end
+ else
+ return []
+ end
+ rescue Exception => e
+ @log.info "Error processing cadvisor record Exception: #{e.class} Message: #{e.message}"
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ return []
+ end
+ end
+
+ def ensure_cpu_memory_capacity_set
+
+ @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}"
+ if @cpu_capacity != 0.0 && @memory_capacity != 0.0
+ @log.info "CPU And Memory Capacity are already set"
+ return
+ end
+
+ begin
+ nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body)
+ rescue Exception => e
+ @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ end
+ if !nodeInventory.nil?
+ cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
+ if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
+ @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
+ @log.info "CPU Limit #{@cpu_capacity}"
+ else
+ @log.info "Error getting cpu_capacity"
+ end
+ memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")
+ if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
+ @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
+ @log.info "Memory Limit #{@memory_capacity}"
+ else
+ @log.info "Error getting memory_capacity"
+ end
+ end
+ end
+
+ def get_metric_records(record, metric_name, metric_value, percentage_metric_value)
+ records = []
+ custommetricrecord = @@custom_metrics_template % {
+ timestamp: record['DataItems'][0]['Timestamp'],
+ metricName: metric_name,
+ hostvalue: record['DataItems'][0]['Host'],
+ objectnamevalue: record['DataItems'][0]['ObjectName'],
+ instancenamevalue: record['DataItems'][0]['InstanceName'],
+ metricminvalue: metric_value,
+ metricmaxvalue: metric_value,
+ metricsumvalue: metric_value
+ }
+ records.push(JSON.parse(custommetricrecord))
+
+ if !percentage_metric_value.nil?
+ additional_record = @@custom_metrics_template % {
+ timestamp: record['DataItems'][0]['Timestamp'],
+ metricName: @@metric_name_metric_percentage_name_hash[metric_name],
+ hostvalue: record['DataItems'][0]['Host'],
+ objectnamevalue: record['DataItems'][0]['ObjectName'],
+ instancenamevalue: record['DataItems'][0]['InstanceName'],
+ metricminvalue: percentage_metric_value,
+ metricmaxvalue: percentage_metric_value,
+ metricsumvalue: percentage_metric_value
+ }
+ records.push(JSON.parse(additional_record))
+ end
+ @log.info "Metric Name: #{metric_name} Metric Value: #{metric_value} Percentage Metric Value: #{percentage_metric_value}"
+ return records
+ end
+
+
+ def filter_stream(tag, es)
+ new_es = MultiEventStream.new
+ ensure_cpu_memory_capacity_set
+ es.each { |time, record|
+ begin
+ filtered_records = filter(tag, time, record)
+ filtered_records.each {|filtered_record|
+ new_es.add(time, filtered_record) if filtered_record
+ } if filtered_records
+ rescue => e
+ router.emit_error_event(tag, time, record, e)
+ end
+ }
+ new_es
+ end
+ end
+end
diff --git a/source/code/plugin/filter_inventory2mdm.rb b/source/code/plugin/filter_inventory2mdm.rb
new file mode 100644
index 000000000..d9864bc1a
--- /dev/null
+++ b/source/code/plugin/filter_inventory2mdm.rb
@@ -0,0 +1,235 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+
+# frozen_string_literal: true
+
+module Fluent
+ require 'logger'
+ require 'json'
+ require_relative 'oms_common'
+ require_relative 'CustomMetricsUtils'
+
+ class Inventory2MdmFilter < Filter
+ Fluent::Plugin.register_filter('filter_inventory2mdm', self)
+
+ config_param :enable_log, :integer, :default => 0
+ config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log'
+ config_param :custom_metrics_azure_regions, :string
+
+ @@node_count_metric_name = 'nodesCount'
+ @@pod_count_metric_name = 'podCount'
+ @@pod_inventory_tag = 'mdm.kubepodinventory'
+ @@node_inventory_tag = 'mdm.kubenodeinventory'
+ @@node_status_ready = 'Ready'
+ @@node_status_not_ready = 'NotReady'
+
+ @@node_inventory_custom_metrics_template = '
+ {
+ "time": "%{timestamp}",
+ "data": {
+ "baseData": {
+ "metric": "%{metricName}",
+ "namespace": "insights.container/nodes",
+ "dimNames": [
+ "status"
+ ],
+ "series": [
+ {
+ "dimValues": [
+ "%{statusValue}"
+ ],
+ "min": %{node_status_count},
+ "max": %{node_status_count},
+ "sum": %{node_status_count},
+ "count": 1
+ }
+ ]
+ }
+ }
+ }'
+
+ @@pod_inventory_custom_metrics_template = '
+ {
+ "time": "%{timestamp}",
+ "data": {
+ "baseData": {
+ "metric": "%{metricName}",
+ "namespace": "insights.container/pods",
+ "dimNames": [
+ "phase",
+ "namespace",
+ "node",
+ "controllerName"
+ ],
+ "series": [
+ {
+ "dimValues": [
+ "%{phaseDimValue}",
+ "%{namespaceDimValue}",
+ "%{nodeDimValue}",
+ "%{controllerNameDimValue}"
+ ],
+ "min": %{podCountMetricValue},
+ "max": %{podCountMetricValue},
+ "sum": %{podCountMetricValue},
+ "count": 1
+ }
+ ]
+ }
+ }
+ }'
+
+ @process_incoming_stream = true
+
+ def initialize
+ super
+ end
+
+ def configure(conf)
+ super
+ @log = nil
+
+ if @enable_log
+ @log = Logger.new(@log_path, 'weekly')
+ @log.debug {'Starting filter_inventory2mdm plugin'}
+ end
+ end
+
+ def start
+ super
+ @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions)
+ @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}"
+ end
+
+ def shutdown
+ super
+ end
+
+ def process_node_inventory_records(es)
+ timestamp = DateTime.now
+
+ begin
+ node_ready_count = 0
+ node_not_ready_count = 0
+ records = []
+
+ es.each{|time,record|
+ begin
+ timestamp = record['DataItems'][0]['CollectionTime']
+ node_status = record['DataItems'][0]['Status']
+ if node_status.downcase == @@node_status_ready.downcase
+ node_ready_count = node_ready_count+1
+ else
+ node_not_ready_count = node_not_ready_count + 1
+ end
+ rescue => e
+ end
+ }
+
+ ready_record = @@node_inventory_custom_metrics_template % {
+ timestamp: timestamp,
+ metricName: @@node_count_metric_name,
+ statusValue: @@node_status_ready,
+ node_status_count: node_ready_count
+ }
+ records.push(JSON.parse(ready_record))
+
+ not_ready_record = @@node_inventory_custom_metrics_template % {
+ timestamp: timestamp,
+ metricName: @@node_count_metric_name,
+ statusValue: @@node_status_not_ready,
+ node_status_count: node_not_ready_count
+ }
+ records.push(JSON.parse(not_ready_record))
+ rescue Exception => e
+ @log.info "Error processing node inventory records Exception: #{e.class} Message: #{e.message}"
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ return [],timestamp
+ end
+ return records,timestamp
+ end
+
+ def process_pod_inventory_records(es)
+ timestamp = DateTime.now
+ pod_count_hash = Hash.new
+
+ begin
+ records = []
+ es.each{|time,record|
+
+ timestamp = record['DataItems'][0]['CollectionTime']
+ podPhaseDimValue = record['DataItems'][0]['PodStatus']
+ podNamespaceDimValue = record['DataItems'][0]['Namespace']
+ podControllerNameDimValue = record['DataItems'][0]['ControllerName']
+ podNodeDimValue = record['DataItems'][0]['Computer']
+
+ # group by distinct dimension values
+ pod_key = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue, podPhaseDimValue].join('~~')
+
+ if pod_count_hash.key?(pod_key)
+ pod_count = pod_count_hash[pod_key]
+ pod_count = pod_count + 1
+ pod_count_hash[pod_key] = pod_count
+ else
+ pod_count = 1
+ pod_count_hash[pod_key] = pod_count
+ end
+ }
+
+ pod_count_hash.each {|key, value|
+
+ key_elements = key.split('~~')
+ if key_elements.length != 4
+ next
+ end
+
+ # get dimension values by key
+ podNodeDimValue = key_elements[0]
+ podNamespaceDimValue = key_elements[1]
+ podControllerNameDimValue = key_elements[2]
+ podPhaseDimValue = key_elements[3]
+
+ record = @@pod_inventory_custom_metrics_template % {
+ timestamp: timestamp,
+ metricName: @@pod_count_metric_name,
+ phaseDimValue: podPhaseDimValue,
+ namespaceDimValue: podNamespaceDimValue,
+ nodeDimValue: podNodeDimValue,
+ controllerNameDimValue: podControllerNameDimValue,
+ podCountMetricValue: value
+ }
+ records.push(JSON.parse(record))
+ }
+ rescue Exception => e
+ @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}"
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ return [],timestamp
+ end
+ return records, timestamp
+ end
+
+ def filter_stream(tag, es)
+ new_es = MultiEventStream.new
+ filtered_records = []
+ time = DateTime.now
+ begin
+ if @process_incoming_stream
+ @log.info 'Processing NODE inventory records in filter plugin to send to MDM'
+ if tag.downcase.start_with?(@@node_inventory_tag)
+ filtered_records, time = process_node_inventory_records(es)
+ elsif tag.downcase.start_with?(@@pod_inventory_tag)
+ @log.info 'Processing POD inventory records in filter plugin to send to MDM'
+ filtered_records, time = process_pod_inventory_records(es)
+ else
+ filtered_records = []
+ end
+ end
+ filtered_records.each {|filtered_record|
+ new_es.add(time, filtered_record) if filtered_record
+ } if filtered_records
+ rescue => e
+ @log.info "Exception in filter_stream #{e}"
+ end
+ new_es
+ end
+ end
+end
diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb
index 5b551f74e..a857aa6b9 100644
--- a/source/code/plugin/in_cadvisor_perf.rb
+++ b/source/code/plugin/in_cadvisor_perf.rb
@@ -18,6 +18,7 @@ def initialize
config_param :run_interval, :time, :default => '1m'
config_param :tag, :string, :default => "oms.api.cadvisorperf"
+ config_param :mdmtag, :string, :default => "mdm.cadvisorperf"
def configure (conf)
super
@@ -55,6 +56,7 @@ def enumerate()
end
router.emit_stream(@tag, eventStream) if eventStream
+ router.emit_stream(@mdmtag, eventStream) if eventStream
@@istestvar = ENV['ISTEST']
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0)
$log.info("cAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}")
diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb
index 2e48e3f1f..ba1dacbe0 100644
--- a/source/code/plugin/in_kube_nodes.rb
+++ b/source/code/plugin/in_kube_nodes.rb
@@ -7,6 +7,7 @@ class Kube_nodeInventory_Input < Input
Plugin.register_input('kubenodeinventory', self)
@@ContainerNodeInventoryTag = 'oms.containerinsights.ContainerNodeInventory'
+ @@MDMKubeNodeInventoryTag = 'mdm.kubenodeinventory'
def initialize
super
@@ -136,6 +137,7 @@ def enumerate
end
end
router.emit_stream(@tag, eventStream) if eventStream
+ router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream
router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
if telemetrySent == true
@@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb
index eaf14b035..dee3df30b 100644
--- a/source/code/plugin/in_kube_podinventory.rb
+++ b/source/code/plugin/in_kube_podinventory.rb
@@ -6,6 +6,8 @@ module Fluent
class Kube_PodInventory_Input < Input
Plugin.register_input('kubepodinventory', self)
+ @@MDMKubePodInventoryTag = 'mdm.kubepodinventory'
+
def initialize
super
require 'yaml'
@@ -208,6 +210,7 @@ def parse_and_emit_records(podInventory, serviceList)
end
end #podInventory block end
router.emit_stream(@tag, eventStream) if eventStream
+ router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream
if telemetryFlush == true
ApplicationInsightsUtility.sendHeartBeatEvent("KubePodInventory")
ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory['items'].length , {})
diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb
new file mode 100644
index 000000000..2f36ea7d5
--- /dev/null
+++ b/source/code/plugin/out_mdm.rb
@@ -0,0 +1,239 @@
+module Fluent
+
+ class OutputMDM < BufferedOutput
+
+ config_param :retry_mdm_post_wait_minutes, :integer
+
+ Plugin.register_output('out_mdm', self)
+
+ def initialize
+ super
+ require 'net/http'
+ require 'net/https'
+ require 'uri'
+ require 'json'
+ require_relative 'KubernetesApiClient'
+ require_relative 'ApplicationInsightsUtility'
+
+ @@token_resource_url = 'https://monitoring.azure.com/'
+ @@grant_type = 'client_credentials'
+ @@azure_json_path = '/etc/kubernetes/host/azure.json'
+ @@post_request_url_template = "https://%{aks_region}.monitoring.azure.com%{aks_resource_id}/metrics"
+ @@token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token"
+ @@plugin_name = "AKSCustomMetricsMDM"
+
+ @data_hash = {}
+ @token_url = nil
+ @http_client = nil
+ @token_expiry_time = Time.now
+ @cached_access_token = String.new
+ @last_post_attempt_time = Time.now
+ @first_post_attempt_made = false
+ end
+
+ def configure(conf)
+ s = conf.add_element("secondary")
+ s["type"] = ChunkErrorHandler::SecondaryName
+ super
+ end
+
+ def start
+ super
+ file = File.read(@@azure_json_path)
+ # Handle the case where the file read fails. Send Telemetry and exit the plugin?
+ @data_hash = JSON.parse(file)
+ @token_url = @@token_url_template % {tenant_id: @data_hash['tenantId']}
+ @cached_access_token = get_access_token
+ aks_resource_id = ENV['AKS_RESOURCE_ID']
+ aks_region = ENV['AKS_REGION']
+ if aks_resource_id.to_s.empty?
+ @log.info "Environment Variable AKS_RESOURCE_ID is not set.. "
+ raise Exception.new "Environment Variable AKS_RESOURCE_ID is not set!!"
+ end
+ if aks_region.to_s.empty?
+ @log.info "Environment Variable AKS_REGION is not set.. "
+ raise Exception.new "Environment Variable AKS_REGION is not set!!"
+ end
+
+ @@post_request_url = @@post_request_url_template % {aks_region: aks_region, aks_resource_id: aks_resource_id}
+ @post_request_uri = URI.parse(@@post_request_url)
+ @http_client = Net::HTTP.new(@post_request_uri.host, @post_request_uri.port)
+ @http_client.use_ssl = true
+ @log.info "POST Request url: #{@@post_request_url}"
+ ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMPluginStart", {})
+ end
+
+ # get the access token only if the time to expiry is less than 5 minutes
+ def get_access_token
+ if @cached_access_token.to_s.empty? || (Time.now + 5*60 > @token_expiry_time) # token is valid for 60 minutes. Refresh token 5 minutes from expiration
+ @log.info "Refreshing access token for out_mdm plugin.."
+ token_uri = URI.parse(@token_url)
+ http_access_token = Net::HTTP.new(token_uri.host, token_uri.port)
+ http_access_token.use_ssl = true
+ token_request = Net::HTTP::Post.new(token_uri.request_uri)
+ token_request.set_form_data(
+ {
+ 'grant_type' => @@grant_type,
+ 'client_id' => @data_hash['aadClientId'],
+ 'client_secret' => @data_hash['aadClientSecret'],
+ 'resource' => @@token_resource_url
+ }
+ )
+
+ token_response = http_access_token.request(token_request)
+ # Handle the case where the response is not 200
+ parsed_json = JSON.parse(token_response.body)
+ @token_expiry_time = Time.now + 59*60 # set the expiry time to be ~one hour from current time
+ @cached_access_token = parsed_json['access_token']
+ end
+ @cached_access_token
+ end
+
+ def write_status_file(success, message)
+ fn = '/var/opt/microsoft/omsagent/log/MDMIngestion.status'
+ status = '{ "operation": "MDMIngestion", "success": "%s", "message": "%s" }' % [success, message]
+ begin
+ File.open(fn,'w') { |file| file.write(status) }
+ rescue => e
+ @log.debug "Error:'#{e}'"
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ end
+ end
+
+ # This method is called when an event reaches to Fluentd.
+ # Convert the event to a raw string.
+ def format(tag, time, record)
+ if record != {}
+ @log.trace "Buffering #{tag}"
+ return [tag, record].to_msgpack
+ else
+ return ""
+ end
+ end
+
+ # This method is called every flush interval. Send the buffer chunk to MDM.
+ # 'chunk' is a buffer chunk that includes multiple formatted records
+ def write(chunk)
+ begin
+ if !@first_post_attempt_made || (Time.now > @last_post_attempt_time + retry_mdm_post_wait_minutes*60)
+ post_body = []
+ chunk.msgpack_each {|(tag, record)|
+ post_body.push(record.to_json)
+ }
+ send_to_mdm post_body
+ else
+ @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time)/60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP"
+ end
+ rescue Exception => e
+ @log.info "Exception when writing to MDM: #{e}"
+ end
+ end
+
+ def send_to_mdm(post_body)
+ begin
+ access_token = get_access_token
+ request = Net::HTTP::Post.new(@post_request_uri.request_uri)
+ request['Content-Type'] = "application/x-ndjson"
+ request['Authorization'] = "Bearer #{access_token}"
+ request.body = post_body.join("\n")
+ response = @http_client.request(request)
+ response.value # this throws for non 200 HTTP response code
+ @log.info "HTTP Post Response Code : #{response.code}"
+ ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {})
+ rescue Net::HTTPServerException => e
+ @log.info "Failed to Post Metrics to MDM : #{e} Response: #{response}"
+ @log.debug_backtrace(e.backtrace)
+ if !response.code.empty? && response.code == 403.to_s
+ @log.info "Response Code #{response.code} Updating @last_post_attempt_time"
+ @last_post_attempt_time = Time.now
+ @first_post_attempt_made = true
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ # Not raising exception, as that will cause retries to happen
+ else
+ @log.info "HTTPServerException when POSTing Metrics to MDM #{e} Response: #{response}"
+ raise e
+ end
+ rescue Errno::ETIMEDOUT => e
+ @log.info "Timed out when POSTing Metrics to MDM : #{e} Response: #{response}"
+ @log.debug_backtrace(e.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ raise e
+ rescue Exception => e
+ @log.info "Exception POSTing Metrics to MDM : #{e} Response: #{response}"
+ @log.debug_backtrace(e.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
+ raise e
+ end
+ end
+ private
+
+ class ChunkErrorHandler
+ include Configurable
+ include PluginId
+ include PluginLoggerMixin
+
+ SecondaryName = "__ChunkErrorHandler__"
+
+ Plugin.register_output(SecondaryName, self)
+
+ def initialize
+ @router = nil
+ end
+
+ def secondary_init(primary)
+ @error_handlers = create_error_handlers @router
+ end
+
+ def start
+ # NOP
+ end
+
+ def shutdown
+ # NOP
+ end
+
+ def router=(r)
+ @router = r
+ end
+
+ def write(chunk)
+ chunk.msgpack_each {|(tag, record)|
+ @error_handlers[tag].emit(record)
+ }
+ end
+
+ private
+
+ def create_error_handlers(router)
+ nop_handler = NopErrorHandler.new
+ Hash.new() { |hash, tag|
+ etag = OMS::Common.create_error_tag tag
+ hash[tag] = router.match?(etag) ?
+ ErrorHandler.new(router, etag) :
+ nop_handler
+ }
+ end
+
+ class ErrorHandler
+ def initialize(router, etag)
+ @router = router
+ @etag = etag
+ end
+
+ def emit(record)
+ @router.emit(@etag, Fluent::Engine.now, record)
+ end
+ end
+
+ class NopErrorHandler
+ def emit(record)
+ # NOP
+ end
+ end
+
+ end
+
+ end # class OutputMDM
+
+end # module Fluent
+