Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions installer/conf/container.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@
log_level debug
</source>

#custom_metrics_mdm filter plugin
<filter mdm.cadvisorperf**>
type filter_cadvisor2mdm
custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope
metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes
log_level info
</filter>

<match oms.containerinsights.containerinventory**>
type out_oms
log_level debug
Expand Down Expand Up @@ -52,3 +60,19 @@
retry_wait 30s
max_retry_wait 9m
</match>

<match mdm.cadvisorperf**>
type out_mdm
log_level debug
num_threads 5
buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer
buffer_queue_limit 20
buffer_queue_full_action drop_oldest_chunk
flush_interval 20s
retry_limit 10
retry_wait 30s
max_retry_wait 9m
retry_mdm_post_wait_minutes 60
</match>
25 changes: 24 additions & 1 deletion installer/conf/kube.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
log_level debug
</source>

<filter mdm.kubepodinventory** mdm.kubenodeinventory**>
type filter_inventory2mdm
custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope
log_level info
</filter>

<match oms.containerinsights.KubePodInventory**>
type out_oms
log_level debug
Expand Down Expand Up @@ -145,4 +151,21 @@
retry_limit 10
retry_wait 30s
max_retry_wait 9m
</match>
</match>

<match mdm.kubepodinventory** mdm.kubenodeinventory** >
type out_mdm
log_level debug
num_threads 5
buffer_chunk_limit 20m
buffer_type file
buffer_path /var/opt/microsoft/omsagent/6bb1e963-b08c-43a8-b708-1628305e964a/state/out_mdm_*.buffer
buffer_queue_limit 20
buffer_queue_full_action drop_oldest_chunk
flush_interval 20s
retry_limit 10
retry_wait 30s
max_retry_wait 9m
retry_mdm_post_wait_minutes 60
</match>

14 changes: 14 additions & 0 deletions installer/datafiles/base_container.data
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/in_cadvisor_perf.rb; source/code/plugin/in_cadvisor_perf.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root
/opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root
/opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root


/opt/microsoft/omsagent/plugin/ApplicationInsightsUtility.rb; source/code/plugin/ApplicationInsightsUtility.rb; 644; root; root
/opt/microsoft/omsagent/plugin/ContainerInventoryState.rb; source/code/plugin/ContainerInventoryState.rb; 644; root; root
/opt/microsoft/omsagent/plugin/DockerApiClient.rb; source/code/plugin/DockerApiClient.rb; 644; root; root
/opt/microsoft/omsagent/plugin/DockerApiRestHelper.rb; source/code/plugin/DockerApiRestHelper.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_containerinventory.rb; source/code/plugin/in_containerinventory.rb; 644; root; root

/opt/microsoft/omsagent/plugin/out_mdm.rb; source/code/plugin/out_mdm.rb; 644; root; root
/opt/microsoft/omsagent/plugin/filter_cadvisor2mdm.rb; source/code/plugin/filter_cadvisor2mdm.rb; 644; root; root

/opt/microsoft/omsagent/plugin/lib/application_insights/version.rb; source/code/plugin/lib/application_insights/version.rb; 644; root; root
/opt/microsoft/omsagent/plugin/lib/application_insights/rack/track_request.rb; source/code/plugin/lib/application_insights/rack/track_request.rb; 644; root; root
/opt/microsoft/omsagent/plugin/lib/application_insights/unhandled_exception.rb; source/code/plugin/lib/application_insights/unhandled_exception.rb; 644; root; root
Expand Down Expand Up @@ -170,6 +176,14 @@ touch /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt
chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt
chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt

touch /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log
chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log
chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log

touch /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log
chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log
chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log

mv /etc/opt/microsoft/docker-cimprov/container.conf /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf
chown omsagent:omsagent /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf

