From 99da1bc59e90911a0ac07307a095498915df389c Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 13 Jul 2023 09:08:08 -0700 Subject: [PATCH] Added telemetry memory storage async class --- splitio/storage/inmemmory.py | 330 ++++++++++++++++++++++++- tests/storage/test_inmemory_storage.py | 290 +++++++++++++++++++++- 2 files changed, 608 insertions(+), 12 deletions(-) diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index 8dd35cef..5b8238c2 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -5,8 +5,10 @@ from collections import Counter from splitio.models.segments import Segment -from splitio.models.telemetry import HTTPErrors, HTTPLatencies, MethodExceptions, MethodLatencies, LastSynchronization, StreamingEvents, TelemetryConfig, TelemetryCounters, CounterConstants +from splitio.models.telemetry import HTTPErrors, HTTPLatencies, MethodExceptions, MethodLatencies, LastSynchronization, StreamingEvents, TelemetryConfig, TelemetryCounters, CounterConstants, \ + HTTPErrorsAsync, HTTPLatenciesAsync, MethodExceptionsAsync, MethodLatenciesAsync, LastSynchronizationAsync, StreamingEventsAsync, TelemetryConfigAsync, TelemetryCountersAsync from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage +from splitio.optional.loaders import asyncio MAX_SIZE_BYTES = 5 * 1024 * 1024 MAX_TAGS = 10 @@ -462,14 +464,158 @@ def clear(self): with self._lock: self._events = queue.Queue(maxsize=self._queue_size) -class InMemoryTelemetryStorage(TelemetryStorage): +class InMemoryTelemetryStorageBase(TelemetryStorage): + """In-memory telemetry storage base.""" + + def _reset_tags(self): + self._tags = [] + + def _reset_config_tags(self): + self._config_tags = [] + + def record_config(self, config, extra_config): + """Record configurations.""" + pass + + def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count): + """Record active and redundant factories.""" + pass + + def record_ready_time(self, ready_time): + """Record ready time.""" + pass + + def add_tag(self, tag): + """Record tag string.""" + pass + + def add_config_tag(self, tag): + """Record tag string.""" + pass + + def record_bur_time_out(self): + """Record block until ready timeout.""" + pass + + def record_not_ready_usage(self): + """record non-ready usage.""" + pass + + def record_latency(self, method, latency): + """Record method latency time.""" + pass + + def record_exception(self, method): + """Record method exception.""" + pass + + def record_impression_stats(self, data_type, count): + """Record impressions stats.""" + pass + + def record_event_stats(self, data_type, count): + """Record events stats.""" + pass + + def record_successful_sync(self, resource, time): + """Record successful sync.""" + pass + + def record_sync_error(self, resource, status): + """Record sync http error.""" + pass + + def record_sync_latency(self, resource, latency): + """Record latency time.""" + pass + + def record_auth_rejections(self): + """Record auth rejection.""" + pass + + def record_token_refreshes(self): + """Record sse token refresh.""" + pass + + def record_streaming_event(self, streaming_event): + """Record incoming streaming event.""" + pass + + def record_session_length(self, session): + """Record session length.""" + pass + + def get_bur_time_outs(self): + """Get block until ready timeout.""" + pass + + def get_non_ready_usage(self): + """Get non-ready usage.""" + pass + + def get_config_stats(self): + """Get all config info.""" + pass + + def pop_exceptions(self): + """Get and reset method exceptions.""" + pass + + def pop_tags(self): + """Get and reset tags.""" + pass + + def pop_config_tags(self): + """Get and reset tags.""" + pass + + def pop_latencies(self): + """Get and reset eval latencies.""" + pass + + def get_impressions_stats(self, type): + """Get impressions stats""" + pass + + def get_events_stats(self, type): + """Get events stats""" + pass + + def get_last_synchronization(self): + """Get last sync""" + pass + + def pop_http_errors(self): + """Get and reset http errors.""" + pass + + def pop_http_latencies(self): + """Get and reset http latencies.""" + pass + + def pop_auth_rejections(self): + """Get and reset auth rejections.""" + pass + + def pop_token_refreshes(self): + """Get and reset token refreshes.""" + pass + + def pop_streaming_events(self): + """Get and reset streaming events""" + pass + + def get_session_length(self): + """Get session length""" + pass + + +class InMemoryTelemetryStorage(InMemoryTelemetryStorageBase): """In-memory telemetry storage.""" def __init__(self): """Constructor""" self._lock = threading.RLock() - self._reset_tags() - self._reset_config_tags() self._method_exceptions = MethodExceptions() self._last_synchronization = LastSynchronization() self._counters = TelemetryCounters() @@ -478,14 +624,9 @@ def __init__(self): self._http_latencies = HTTPLatencies() self._streaming_events = StreamingEvents() self._tel_config = TelemetryConfig() - - def _reset_tags(self): - with self._lock: - self._tags = [] - - def _reset_config_tags(self): with self._lock: - self._config_tags = [] + self._reset_tags() + self._reset_config_tags() def record_config(self, config, extra_config): """Record configurations.""" @@ -632,6 +773,173 @@ def get_session_length(self): """Get session length""" return self._counters.get_session_length() + +class InMemoryTelemetryStorageAsync(InMemoryTelemetryStorageBase): + """In-memory telemetry async storage.""" + + async def create(): + """Constructor""" + self = InMemoryTelemetryStorageAsync() + self._lock = asyncio.Lock() + self._method_exceptions = await MethodExceptionsAsync.create() + self._last_synchronization = await LastSynchronizationAsync.create() + self._counters = await TelemetryCountersAsync.create() + self._http_sync_errors = await HTTPErrorsAsync.create() + self._method_latencies = await MethodLatenciesAsync.create() + self._http_latencies = await HTTPLatenciesAsync.create() + self._streaming_events = await StreamingEventsAsync.create() + self._tel_config = await TelemetryConfigAsync.create() + async with self._lock: + self._reset_tags() + self._reset_config_tags() + return self + + async def record_config(self, config, extra_config): + """Record configurations.""" + await self._tel_config.record_config(config, extra_config) + + async def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count): + """Record active and redundant factories.""" + await self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count) + + async def record_ready_time(self, ready_time): + """Record ready time.""" + await self._tel_config.record_ready_time(ready_time) + + async def add_tag(self, tag): + """Record tag string.""" + async with self._lock: + if len(self._tags) < MAX_TAGS: + self._tags.append(tag) + + async def add_config_tag(self, tag): + """Record tag string.""" + async with self._lock: + if len(self._config_tags) < MAX_TAGS: + self._config_tags.append(tag) + + async def record_bur_time_out(self): + """Record block until ready timeout.""" + await self._tel_config.record_bur_time_out() + + async def record_not_ready_usage(self): + """record non-ready usage.""" + await self._tel_config.record_not_ready_usage() + + async def record_latency(self, method, latency): + """Record method latency time.""" + await self._method_latencies.add_latency(method,latency) + + async def record_exception(self, method): + """Record method exception.""" + await self._method_exceptions.add_exception(method) + + async def record_impression_stats(self, data_type, count): + """Record impressions stats.""" + await self._counters.record_impressions_value(data_type, count) + + async def record_event_stats(self, data_type, count): + """Record events stats.""" + await self._counters.record_events_value(data_type, count) + + async def record_successful_sync(self, resource, time): + """Record successful sync.""" + await self._last_synchronization.add_latency(resource, time) + + async def record_sync_error(self, resource, status): + """Record sync http error.""" + await self._http_sync_errors.add_error(resource, status) + + async def record_sync_latency(self, resource, latency): + """Record latency time.""" + await self._http_latencies.add_latency(resource, latency) + + async def record_auth_rejections(self): + """Record auth rejection.""" + await self._counters.record_auth_rejections() + + async def record_token_refreshes(self): + """Record sse token refresh.""" + await self._counters.record_token_refreshes() + + async def record_streaming_event(self, streaming_event): + """Record incoming streaming event.""" + await self._streaming_events.record_streaming_event(streaming_event) + + async def record_session_length(self, session): + """Record session length.""" + await self._counters.record_session_length(session) + + async def get_bur_time_outs(self): + """Get block until ready timeout.""" + return await self._tel_config.get_bur_time_outs() + + async def get_non_ready_usage(self): + """Get non-ready usage.""" + return await self._tel_config.get_non_ready_usage() + + async def get_config_stats(self): + """Get all config info.""" + return await self._tel_config.get_stats() + + async def pop_exceptions(self): + """Get and reset method exceptions.""" + return await self._method_exceptions.pop_all() + + async def pop_tags(self): + """Get and reset tags.""" + async with self._lock: + tags = self._tags + self._reset_tags() + return tags + + async def pop_config_tags(self): + """Get and reset tags.""" + async with self._lock: + tags = self._config_tags + self._reset_config_tags() + return tags + + async def pop_latencies(self): + """Get and reset eval latencies.""" + return await self._method_latencies.pop_all() + + async def get_impressions_stats(self, type): + """Get impressions stats""" + return await self._counters.get_counter_stats(type) + + async def get_events_stats(self, type): + """Get events stats""" + return await self._counters.get_counter_stats(type) + + async def get_last_synchronization(self): + """Get last sync""" + return await self._last_synchronization.get_all() + + async def pop_http_errors(self): + """Get and reset http errors.""" + return await self._http_sync_errors.pop_all() + + async def pop_http_latencies(self): + """Get and reset http latencies.""" + return await self._http_latencies.pop_all() + + async def pop_auth_rejections(self): + """Get and reset auth rejections.""" + return await self._counters.pop_auth_rejections() + + async def pop_token_refreshes(self): + """Get and reset token refreshes.""" + return await self._counters.pop_token_refreshes() + + async def pop_streaming_events(self): + return await self._streaming_events.pop_streaming_events() + + async def get_session_length(self): + """Get session length""" + return await self._counters.get_session_length() + + class LocalhostTelemetryStorage(): """Localhost telemetry storage.""" def do_nothing(*_, **__): diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index 7319548d..05b23721 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -11,7 +11,7 @@ from splitio.engine.telemetry import TelemetryStorageProducer from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ - InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage + InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage, InMemoryTelemetryStorageAsync class InMemorySplitStorageTests(object): @@ -715,3 +715,291 @@ def test_pop_latencies(self): assert(sync_latency == {'httpLatencies': {'split': [4] + [0] * 22, 'segment': [4] + [0] * 22, 'impression': [2] + [0] * 22, 'impressionCount': [2] + [0] * 22, 'event': [2] + [0] * 22, 'telemetry': [3] + [0] * 22, 'token': [3] + [0] * 22}}) + + +class InMemoryTelemetryStorageAsyncTests(object): + """InMemory telemetry async storage test cases.""" + + @pytest.mark.asyncio + async def test_resets(self): + storage = await InMemoryTelemetryStorageAsync.create() + + assert(storage._counters._impressions_queued == 0) + assert(storage._counters._impressions_deduped == 0) + assert(storage._counters._impressions_dropped == 0) + assert(storage._counters._events_dropped == 0) + assert(storage._counters._events_queued == 0) + assert(storage._counters._auth_rejections == 0) + assert(storage._counters._token_refreshes == 0) + + assert(await storage._method_exceptions.pop_all() == {'methodExceptions': {'treatment': 0, 'treatments': 0, 'treatment_with_config': 0, 'treatments_with_config': 0, 'track': 0}}) + assert(await storage._last_synchronization.get_all() == {'lastSynchronizations': {'split': 0, 'segment': 0, 'impression': 0, 'impressionCount': 0, 'event': 0, 'telemetry': 0, 'token': 0}}) + assert(await storage._http_sync_errors.pop_all() == {'httpErrors': {'split': {}, 'segment': {}, 'impression': {}, 'impressionCount': {}, 'event': {}, 'telemetry': {}, 'token': {}}}) + assert(await storage._tel_config.get_stats() == { + 'bT':0, + 'nR':0, + 'tR': 0, + 'oM': None, + 'sT': None, + 'sE': None, + 'rR': {'sp': 0, 'se': 0, 'im': 0, 'ev': 0, 'te': 0}, + 'uO': {'s': False, 'e': False, 'a': False, 'st': False, 't': False}, + 'iQ': 0, + 'eQ': 0, + 'iM': None, + 'iL': False, + 'hp': None, + 'aF': 0, + 'rF': 0 + }) + assert(await storage._streaming_events.pop_streaming_events() == {'streamingEvents': []}) + assert(storage._tags == []) + + assert(await storage._method_latencies.pop_all() == {'methodLatencies': {'treatment': [0] * 23, 'treatments': [0] * 23, 'treatment_with_config': [0] * 23, 'treatments_with_config': [0] * 23, 'track': [0] * 23}}) + assert(await storage._http_latencies.pop_all() == {'httpLatencies': {'split': [0] * 23, 'segment': [0] * 23, 'impression': [0] * 23, 'impressionCount': [0] * 23, 'event': [0] * 23, 'telemetry': [0] * 23, 'token': [0] * 23}}) + + @pytest.mark.asyncio + async def test_record_config(self): + storage = await InMemoryTelemetryStorageAsync.create() + config = {'operationMode': 'standalone', + 'streamingEnabled': True, + 'impressionsQueueSize': 100, + 'eventsQueueSize': 200, + 'impressionsMode': 'DEBUG','' + 'impressionListener': None, + 'featuresRefreshRate': 30, + 'segmentsRefreshRate': 30, + 'impressionsRefreshRate': 60, + 'eventsPushRate': 60, + 'metricsRefreshRate': 10, + 'storageType': None + } + await storage.record_config(config, {}) + await storage.record_active_and_redundant_factories(1, 0) + assert(await storage._tel_config.get_stats() == {'oM': 0, + 'sT': storage._tel_config._get_storage_type(config['operationMode'], config['storageType']), + 'sE': config['streamingEnabled'], + 'rR': {'sp': 30, 'se': 30, 'im': 60, 'ev': 60, 'te': 10}, + 'uO': {'s': False, 'e': False, 'a': False, 'st': False, 't': False}, + 'iQ': config['impressionsQueueSize'], + 'eQ': config['eventsQueueSize'], + 'iM': storage._tel_config._get_impressions_mode(config['impressionsMode']), + 'iL': True if config['impressionListener'] is not None else False, + 'hp': storage._tel_config._check_if_proxy_detected(), + 'bT': 0, + 'tR': 0, + 'nR': 0, + 'aF': 1, + 'rF': 0} + ) + + @pytest.mark.asyncio + async def test_record_counters(self): + storage = await InMemoryTelemetryStorageAsync.create() + + await storage.record_ready_time(10) + assert(storage._tel_config._time_until_ready == 10) + + await storage.add_tag('tag') + assert('tag' in storage._tags) + [await storage.add_tag('tag') for i in range(1, 25)] + assert(len(storage._tags) == 10) + + await storage.record_bur_time_out() + await storage.record_bur_time_out() + assert(await storage._tel_config.get_bur_time_outs() == 2) + + await storage.record_not_ready_usage() + await storage.record_not_ready_usage() + assert(await storage._tel_config.get_non_ready_usage() == 2) + + await storage.record_exception(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT) + assert(storage._method_exceptions._treatment == 1) + + await storage.record_impression_stats(ModelTelemetry.CounterConstants.IMPRESSIONS_QUEUED, 5) + assert(await storage._counters.get_counter_stats(ModelTelemetry.CounterConstants.IMPRESSIONS_QUEUED) == 5) + + await storage.record_event_stats(ModelTelemetry.CounterConstants.EVENTS_DROPPED, 6) + assert(await storage._counters.get_counter_stats(ModelTelemetry.CounterConstants.EVENTS_DROPPED) == 6) + + await storage.record_successful_sync(ModelTelemetry.HTTPExceptionsAndLatencies.SEGMENT, 10) + assert(storage._last_synchronization._segment == 10) + + await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.SEGMENT, '500') + assert(storage._http_sync_errors._segment['500'] == 1) + + await storage.record_auth_rejections() + await storage.record_auth_rejections() + assert(await storage._counters.pop_auth_rejections() == 2) + + await storage.record_token_refreshes() + await storage.record_token_refreshes() + assert(await storage._counters.pop_token_refreshes() == 2) + + await storage.record_streaming_event((ModelTelemetry.StreamingEventTypes.CONNECTION_ESTABLISHED, 'split', 1234)) + assert(await storage._streaming_events.pop_streaming_events() == {'streamingEvents': [{'e': ModelTelemetry.StreamingEventTypes.CONNECTION_ESTABLISHED.value, 'd': 'split', 't': 1234}]}) + [await storage.record_streaming_event((ModelTelemetry.StreamingEventTypes.CONNECTION_ESTABLISHED, 'split', 1234)) for i in range(1, 25)] + assert(len(storage._streaming_events._streaming_events) == 20) + + await storage.record_session_length(20) + assert(await storage._counters.get_session_length() == 20) + + @pytest.mark.asyncio + async def test_record_latencies(self): + storage = await InMemoryTelemetryStorageAsync.create() + + for method in ModelTelemetry.MethodExceptionsAndLatencies: + if self._get_method_latency(method, storage) == None: + continue + await storage.record_latency(method, 50) + assert(self._get_method_latency(method, storage)[ModelTelemetry.get_latency_bucket_index(50)] == 1) + await storage.record_latency(method, 50000000) + assert(self._get_method_latency(method, storage)[ModelTelemetry.get_latency_bucket_index(50000000)] == 1) + for j in range(10): + latency = random.randint(1001, 4987885) + current_count = self._get_method_latency(method, storage)[ModelTelemetry.get_latency_bucket_index(latency)] + [await storage.record_latency(method, latency) for i in range(2)] + assert(self._get_method_latency(method, storage)[ModelTelemetry.get_latency_bucket_index(latency)] == 2 + current_count) + + for resource in ModelTelemetry.HTTPExceptionsAndLatencies: + if self._get_http_latency(resource, storage) == None: + continue + await storage.record_sync_latency(resource, 50) + assert(self._get_http_latency(resource, storage)[ModelTelemetry.get_latency_bucket_index(50)] == 1) + await storage.record_sync_latency(resource, 50000000) + assert(self._get_http_latency(resource, storage)[ModelTelemetry.get_latency_bucket_index(50000000)] == 1) + for j in range(10): + latency = random.randint(1001, 4987885) + current_count = self._get_http_latency(resource, storage)[ModelTelemetry.get_latency_bucket_index(latency)] + [await storage.record_sync_latency(resource, latency) for i in range(2)] + assert(self._get_http_latency(resource, storage)[ModelTelemetry.get_latency_bucket_index(latency)] == 2 + current_count) + + def _get_method_latency(self, resource, storage): + if resource == ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT: + return storage._method_latencies._treatment + elif resource == ModelTelemetry.MethodExceptionsAndLatencies.TREATMENTS: + return storage._method_latencies._treatments + elif resource == ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG: + return storage._method_latencies._treatment_with_config + elif resource == ModelTelemetry.MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG: + return storage._method_latencies._treatments_with_config + elif resource == ModelTelemetry.MethodExceptionsAndLatencies.TRACK: + return storage._method_latencies._track + else: + return + + def _get_http_latency(self, resource, storage): + if resource == ModelTelemetry.HTTPExceptionsAndLatencies.SPLIT: + return storage._http_latencies._split + elif resource == ModelTelemetry.HTTPExceptionsAndLatencies.SEGMENT: + return storage._http_latencies._segment + elif resource == ModelTelemetry.HTTPExceptionsAndLatencies.IMPRESSION: + return storage._http_latencies._impression + elif resource == ModelTelemetry.HTTPExceptionsAndLatencies.IMPRESSION_COUNT: + return storage._http_latencies._impression_count + elif resource == ModelTelemetry.HTTPExceptionsAndLatencies.EVENT: + return storage._http_latencies._event + elif resource == ModelTelemetry.HTTPExceptionsAndLatencies.TELEMETRY: + return storage._http_latencies._telemetry + elif resource == ModelTelemetry.HTTPExceptionsAndLatencies.TOKEN: + return storage._http_latencies._token + else: + return + + @pytest.mark.asyncio + async def test_pop_counters(self): + storage = await InMemoryTelemetryStorageAsync.create() + + [await storage.record_exception(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT) for i in range(2)] + await storage.record_exception(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENTS) + await storage.record_exception(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG) + [await storage.record_exception(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG) for i in range(5)] + [await storage.record_exception(ModelTelemetry.MethodExceptionsAndLatencies.TRACK) for i in range(3)] + exceptions = await storage.pop_exceptions() + assert(storage._method_exceptions._treatment == 0) + assert(storage._method_exceptions._treatments == 0) + assert(storage._method_exceptions._treatment_with_config == 0) + assert(storage._method_exceptions._treatments_with_config == 0) + assert(storage._method_exceptions._track == 0) + assert(exceptions == {'methodExceptions': {'treatment': 2, 'treatments': 1, 'treatment_with_config': 1, 'treatments_with_config': 5, 'track': 3}}) + + await storage.add_tag('tag1') + await storage.add_tag('tag2') + tags = await storage.pop_tags() + assert(storage._tags == []) + assert(tags == ['tag1', 'tag2']) + + [await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.SEGMENT, str(i)) for i in [500, 501, 502]] + [await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.SPLIT, str(i)) for i in [400, 401, 402]] + await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.IMPRESSION, '502') + [await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.IMPRESSION_COUNT, str(i)) for i in [501, 502]] + await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.EVENT, '501') + await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.TELEMETRY, '505') + [await storage.record_sync_error(ModelTelemetry.HTTPExceptionsAndLatencies.TOKEN, '502') for i in range(5)] + http_errors = await storage.pop_http_errors() + assert(http_errors == {'httpErrors': {'split': {'400': 1, '401': 1, '402': 1}, 'segment': {'500': 1, '501': 1, '502': 1}, + 'impression': {'502': 1}, 'impressionCount': {'501': 1, '502': 1}, + 'event': {'501': 1}, 'telemetry': {'505': 1}, 'token': {'502': 5}}}) + assert(storage._http_sync_errors._split == {}) + assert(storage._http_sync_errors._segment == {}) + assert(storage._http_sync_errors._impression == {}) + assert(storage._http_sync_errors._impression_count == {}) + assert(storage._http_sync_errors._event == {}) + assert(storage._http_sync_errors._telemetry == {}) + + await storage.record_auth_rejections() + await storage.record_auth_rejections() + auth_rejections = await storage.pop_auth_rejections() + assert(storage._counters._auth_rejections == 0) + assert(auth_rejections == 2) + + await storage.record_token_refreshes() + await storage.record_token_refreshes() + token_refreshes = await storage.pop_token_refreshes() + assert(storage._counters._token_refreshes == 0) + assert(token_refreshes == 2) + + await storage.record_streaming_event((ModelTelemetry.StreamingEventTypes.CONNECTION_ESTABLISHED, 'split', 1234)) + await storage.record_streaming_event((ModelTelemetry.StreamingEventTypes.OCCUPANCY_PRI, 'split', 1234)) + streaming_events = await storage.pop_streaming_events() + assert(storage._streaming_events._streaming_events == []) + assert(streaming_events == {'streamingEvents': [{'e': ModelTelemetry.StreamingEventTypes.CONNECTION_ESTABLISHED.value, 'd': 'split', 't': 1234}, + {'e': ModelTelemetry.StreamingEventTypes.OCCUPANCY_PRI.value, 'd': 'split', 't': 1234}]}) + + @pytest.mark.asyncio + async def test_pop_latencies(self): + storage = await InMemoryTelemetryStorageAsync.create() + + [await storage.record_latency(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT, i) for i in [5, 10, 10, 10]] + [await storage.record_latency(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENTS, i) for i in [7, 10, 14, 13]] + [await storage.record_latency(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENT_WITH_CONFIG, i) for i in [200]] + [await storage.record_latency(ModelTelemetry.MethodExceptionsAndLatencies.TREATMENTS_WITH_CONFIG, i) for i in [50, 40]] + [await storage.record_latency(ModelTelemetry.MethodExceptionsAndLatencies.TRACK, i) for i in [1, 10, 100]] + latencies = await storage.pop_latencies() + + assert(storage._method_latencies._treatment == [0] * 23) + assert(storage._method_latencies._treatments == [0] * 23) + assert(storage._method_latencies._treatment_with_config == [0] * 23) + assert(storage._method_latencies._treatments_with_config == [0] * 23) + assert(storage._method_latencies._track == [0] * 23) + assert(latencies == {'methodLatencies': {'treatment': [4] + [0] * 22, 'treatments': [4] + [0] * 22, + 'treatment_with_config': [1] + [0] * 22, 'treatments_with_config': [2] + [0] * 22, 'track': [3] + [0] * 22}}) + + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.SPLIT, i) for i in [50, 10, 20, 40]] + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.SEGMENT, i) for i in [70, 100, 40, 30]] + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.IMPRESSION, i) for i in [10, 20]] + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.IMPRESSION_COUNT, i) for i in [5, 10]] + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.EVENT, i) for i in [50, 40]] + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.TELEMETRY, i) for i in [100, 50, 160]] + [await storage.record_sync_latency(ModelTelemetry.HTTPExceptionsAndLatencies.TOKEN, i) for i in [10, 15, 100]] + sync_latency = await storage.pop_http_latencies() + + assert(storage._http_latencies._split == [0] * 23) + assert(storage._http_latencies._segment == [0] * 23) + assert(storage._http_latencies._impression == [0] * 23) + assert(storage._http_latencies._impression_count == [0] * 23) + assert(storage._http_latencies._telemetry == [0] * 23) + assert(storage._http_latencies._token == [0] * 23) + assert(sync_latency == {'httpLatencies': {'split': [4] + [0] * 22, 'segment': [4] + [0] * 22, + 'impression': [2] + [0] * 22, 'impressionCount': [2] + [0] * 22, 'event': [2] + [0] * 22, + 'telemetry': [3] + [0] * 22, 'token': [3] + [0] * 22}})