From 7c7bbcd47a0bb534610e191858bccc830b96dfcc Mon Sep 17 00:00:00 2001 From: r-dilip Date: Fri, 25 Jan 2019 15:53:02 -0800 Subject: [PATCH 1/4] Custom Metrics Plugin Changes --- installer/conf/container.conf | 24 ++ installer/datafiles/base_container.data | 3 + .../code/plugin/ApplicationInsightsUtility.rb | 19 +- source/code/plugin/filter_cadvisor2mdm.rb | 214 +++++++++++++++++ source/code/plugin/in_cadvisor_perf.rb | 2 + source/code/plugin/out_mdm.rb | 219 ++++++++++++++++++ 6 files changed, 479 insertions(+), 2 deletions(-) create mode 100644 source/code/plugin/filter_cadvisor2mdm.rb create mode 100644 source/code/plugin/out_mdm.rb diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 091753230..f41bd6f98 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -23,6 +23,14 @@ log_level debug +#custom_metrics_mdm filter plugin + + type filter_cadvisor2mdm + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope + metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes + log_level info + + type out_oms log_level debug @@ -52,3 +60,19 @@ retry_wait 30s max_retry_wait 9m + + + type out_mdm + log_level debug + num_threads 5 + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 30s + max_retry_wait 9m + retry_mdm_post_wait_minutes 60 + diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 7181929e2..fbd0ab46b 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -43,6 +43,9 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/DockerApiRestHelper.rb; source/code/plugin/DockerApiRestHelper.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_containerinventory.rb; source/code/plugin/in_containerinventory.rb; 644; root; root +/opt/microsoft/omsagent/plugin/out_mdm.rb; source/code/plugin/out_mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/filter_cadvisor2mdm.rb; source/code/plugin/filter_cadvisor2mdm.rb; 644; root; root + /opt/microsoft/omsagent/plugin/lib/application_insights/version.rb; source/code/plugin/lib/application_insights/version.rb; 644; root; root /opt/microsoft/omsagent/plugin/lib/application_insights/rack/track_request.rb; source/code/plugin/lib/application_insights/rack/track_request.rb; 644; root; root /opt/microsoft/omsagent/plugin/lib/application_insights/unhandled_exception.rb; source/code/plugin/lib/application_insights/unhandled_exception.rb; 644; root; root diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb index 27660d708..fef48c0bf 100644 --- a/source/code/plugin/ApplicationInsightsUtility.rb +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -91,7 +91,7 @@ def sendHeartBeatEvent(pluginName) end end - def sendCustomMetric(pluginName, properties) + def sendLastProcessedContainerInventoryCountMetric(pluginName, properties) begin if !(@@Tc.nil?) @@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'], @@ -105,6 +105,21 @@ def sendCustomMetric(pluginName, properties) end end + def sendCustomEvent(eventName, properties) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility() + end + if !(@@Tc.nil?) + @@Tc.track_event eventName, :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Custom Event #{eventName} sent successfully") + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") + end + end + def sendExceptionTelemetry(errorStr) begin if @@CustomProperties.empty? || @@CustomProperties.nil? @@ -132,7 +147,7 @@ def sendTelemetry(pluginName, properties) end @@CustomProperties['Computer'] = properties['Computer'] sendHeartBeatEvent(pluginName) - sendCustomMetric(pluginName, properties) + sendLastProcessedContainerInventoryCountMetric(pluginName, properties) rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") end diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb new file mode 100644 index 000000000..648d001b2 --- /dev/null +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -0,0 +1,214 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. + +# frozen_string_literal: true + +module Fluent + require 'logger' + require 'json' + + class CAdvisor2MdmFilter < Filter + Fluent::Plugin.register_filter('filter_cadvisor2mdm', self) + + config_param :enable_log, :integer, :default => 0 + config_param :log_path, :string, :default => '/var/opt/microsoft/omsagent/log/filter_cadvisor2mdm.log' + config_param :custom_metrics_azure_regions, :string + config_param :metrics_to_collect, :string, :default => 'cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes' + + @@cpu_usage_milli_cores = 'cpuUsageMilliCores' + @@cpu_usage_nano_cores = 'cpuusagenanocores' + @@object_name_k8s_node = 'K8SNode' + @process_incoming_stream = true + @metrics_to_collect_hash = {} + def initialize + super + end + + def configure(conf) + super + @log = nil + + if @enable_log + @log = Logger.new(@log_path, 'weekly') + @log.debug {'Starting filter_cadvisor2mdm plugin'} + end + end + + def start + super + @@custom_metrics_template = ' + { + "time": "%{timestamp}", + "data": { + "baseData": { + "metric": "%{metricName}", + "namespace": "Insights.Containers/node", + "dimNames": [ + "host" + ], + "series": [ + { + "dimValues": [ + "%{hostvalue}" + ], + "min": %{metricminvalue}, + "max": %{metricmaxvalue}, + "sum": %{metricsumvalue}, + "count": 1 + } + ] + } + } + }' + + @process_incoming_stream = check_custom_metrics_availability + @metrics_to_collect_hash = build_metrics_hash + @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}" + + # initialize cpu and memory limit + if @process_incoming_stream + @cpu_limit = 0.0 + @memory_limit = 0.0 + + begin + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) + rescue Exception => e + @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + end + if !nodeInventory.nil? + cpu_limit_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") + if !cpu_limit_json.nil? + @cpu_limit = cpu_limit_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "CPU Limit #{@cpu_limit}" + else + @log.info "Error getting cpu_limit" + end + memory_limit_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") + if !memory_limit_json.nil? + @memory_limit = memory_limit_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "Memory Limit #{@memory_limit}" + else + @log.info "Error getting memory_limit" + end + end + end + end + + def build_metrics_hash + @log.debug "Building Hash of Metrics to Collect" + metrics_to_collect_arr = @metrics_to_collect.split(',').map(&:strip) + metrics_hash = metrics_to_collect_arr.map {|x| [x.downcase,true]}.to_h + @log.info "Metrics Collected : #{metrics_hash}" + return metrics_hash + end + + def check_custom_metrics_availability + aks_region = ENV['AKS_REGION'] + if aks_region.to_s.empty? + false + end + @log.debug "AKS_REGION #{aks_region}" + custom_metrics_regions_arr = @custom_metrics_azure_regions.split(',') + custom_metrics_regions_hash = custom_metrics_regions_arr.map {|x| [x.downcase,true]}.to_h + + @log.debug "Custom Metrics Regions Hash #{custom_metrics_regions_hash}" + + if custom_metrics_regions_hash.key?(aks_region.downcase) + @log.debug "Returning true for check_custom_metrics_availability" + true + else + @log.debug "Returning false for check_custom_metrics_availability" + false + end + end + + def shutdown + super + end + + def filter(tag, time, record) + if @process_incoming_stream + object_name = record['DataItems'][0]['ObjectName'] + counter_name = record['DataItems'][0]['Collections'][0]['CounterName'] + if object_name == @@object_name_k8s_node && @metrics_to_collect_hash.key?(counter_name.downcase) + percentage_metric_value = 0.0 + + # Compute and send % CPU and Memory + begin + metric_value = record['DataItems'][0]['Collections'][0]['Value'] + if counter_name.downcase == @@cpu_usage_nano_cores + metric_name = @@cpu_usage_milli_cores + metric_value = metric_value/1000000 + if @cpu_limit != 0.0 + percentage_metric_value = (metric_value*1000000)*100/@cpu_limit + end + end + + if counter_name.start_with?("memory") + metric_name = counter_name + if @memory_limit != 0.0 + percentage_metric_value = metric_value*100/@memory_limit + end + end + return get_metric_records(record, metric_name, metric_value, percentage_metric_value) + rescue Exception => e + @log.info "Error parsing cadvisor record Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + return [] + end + else + return [] + end + else + return [] + end + end + + def get_metric_records(record, metric_name, metric_value, percentage_metric_value) + records = [] + custommetricrecord = @@custom_metrics_template % { + timestamp: record['DataItems'][0]['Timestamp'], + metricName: metric_name, + hostvalue: record['DataItems'][0]['Host'], + objectnamevalue: record['DataItems'][0]['ObjectName'], + instancenamevalue: record['DataItems'][0]['InstanceName'], + metricminvalue: metric_value, + metricmaxvalue: metric_value, + metricsumvalue: metric_value + } + records.push(JSON.parse(custommetricrecord)) + + if !percentage_metric_value.nil? + additional_record = @@custom_metrics_template % { + timestamp: record['DataItems'][0]['Timestamp'], + metricName: metric_name + "Percentage", + hostvalue: record['DataItems'][0]['Host'], + objectnamevalue: record['DataItems'][0]['ObjectName'], + instancenamevalue: record['DataItems'][0]['InstanceName'], + metricminvalue: percentage_metric_value, + metricmaxvalue: percentage_metric_value, + metricsumvalue: percentage_metric_value + } + records.push(JSON.parse(additional_record)) + end + @log.info "Metric Name: #{metric_name} Metric Value: #{metric_value} Percentage Metric Value: #{percentage_metric_value}" + return records + end + + + def filter_stream(tag, es) + new_es = MultiEventStream.new + es.each { |time, record| + begin + filtered_records = filter(tag, time, record) + filtered_records.each {|filtered_record| + new_es.add(time, filtered_record) if filtered_record + } if filtered_records + rescue => e + router.emit_error_event(tag, time, record, e) + end + } + new_es + end + end +end diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb index 5b551f74e..a857aa6b9 100644 --- a/source/code/plugin/in_cadvisor_perf.rb +++ b/source/code/plugin/in_cadvisor_perf.rb @@ -18,6 +18,7 @@ def initialize config_param :run_interval, :time, :default => '1m' config_param :tag, :string, :default => "oms.api.cadvisorperf" + config_param :mdmtag, :string, :default => "mdm.cadvisorperf" def configure (conf) super @@ -55,6 +56,7 @@ def enumerate() end router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@mdmtag, eventStream) if eventStream @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("cAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb new file mode 100644 index 000000000..2e2ccbcb5 --- /dev/null +++ b/source/code/plugin/out_mdm.rb @@ -0,0 +1,219 @@ +module Fluent + + class OutputMDM < BufferedOutput + + config_param :retry_mdm_post_wait_minutes, :integer + + Plugin.register_output('out_mdm', self) + + def initialize + super + require 'net/http' + require 'net/https' + require 'uri' + require 'json' + require_relative 'KubernetesApiClient' + require_relative 'ApplicationInsightsUtility' + + @@token_resource_url = 'https://monitoring.azure.com/' + @@grant_type = 'client_credentials' + @@azure_json_path = '/etc/kubernetes/azure.json' + @@post_request_url_template = "https://%{aks_region}.monitoring.azure.com%{aks_resource_id}/metrics" + @@token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token" + @@plugin_name = "AKSCustomMetricsMDM" + + @data_hash = {} + @token_url = nil + @token_expiry_time = Time.now + @cached_access_token = String.new + @last_post_attempt_time = Time.now + @first_post_attempt_made = false + end + + def configure(conf) + s = conf.add_element("secondary") + s["type"] = ChunkErrorHandler::SecondaryName + super + end + + def start + super + file = File.read(@@azure_json_path) + # Handle the case where the file read fails. Send Telemetry and exit the plugin? + @data_hash = JSON.parse(file) + @token_url = @@token_url_template % {tenant_id: @data_hash['tenantId']} + @cached_access_token = get_access_token + aks_resource_id = ENV['AKS_RESOURCE_ID'] + aks_region = ENV['AKS_REGION'] + if aks_resource_id.to_s.empty? + @log.info "Environment Variable AKS_RESOURCE_ID is not set.. " + raise Exception.new "Environment Variable AKS_RESOURCE_ID is not set!!" + end + if aks_region.to_s.empty? + @log.info "Environment Variable AKS_REGION is not set.. " + raise Exception.new "Environment Variable AKS_REGION is not set!!" + end + @@post_request_url = @@post_request_url_template % {aks_region: aks_region, aks_resource_id: aks_resource_id} + @log.info "POST Request url: #{@@post_request_url}" + ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMPluginStart", {}) + end + + # get the access token only if the time to expiry is less than 5 minutes + def get_access_token + if @cached_access_token.to_s.empty? || (Time.now + 5*60 > @token_expiry_time) # token is valid for 60 minutes. Refresh token 5 minutes from expiration + @log.info "Refreshing access token for out_mdm plugin.." + token_uri = URI.parse(@token_url) + http_access_token = Net::HTTP.new(token_uri.host, token_uri.port) + http_access_token.use_ssl = true + token_request = Net::HTTP::Post.new(token_uri.request_uri) + token_request.set_form_data( + { + 'grant_type' => @@grant_type, + 'client_id' => @data_hash['aadClientId'], + 'client_secret' => @data_hash['aadClientSecret'], + 'resource' => @@token_resource_url + } + ) + + token_response = http_access_token.request(token_request) + # Handle the case where the response is not 200 + parsed_json = JSON.parse(token_response.body) + @token_expiry_time = Time.now + 59*60 # set the expiry time to be ~one hour from current time + @cached_access_token = parsed_json['access_token'] + end + @cached_access_token + end + + def write_status_file(success, message) + fn = '/var/opt/microsoft/omsagent/log/MDMIngestion.status' + status = '{ "operation": "MDMIngestion", "success": "%s", "message": "%s" }' % [success, message] + begin + File.open(fn,'w') { |file| file.write(status) } + rescue => e + @log.debug "Error:'#{e}'" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + end + end + + # This method is called when an event reaches to Fluentd. + # Convert the event to a raw string. + def format(tag, time, record) + if record != {} + @log.trace "Buffering #{tag}" + return [tag, record].to_msgpack + else + return "" + end + end + + # This method is called every flush interval. Send the buffer chunk to MDM. + # 'chunk' is a buffer chunk that includes multiple formatted records + def write(chunk) + if !@first_post_attempt_made || (Time.now > @last_post_attempt_time + retry_mdm_post_wait_minutes*60) + post_body = [] + chunk.msgpack_each {|(tag, record)| + post_body.push(record.to_json) + } + send_to_mdm post_body + else + @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time)/60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP" + end + end + + def send_to_mdm(post_body) + begin + access_token = get_access_token + uri = URI.parse(@@post_request_url) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = true + request = Net::HTTP::Post.new(uri.request_uri) + request['Content-Type'] = "application/x-ndjson" + request['Authorization'] = "Bearer #{access_token}" + request.body = post_body.join("\n") + response = http.request(request) + response.value # this throws for non 200 HTTP response code + @log.info "HTTP Post Response Code : #{response.code}" + ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {}) + rescue Net::HTTPServerException => e + @log.info "Failed in Post: #{e} Code: #{response.code}" + if response.code == 403.to_s + @log.info "Response Code #{response.code} Updating @last_post_attempt_time" + @last_post_attempt_time = Time.now + @first_post_attempt_made = true + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + end + @log.debug_backtrace(e.backtrace) + end + end + private + + class ChunkErrorHandler + include Configurable + include PluginId + include PluginLoggerMixin + + SecondaryName = "__ChunkErrorHandler__" + + Plugin.register_output(SecondaryName, self) + + def initialize + @router = nil + end + + def secondary_init(primary) + @error_handlers = create_error_handlers @router + end + + def start + # NOP + end + + def shutdown + # NOP + end + + def router=(r) + @router = r + end + + def write(chunk) + chunk.msgpack_each {|(tag, record)| + @error_handlers[tag].emit(record) + } + end + + private + + def create_error_handlers(router) + nop_handler = NopErrorHandler.new + Hash.new() { |hash, tag| + etag = OMS::Common.create_error_tag tag + hash[tag] = router.match?(etag) ? + ErrorHandler.new(router, etag) : + nop_handler + } + end + + class ErrorHandler + def initialize(router, etag) + @router = router + @etag = etag + end + + def emit(record) + @router.emit(@etag, Fluent::Engine.now, record) + end + end + + class NopErrorHandler + def emit(record) + # NOP + end + end + + end + + end # class OutputMDM + +end # module Fluent + From ff18a2f7ad77825a43ceee3f75498a1740728522 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Tue, 29 Jan 2019 21:04:28 -0800 Subject: [PATCH 2/4] Adding a comment for AKS-Engine Scenario --- source/code/plugin/filter_cadvisor2mdm.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index 648d001b2..79aea80ce 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -105,7 +105,7 @@ def build_metrics_hash def check_custom_metrics_availability aks_region = ENV['AKS_REGION'] if aks_region.to_s.empty? - false + false # This will also take care of AKS-Engine Scenario. AKS_REGION is not set for AKS-Engine. Only ACS_RESOURCE_NAME is set end @log.debug "AKS_REGION #{aks_region}" custom_metrics_regions_arr = @custom_metrics_azure_regions.split(',') From 060fee11abed18c13728591ce8046193516895b7 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Wed, 6 Feb 2019 13:14:41 -0800 Subject: [PATCH 3/4] Partial PR Feedback - 1 --- installer/datafiles/base_container.data | 4 ++ source/code/plugin/KubernetesApiClient.rb | 1 + source/code/plugin/filter_cadvisor2mdm.rb | 46 ++++++++++++----------- source/code/plugin/out_mdm.rb | 13 ++++--- 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index fbd0ab46b..570b3afcb 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -173,6 +173,10 @@ touch /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt +touch /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log +chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt +chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt + mv /etc/opt/microsoft/docker-cimprov/container.conf /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf chown omsagent:omsagent /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index a1e143b15..f43dc9b04 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -51,6 +51,7 @@ def getKubeResourceInfo(resource) kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) kubeApiRequest['Authorization'] = "Bearer " + getTokenStr @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Query Parameters #{URI::decode_www_form(uri.query).to_h} @ #{Time.now.utc.iso8601}" response = http.request(kubeApiRequest) @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" end diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index 79aea80ce..68c9d8c0f 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -5,18 +5,21 @@ module Fluent require 'logger' require 'json' + require_relative 'oms_common' class CAdvisor2MdmFilter < Filter Fluent::Plugin.register_filter('filter_cadvisor2mdm', self) config_param :enable_log, :integer, :default => 0 - config_param :log_path, :string, :default => '/var/opt/microsoft/omsagent/log/filter_cadvisor2mdm.log' + config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log' config_param :custom_metrics_azure_regions, :string config_param :metrics_to_collect, :string, :default => 'cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes' @@cpu_usage_milli_cores = 'cpuUsageMilliCores' @@cpu_usage_nano_cores = 'cpuusagenanocores' @@object_name_k8s_node = 'K8SNode' + @@hostName = (OMS::Common.get_hostname) + @process_incoming_stream = true @metrics_to_collect_hash = {} def initialize @@ -41,7 +44,7 @@ def start "data": { "baseData": { "metric": "%{metricName}", - "namespace": "Insights.Containers/node", + "namespace": "Insights.Container/nodes", "dimNames": [ "host" ], @@ -66,29 +69,29 @@ def start # initialize cpu and memory limit if @process_incoming_stream - @cpu_limit = 0.0 - @memory_limit = 0.0 + @cpu_capacity = 0.0 + @memory_capacity = 0.0 begin - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) rescue Exception => e @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) end if !nodeInventory.nil? - cpu_limit_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") - if !cpu_limit_json.nil? - @cpu_limit = cpu_limit_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "CPU Limit #{@cpu_limit}" + cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") + if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "CPU Limit #{@cpu_capacity}" else - @log.info "Error getting cpu_limit" + @log.info "Error getting cpu_capacity" end - memory_limit_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") - if !memory_limit_json.nil? - @memory_limit = memory_limit_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "Memory Limit #{@memory_limit}" + memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") + if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "Memory Limit #{@memory_capacity}" else - @log.info "Error getting memory_limit" + @log.info "Error getting memory_capacity" end end end @@ -104,8 +107,9 @@ def build_metrics_hash def check_custom_metrics_availability aks_region = ENV['AKS_REGION'] - if aks_region.to_s.empty? - false # This will also take care of AKS-Engine Scenario. AKS_REGION is not set for AKS-Engine. Only ACS_RESOURCE_NAME is set + aks_resource_id = ENV['AKS_RESOURCE_ID'] + if aks_region.to_s.empty? && aks_resource_id.to_s.empty? + false # This will also take care of AKS-Engine Scenario. AKS_REGION/AKS_RESOURCE_ID is not set for AKS-Engine. Only ACS_RESOURCE_NAME is set end @log.debug "AKS_REGION #{aks_region}" custom_metrics_regions_arr = @custom_metrics_azure_regions.split(',') @@ -139,15 +143,15 @@ def filter(tag, time, record) if counter_name.downcase == @@cpu_usage_nano_cores metric_name = @@cpu_usage_milli_cores metric_value = metric_value/1000000 - if @cpu_limit != 0.0 - percentage_metric_value = (metric_value*1000000)*100/@cpu_limit + if @cpu_capacity != 0.0 + percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity end end if counter_name.start_with?("memory") metric_name = counter_name - if @memory_limit != 0.0 - percentage_metric_value = metric_value*100/@memory_limit + if @memory_capacity != 0.0 + percentage_metric_value = metric_value*100/@memory_capacity end end return get_metric_records(record, metric_name, metric_value, percentage_metric_value) diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index 2e2ccbcb5..ebbbff6a0 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -24,6 +24,7 @@ def initialize @data_hash = {} @token_url = nil + @http_client = nil @token_expiry_time = Time.now @cached_access_token = String.new @last_post_attempt_time = Time.now @@ -53,7 +54,11 @@ def start @log.info "Environment Variable AKS_REGION is not set.. " raise Exception.new "Environment Variable AKS_REGION is not set!!" end + @@post_request_url = @@post_request_url_template % {aks_region: aks_region, aks_resource_id: aks_resource_id} + @post_request_uri = URI.parse(@@post_request_url) + @http_client = Net::HTTP.new(@post_request_uri.host, @post_request_uri.port) + @http_client.use_ssl = true @log.info "POST Request url: #{@@post_request_url}" ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMPluginStart", {}) end @@ -123,14 +128,11 @@ def write(chunk) def send_to_mdm(post_body) begin access_token = get_access_token - uri = URI.parse(@@post_request_url) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - request = Net::HTTP::Post.new(uri.request_uri) + request = Net::HTTP::Post.new(@post_request_uri.request_uri) request['Content-Type'] = "application/x-ndjson" request['Authorization'] = "Bearer #{access_token}" request.body = post_body.join("\n") - response = http.request(request) + response = @http_client.request(request) response.value # this throws for non 200 HTTP response code @log.info "HTTP Post Response Code : #{response.code}" ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {}) @@ -143,6 +145,7 @@ def send_to_mdm(post_body) ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) end @log.debug_backtrace(e.backtrace) + raise e end end private From cf00c8fee3ed28a85f0918e7ecb0ae4e9b226ced Mon Sep 17 00:00:00 2001 From: r-dilip Date: Wed, 6 Feb 2019 17:42:46 -0800 Subject: [PATCH 4/4] PR Feedback - 2 --- installer/datafiles/base_container.data | 4 +- source/code/plugin/KubernetesApiClient.rb | 1 - source/code/plugin/filter_cadvisor2mdm.rb | 127 ++++++++++++---------- source/code/plugin/out_mdm.rb | 37 +++++-- 4 files changed, 98 insertions(+), 71 deletions(-) diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 570b3afcb..9a08081fb 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -174,8 +174,8 @@ chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt touch /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log -chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt -chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt +chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log +chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log mv /etc/opt/microsoft/docker-cimprov/container.conf /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf chown omsagent:omsagent /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index f43dc9b04..a1e143b15 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -51,7 +51,6 @@ def getKubeResourceInfo(resource) kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) kubeApiRequest['Authorization'] = "Bearer " + getTokenStr @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Query Parameters #{URI::decode_www_form(uri.query).to_h} @ #{Time.now.utc.iso8601}" response = http.request(kubeApiRequest) @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" end diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index 68c9d8c0f..6edaa9224 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -19,26 +19,7 @@ class CAdvisor2MdmFilter < Filter @@cpu_usage_nano_cores = 'cpuusagenanocores' @@object_name_k8s_node = 'K8SNode' @@hostName = (OMS::Common.get_hostname) - - @process_incoming_stream = true - @metrics_to_collect_hash = {} - def initialize - super - end - - def configure(conf) - super - @log = nil - - if @enable_log - @log = Logger.new(@log_path, 'weekly') - @log.debug {'Starting filter_cadvisor2mdm plugin'} - end - end - - def start - super - @@custom_metrics_template = ' + @@custom_metrics_template = ' { "time": "%{timestamp}", "data": { @@ -63,37 +44,34 @@ def start } }' + @process_incoming_stream = true + @metrics_to_collect_hash = {} + + def initialize + super + end + + def configure(conf) + super + @log = nil + + if @enable_log + @log = Logger.new(@log_path, 'weekly') + @log.debug {'Starting filter_cadvisor2mdm plugin'} + end + end + + def start + super @process_incoming_stream = check_custom_metrics_availability @metrics_to_collect_hash = build_metrics_hash @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}" - + # initialize cpu and memory limit if @process_incoming_stream @cpu_capacity = 0.0 - @memory_capacity = 0.0 - - begin - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) - rescue Exception => e - @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) - end - if !nodeInventory.nil? - cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") - if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? - @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "CPU Limit #{@cpu_capacity}" - else - @log.info "Error getting cpu_capacity" - end - memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") - if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? - @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "Memory Limit #{@memory_capacity}" - else - @log.info "Error getting memory_capacity" - end - end + @memory_capacity = 0.0 + ensure_cpu_memory_capacity_set end end @@ -131,14 +109,14 @@ def shutdown end def filter(tag, time, record) - if @process_incoming_stream - object_name = record['DataItems'][0]['ObjectName'] - counter_name = record['DataItems'][0]['Collections'][0]['CounterName'] - if object_name == @@object_name_k8s_node && @metrics_to_collect_hash.key?(counter_name.downcase) - percentage_metric_value = 0.0 - - # Compute and send % CPU and Memory - begin + begin + if @process_incoming_stream + object_name = record['DataItems'][0]['ObjectName'] + counter_name = record['DataItems'][0]['Collections'][0]['CounterName'] + if object_name == @@object_name_k8s_node && @metrics_to_collect_hash.key?(counter_name.downcase) + percentage_metric_value = 0.0 + + # Compute and send % CPU and Memory metric_value = record['DataItems'][0]['Collections'][0]['Value'] if counter_name.downcase == @@cpu_usage_nano_cores metric_name = @@cpu_usage_milli_cores @@ -155,19 +133,51 @@ def filter(tag, time, record) end end return get_metric_records(record, metric_name, metric_value, percentage_metric_value) - rescue Exception => e - @log.info "Error parsing cadvisor record Exception: #{e.class} Message: #{e.message}" - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + else return [] end - else + else return [] end - else + rescue Exception => e + @log.info "Error processing cadvisor record Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) return [] end end + def ensure_cpu_memory_capacity_set + + @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}" + if @cpu_capacity != 0.0 && @memory_capacity != 0.0 + @log.info "CPU And Memory Capacity are already set" + return + end + + begin + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) + rescue Exception => e + @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + end + if !nodeInventory.nil? + cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") + if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "CPU Limit #{@cpu_capacity}" + else + @log.info "Error getting cpu_capacity" + end + memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") + if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "Memory Limit #{@memory_capacity}" + else + @log.info "Error getting memory_capacity" + end + end + end + def get_metric_records(record, metric_name, metric_value, percentage_metric_value) records = [] custommetricrecord = @@custom_metrics_template % { @@ -202,6 +212,7 @@ def get_metric_records(record, metric_name, metric_value, percentage_metric_valu def filter_stream(tag, es) new_es = MultiEventStream.new + ensure_cpu_memory_capacity_set es.each { |time, record| begin filtered_records = filter(tag, time, record) diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index ebbbff6a0..147ae0969 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -114,14 +114,18 @@ def format(tag, time, record) # This method is called every flush interval. Send the buffer chunk to MDM. # 'chunk' is a buffer chunk that includes multiple formatted records def write(chunk) - if !@first_post_attempt_made || (Time.now > @last_post_attempt_time + retry_mdm_post_wait_minutes*60) - post_body = [] - chunk.msgpack_each {|(tag, record)| - post_body.push(record.to_json) - } - send_to_mdm post_body - else - @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time)/60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP" + begin + if !@first_post_attempt_made || (Time.now > @last_post_attempt_time + retry_mdm_post_wait_minutes*60) + post_body = [] + chunk.msgpack_each {|(tag, record)| + post_body.push(record.to_json) + } + send_to_mdm post_body + else + @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time)/60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP" + end + rescue Exception => e + @log.info "Exception when writing to MDM: #{e}" end end @@ -137,14 +141,27 @@ def send_to_mdm(post_body) @log.info "HTTP Post Response Code : #{response.code}" ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {}) rescue Net::HTTPServerException => e - @log.info "Failed in Post: #{e} Code: #{response.code}" - if response.code == 403.to_s + @log.info "Failed to Post Metrics to MDM : #{e} Response: #{response}" + @log.debug_backtrace(e.backtrace) + if !response.code.empty? && response.code == 403.to_s @log.info "Response Code #{response.code} Updating @last_post_attempt_time" @last_post_attempt_time = Time.now @first_post_attempt_made = true ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + # Not raising exception, as that will cause retries to happen + else + @log.info "HTTPServerException when POSTing Metrics to MDM #{e} Response: #{response}" + raise e end + rescue Errno::ETIMEDOUT => e + @log.info "Timed out when POSTing Metrics to MDM : #{e} Response: #{response}" @log.debug_backtrace(e.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + raise e + rescue Exception => e + @log.info "Exception POSTing Metrics to MDM : #{e} Response: #{response}" + @log.debug_backtrace(e.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) raise e end end