Expand Down
2 changes: 1 addition & 1 deletion source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func updateContainerImageNameMaps() {
listOptions := metav1.ListOptions{}
listOptions.FieldSelector = fmt.Sprintf("spec.nodeName=%s", Computer)
pods, err := ClientSet.CoreV1().Pods("").List(listOptions)

if err != nil {
message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
Log(message)
Expand Down
19 changes: 17 additions & 2 deletions source/code/plugin/ApplicationInsightsUtility.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def sendHeartBeatEvent(pluginName)
end
end

def sendCustomMetric(pluginName, properties)
def sendLastProcessedContainerInventoryCountMetric(pluginName, properties)
begin
if !(@@Tc.nil?)
@@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'],
Expand All @@ -112,6 +112,21 @@ def sendCustomMetric(pluginName, properties)
end
end

def sendCustomEvent(eventName, properties)
begin
if @@CustomProperties.empty? || @@CustomProperties.nil?
initializeUtility()
end
if !(@@Tc.nil?)
@@Tc.track_event eventName, :properties => @@CustomProperties
@@Tc.flush
$log.info("AppInsights Custom Event #{eventName} sent successfully")
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}")
end
end

def sendExceptionTelemetry(errorStr)
begin
if @@CustomProperties.empty? || @@CustomProperties.nil?
Expand Down Expand Up @@ -139,7 +154,7 @@ def sendTelemetry(pluginName, properties)
end
@@CustomProperties['Computer'] = properties['Computer']
sendHeartBeatEvent(pluginName)
sendCustomMetric(pluginName, properties)
sendLastProcessedContainerInventoryCountMetric(pluginName, properties)
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}")
end
Expand Down
26 changes: 26 additions & 0 deletions source/code/plugin/CustomMetricsUtils.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/local/bin/ruby
# frozen_string_literal: true

class CustomMetricsUtils
def initialize
end

class << self
def check_custom_metrics_availability(custom_metric_regions)
aks_region = ENV['AKS_REGION']
aks_resource_id = ENV['AKS_RESOURCE_ID']
if aks_region.to_s.empty? && aks_resource_id.to_s.empty?
false # This will also take care of AKS-Engine Scenario. AKS_REGION/AKS_RESOURCE_ID is not set for AKS-Engine. Only ACS_RESOURCE_NAME is set
end

custom_metrics_regions_arr = custom_metric_regions.split(',')
custom_metrics_regions_hash = custom_metrics_regions_arr.map {|x| [x.downcase,true]}.to_h

if custom_metrics_regions_hash.key?(aks_region.downcase)
true
else
false
end
end
end
end
215 changes: 215 additions & 0 deletions source/code/plugin/filter_cadvisor2mdm.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# Copyright (c) Microsoft Corporation. All rights reserved.

# frozen_string_literal: true

module Fluent
require 'logger'
require 'json'
require_relative 'oms_common'
require_relative 'CustomMetricsUtils'

class CAdvisor2MdmFilter < Filter
Fluent::Plugin.register_filter('filter_cadvisor2mdm', self)

config_param :enable_log, :integer, :default => 0
config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log'
config_param :custom_metrics_azure_regions, :string
config_param :metrics_to_collect, :string, :default => 'cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes'

@@cpu_usage_milli_cores = 'cpuUsageMillicores'
@@cpu_usage_nano_cores = 'cpuusagenanocores'
@@object_name_k8s_node = 'K8SNode'
@@hostName = (OMS::Common.get_hostname)
@@custom_metrics_template = '
{
"time": "%{timestamp}",
"data": {
"baseData": {
"metric": "%{metricName}",
"namespace": "Insights.Container/nodes",
"dimNames": [
"host"
],
"series": [
{
"dimValues": [
"%{hostvalue}"
],
"min": %{metricminvalue},
"max": %{metricmaxvalue},
"sum": %{metricsumvalue},
"count": 1
}
]
}
}
}'

@@metric_name_metric_percentage_name_hash = {
@@cpu_usage_milli_cores => "cpuUsagePercentage",
"memoryRssBytes" => "memoryRssPercentage",
"memoryWorkingSetBytes" => "memoryWorkingSetPercentage"
}

@process_incoming_stream = true
@metrics_to_collect_hash = {}

def initialize
super
end

def configure(conf)
super
@log = nil

if @enable_log
@log = Logger.new(@log_path, 'weekly')
@log.debug {'Starting filter_cadvisor2mdm plugin'}
end
end

