forked from DataDog/datadogpy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_lambda_wrapper_thread_safety.py
More file actions
51 lines (35 loc) · 1.58 KB
/
test_lambda_wrapper_thread_safety.py
File metadata and controls
51 lines (35 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# Unless explicitly stated otherwise all files in this repository are licensed under the BSD-3-Clause License.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2015-Present Datadog, Inc
import time
# import unittest
import threading
from datadog import lambda_metric, datadog_lambda_wrapper
from datadog.threadstats.aws_lambda import _lambda_stats
TOTAL_NUMBER_OF_THREADS = 1000
class MemoryReporter(object):
""" A reporting class that reports to memory for testing. """
def __init__(self):
self.distributions = []
self.dist_flush_counter = 0
def flush_distributions(self, dists):
self.distributions += dists
self.dist_flush_counter = self.dist_flush_counter + 1
@datadog_lambda_wrapper
def wrapped_function(id):
lambda_metric("dist_" + str(id), 42)
# sleep makes the os continue another thread
time.sleep(0.001)
lambda_metric("common_dist", 42)
# Lambda wrapper - mute thread safety test, python 2.7 issues
# class TestWrapperThreadSafety(unittest.TestCase):
# def test_wrapper_thread_safety(self):
# _lambda_stats.reporter = MemoryReporter()
# for i in range(TOTAL_NUMBER_OF_THREADS):
# threading.Thread(target=wrapped_function, args=[i]).start()
# # Wait all threads to finish
# time.sleep(10)
# # Check that at least one flush happened
# self.assertGreater(_lambda_stats.reporter.dist_flush_counter, 0)
# dists = _lambda_stats.reporter.distributions
# self.assertEqual(len(dists), TOTAL_NUMBER_OF_THREADS + 1)