def start
super
@process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions)
@metrics_to_collect_hash = build_metrics_hash
@log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}"

# initialize cpu and memory limit
if @process_incoming_stream
@cpu_capacity = 0.0
@memory_capacity = 0.0
ensure_cpu_memory_capacity_set
end
end

def build_metrics_hash
@log.debug "Building Hash of Metrics to Collect"
metrics_to_collect_arr = @metrics_to_collect.split(',').map(&:strip)
metrics_hash = metrics_to_collect_arr.map {|x| [x.downcase,true]}.to_h
@log.info "Metrics Collected : #{metrics_hash}"
return metrics_hash
end

def shutdown
super
end

def filter(tag, time, record)
begin
if @process_incoming_stream
object_name = record['DataItems'][0]['ObjectName']
counter_name = record['DataItems'][0]['Collections'][0]['CounterName']
if object_name == @@object_name_k8s_node && @metrics_to_collect_hash.key?(counter_name.downcase)
percentage_metric_value = 0.0

# Compute and send % CPU and Memory
metric_value = record['DataItems'][0]['Collections'][0]['Value']
if counter_name.downcase == @@cpu_usage_nano_cores
metric_name = @@cpu_usage_milli_cores
metric_value = metric_value/1000000
if @cpu_capacity != 0.0
percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity
end
end

if counter_name.start_with?("memory")
metric_name = counter_name
if @memory_capacity != 0.0
percentage_metric_value = metric_value*100/@memory_capacity
end
end
return get_metric_records(record, metric_name, metric_value, percentage_metric_value)
else
return []
end
else
return []
end
rescue Exception => e
@log.info "Error processing cadvisor record Exception: #{e.class} Message: #{e.message}"
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
return []
end
end

def ensure_cpu_memory_capacity_set

@log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}"
if @cpu_capacity != 0.0 && @memory_capacity != 0.0
@log.info "CPU And Memory Capacity are already set"
return
end

begin
nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body)
rescue Exception => e
@log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
end
if !nodeInventory.nil?
cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
@log.info "CPU Limit #{@cpu_capacity}"
else
@log.info "Error getting cpu_capacity"
end
memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")
if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
@log.info "Memory Limit #{@memory_capacity}"
else
@log.info "Error getting memory_capacity"
end
end
end

def get_metric_records(record, metric_name, metric_value, percentage_metric_value)
records = []
custommetricrecord = @@custom_metrics_template % {
timestamp: record['DataItems'][0]['Timestamp'],
metricName: metric_name,
hostvalue: record['DataItems'][0]['Host'],
objectnamevalue: record['DataItems'][0]['ObjectName'],
instancenamevalue: record['DataItems'][0]['InstanceName'],
metricminvalue: metric_value,
metricmaxvalue: metric_value,
metricsumvalue: metric_value
}
records.push(JSON.parse(custommetricrecord))

if !percentage_metric_value.nil?
additional_record = @@custom_metrics_template % {
timestamp: record['DataItems'][0]['Timestamp'],
metricName: @@metric_name_metric_percentage_name_hash[metric_name],
hostvalue: record['DataItems'][0]['Host'],
objectnamevalue: record['DataItems'][0]['ObjectName'],
instancenamevalue: record['DataItems'][0]['InstanceName'],
metricminvalue: percentage_metric_value,
metricmaxvalue: percentage_metric_value,
metricsumvalue: percentage_metric_value
}
records.push(JSON.parse(additional_record))
end
@log.info "Metric Name: #{metric_name} Metric Value: #{metric_value} Percentage Metric Value: #{percentage_metric_value}"
return records
end


def filter_stream(tag, es)
new_es = MultiEventStream.new
ensure_cpu_memory_capacity_set
es.each { |time, record|
begin
filtered_records = filter(tag, time, record)
filtered_records.each {|filtered_record|
new_es.add(time, filtered_record) if filtered_record
} if filtered_records
rescue => e
router.emit_error_event(tag, time, record, e)
end
}
new_es
end
end
end
Loading