-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
805 lines (668 loc) · 38.3 KB
/
main.py
File metadata and controls
805 lines (668 loc) · 38.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
import asyncio
import aiohttp
import json
import random
import time
from datetime import datetime
from msal import ConfidentialClientApplication
import logging
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import threading
import os
import sys
# Configure logging for console output and main application events.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Separate logger for detailed Graph API request/response logging.
# It captures every HTTP interaction in graph_api_log.txt for debugging and
# analysis; the file is truncated each time the module is loaded (mode='w').
api_logger = logging.getLogger('graph_api')
api_logger.setLevel(logging.DEBUG)
# Records that pass this logger's own level are handed to ancestor handlers
# without consulting the ancestors' levels, so without this line every DEBUG
# request/response dump would also be printed by the console handler that
# basicConfig() installed on the root logger.
api_logger.propagate = False
api_log_handler = logging.FileHandler('graph_api_log.txt', mode='w', encoding='utf-8')
api_log_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
api_logger.addHandler(api_log_handler)
class GraphMailTester:
    """
    Main class for testing Microsoft Graph API sendMail rate limits.

    Uses MSAL for app-only authentication with client credentials and supports
    concurrent sending across multiple mailboxes to test Exchange Online and
    Graph API throttling limits.

    Key limits this tool is built around:
    - Graph API: 4 concurrent requests (see MAX_CONCURRENT_REQUESTS)
    - Graph API: 10,000 requests per 10 minutes per mailbox (1,000/min average)
    - Exchange Online: ~30 messages per minute per mailbox (separate limit)

    NOTE(review): the concurrency semaphore below is created per mailbox, so
    the 4-request ceiling is enforced per sender, not across the whole app.
    Microsoft's published Outlook service limits appear to be scoped per
    app ID + mailbox combination -- confirm against the current documentation.
    """

    # Upper bound applied to the ``num_mailboxes`` constructor argument.
    MAX_MAILBOXES = 5
    # Largest number of in-flight requests a single mailbox sender will issue.
    MAX_CONCURRENT_REQUESTS = 4
    # Rates above this (emails/minute) switch from sequential to batch mode.
    HIGH_VOLUME_THRESHOLD = 100

    def __init__(self, tenant_id, client_id, client_secret, recipient_email, num_mailboxes=1, sender_mailboxes=None):
        """
        Store credentials, recipient and the list of sender mailboxes.

        Args:
            tenant_id: Azure AD tenant used to build the MSAL authority URL
            client_id: Application (client) ID of the app registration
            client_secret: Client secret used for the client-credentials flow
            recipient_email: Address every test message is sent to
            num_mailboxes: Requested mailbox count; stored capped at MAX_MAILBOXES
            sender_mailboxes: Iterable of sender mailbox addresses (or None)
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.recipient_email = recipient_email
        self.num_mailboxes = min(num_mailboxes, self.MAX_MAILBOXES)  # Cap at 5 mailboxes for testing
        self.access_token = None
        # Copy the list so a caller mutating its own list later cannot change
        # the set of senders underneath a configured tester.
        self.mailbox_users = list(sender_mailboxes) if sender_mailboxes else []
        self.test_running = False

        # Sample email content for testing - randomized to simulate real usage
        self.subjects = [
            "Test Email - Rate Limit Testing",
            "Automated Test Message",
            "Graph API Mail Test",
            "Performance Testing Email",
            "Rate Limit Validation",
        ]
        self.bodies = [
            "This is a test email for Graph API rate limit testing.",
            "Automated message sent to validate Exchange Online limits.",
            "Testing mail throughput with Graph API.",
            "Performance validation email - please ignore.",
            "Rate limiting test in progress.",
        ]

    @classmethod
    def _concurrency_for_rate(cls, emails_per_minute):
        """Return how many requests one mailbox may have in flight for a target rate."""
        if emails_per_minute > cls.HIGH_VOLUME_THRESHOLD:
            # One extra slot per 200 emails/min, never above the API ceiling.
            return min(cls.MAX_CONCURRENT_REQUESTS, max(1, int(emails_per_minute / 200)))
        # Low rate - sequential sending for timing precision.
        return 1

    async def authenticate(self):
        """
        Authenticate using MSAL client credentials flow (app-only context).

        Requires an Azure AD app registration with the Mail.Send application
        permission. On success the token is stored in ``self.access_token``.

        Returns:
            bool: True when a token was acquired, False otherwise (the reason
            is logged). The MSAL calls are made synchronously inside this
            coroutine.
        """
        try:
            app = ConfidentialClientApplication(
                client_id=self.client_id,
                client_credential=self.client_secret,
                authority=f"https://login.microsoftonline.com/{self.tenant_id}"
            )
            # Use application permissions for app-only context (not delegated)
            result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
            if "access_token" in result:
                self.access_token = result["access_token"]
                logger.info("Successfully authenticated with app-only credentials")
                return True
            logger.error(f"Authentication failed: {result.get('error_description')}")
            return False
        except Exception as e:
            logger.error(f"Authentication error: {str(e)}")
            return False

    async def send_email(self, session, mailbox_user):
        """
        Send a single email via Graph API using the /users/{id}/sendMail endpoint.

        Args:
            session: aiohttp.ClientSession for connection pooling
            mailbox_user: Email address of the sender mailbox

        Returns:
            tuple: (success: bool, message: str). The message is "Success" for
            HTTP 202, starts with "Rate limited" for HTTP 429, is
            "HTTP <status>" for any other status, or the exception text when
            the request itself raised.

        Logs all HTTP requests and responses to graph_api_log.txt for analysis.
        """
        if not self.access_token:
            return False, "No access token available"

        # Generate random subject and body with timestamp for uniqueness
        subject = random.choice(self.subjects) + f" - {datetime.now().strftime('%H:%M:%S.%f')[:-3]}"
        body = random.choice(self.bodies) + f" Sent at {datetime.now()}"
        email_data = {
            "message": {
                "subject": subject,
                "body": {
                    "contentType": "Text",
                    "content": body
                },
                "toRecipients": [
                    {
                        "emailAddress": {
                            "address": self.recipient_email
                        }
                    }
                ]
            }
        }

        # Header copy used only for the log file: the token is truncated so
        # the full bearer token is never written to disk.
        headers = {
            "Authorization": f"Bearer {self.access_token[:20]}...",
            "Content-Type": "application/json"
        }
        # Send as the specific user with app-only permissions
        url = f"https://graph.microsoft.com/v1.0/users/{mailbox_user}/sendMail"

        # Log complete API request for debugging (with truncated token for security)
        api_logger.debug("="*80)
        api_logger.debug(f"REQUEST: POST {url}")
        api_logger.debug(f"Headers: {json.dumps(headers, indent=2)}")
        api_logger.debug(f"Body: {json.dumps(email_data, indent=2)}")

        # Use full token for actual HTTP request
        request_headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        # Track request timing with high-precision counter for latency analysis
        request_start = time.perf_counter()
        try:
            async with session.post(url, json=email_data, headers=request_headers) as response:
                request_duration = time.perf_counter() - request_start

                # Capture complete response for debugging
                response_headers = dict(response.headers)
                response_text = await response.text()

                # Log detailed response information
                api_logger.debug(f"RESPONSE: {response.status} {response.reason}")
                api_logger.debug(f"Duration: {request_duration:.3f}s")
                api_logger.debug(f"Headers: {json.dumps(response_headers, indent=2)}")
                if response_text:
                    api_logger.debug(f"Body: {response_text}")
                api_logger.debug("="*80 + "\n")

                # Handle different response codes
                if response.status == 202:  # Success - email queued for delivery
                    logger.info(f"✓ Email sent successfully from {mailbox_user} ({request_duration:.3f}s)")
                    return True, "Success"
                if response.status == 429:  # Rate limited - Graph API throttling
                    # Falls back to 60 when the server sends no Retry-After header.
                    retry_after = response.headers.get('Retry-After', 60)
                    logger.warning(f"⚠ Rate limited for {mailbox_user}. Retry after: {retry_after}s")
                    return False, f"Rate limited - retry after {retry_after}s"
                # Any other status is reported as a plain failure
                logger.error(f"✗ Failed to send email from {mailbox_user}: {response.status} - {response_text}")
                return False, f"HTTP {response.status}"
        except Exception as e:
            # Best-effort: a transport error counts as one failed send rather
            # than aborting the whole test run.
            request_duration = time.perf_counter() - request_start
            api_logger.debug(f"ERROR after {request_duration:.3f}s: {str(e)}")
            api_logger.debug("="*80 + "\n")
            logger.error(f"✗ Error sending email from {mailbox_user}: {str(e)}")
            return False, str(e)

    async def run_rate_limit_test(self, emails_per_minute=30, test_duration_minutes=5, progress_callback=None):
        """
        Execute rate limit test with concurrent sending across multiple mailboxes.

        Automatically selects sending mode based on target rate:
        - Low-rate mode (≤100/min): Sequential sending with precise timing
        - High-volume mode (>100/min): Batch concurrent sending up to API limits

        Args:
            emails_per_minute: Target rate per mailbox (not aggregate); must be > 0
            test_duration_minutes: How long to run the test; must be > 0
            progress_callback: Optional callable invoked as
                ``progress_callback(message, stats, elapsed_seconds, total_emails)``

        Returns:
            dict | None: The statistics dictionary (keys "sent", "failed",
            "rate_limited", "start_time", "rate_limit_times", "lock") when the
            test ran, or None when it could not start (no mailboxes, invalid
            parameters, or authentication failure).
        """
        self.test_running = True
        try:
            return await self._execute_test(emails_per_minute, test_duration_minutes, progress_callback)
        finally:
            # Always clear the flag, even if an unexpected error escapes, so
            # the tester never appears to be running after it has stopped.
            self.test_running = False

    async def _execute_test(self, emails_per_minute, test_duration_minutes, progress_callback):
        """Validate inputs, authenticate, drive all mailbox senders and log the summary."""
        logger.info(f"Starting rate limit test with {len(self.mailbox_users)} mailboxes")
        logger.info(f"Target rate: {emails_per_minute} emails/minute PER MAILBOX")
        logger.info(f"Total target rate: {emails_per_minute * len(self.mailbox_users)} emails/minute AGGREGATE")

        # Validate mailbox users
        if not self.mailbox_users:
            logger.error("No sender mailboxes configured")
            return None

        # Validate the numeric parameters before spending a token request.
        # Zero/negative values would yield an empty test whose total of 0
        # breaks percentage calculations in progress consumers.
        if emails_per_minute <= 0 or test_duration_minutes <= 0:
            logger.error("Emails per minute and test duration must both be greater than zero")
            return None

        # Calculate total emails - each mailbox sends at the same rate
        emails_per_mailbox = int(emails_per_minute * test_duration_minutes)
        if emails_per_mailbox < 1:
            logger.error("Test parameters result in zero emails per mailbox; nothing to send")
            return None
        total_emails = emails_per_mailbox * len(self.mailbox_users)

        # Authenticate
        if not await self.authenticate():
            logger.error("Authentication failed. Cannot proceed.")
            return None
        logger.info(f"Successfully authenticated. Using {len(self.mailbox_users)} sender mailbox(es)")

        # Determine concurrency strategy based on target rate
        high_volume = emails_per_minute > self.HIGH_VOLUME_THRESHOLD
        concurrent_requests_per_mailbox = self._concurrency_for_rate(emails_per_minute)
        if high_volume:
            logger.info(f"High-volume mode: {concurrent_requests_per_mailbox} concurrent requests per mailbox")
            logger.info(f"Graph API allows max {self.MAX_CONCURRENT_REQUESTS} concurrent requests per app")

        # Statistics shared by every sender coroutine; updates go through the lock
        stats = {
            "sent": 0,               # Successfully sent (HTTP 202)
            "failed": 0,             # Failed (non-202, non-429)
            "rate_limited": 0,       # Throttled (HTTP 429)
            "start_time": time.time(),
            "rate_limit_times": [],  # Seconds since start at which throttling occurred
            "lock": asyncio.Lock()   # Serializes stat updates between coroutines
        }

        # Both modes pace themselves from the same per-second target.
        emails_per_second = emails_per_minute / 60

        async def record_result(success, message, report_every):
            """Fold one send result into stats and report progress every N results."""
            async with stats["lock"]:
                if success:
                    stats["sent"] += 1
                elif "rate limited" in message.lower():
                    stats["rate_limited"] += 1
                    stats["rate_limit_times"].append(time.time() - stats["start_time"])
                else:
                    stats["failed"] += 1

                total_processed = stats["sent"] + stats["failed"] + stats["rate_limited"]
                if total_processed % report_every == 0:
                    elapsed = time.time() - stats["start_time"]
                    current_rate = stats["sent"] / (elapsed / 60) if elapsed > 0 else 0
                    progress_msg = (f"Progress: {total_processed}/{total_emails} - "
                                    f"Rate: {current_rate:.1f}/min - "
                                    f"Sent: {stats['sent']}, Failed: {stats['failed']}, "
                                    f"Rate Limited: {stats['rate_limited']}")
                    logger.info(progress_msg)
                    if progress_callback:
                        progress_callback(progress_msg, stats, elapsed, total_emails)

        async def send_high_volume(session, mailbox_user):
            """Send in concurrent bursts, sleeping between bursts to hold the target rate."""
            # Semaphore bounds in-flight requests for this mailbox
            semaphore = asyncio.Semaphore(concurrent_requests_per_mailbox)

            async def send_with_semaphore():
                """Send single email with semaphore-controlled concurrency"""
                async with semaphore:
                    success, message = await self.send_email(session, mailbox_user)
                    # Progress reporting - every 50 emails for high volume
                    await record_result(success, message, 50)

            batch_size = concurrent_requests_per_mailbox
            num_batches = (emails_per_mailbox + batch_size - 1) // batch_size  # ceiling division
            interval_between_batches = batch_size / emails_per_second if emails_per_second > 0 else 1
            logger.info(f"[{mailbox_user}] Sending {emails_per_mailbox} emails in {num_batches} batches of {batch_size}")

            for batch_num in range(num_batches):
                if not self.test_running:  # Allow manual stop
                    break
                batch_start_time = time.perf_counter()

                # The final batch may be smaller than batch_size
                start_idx = batch_num * batch_size
                end_idx = min(start_idx + batch_size, emails_per_mailbox)
                await asyncio.gather(*(send_with_semaphore() for _ in range(start_idx, end_idx)))

                # Delay before next batch, compensating for how long this one took
                if batch_num < num_batches - 1 and self.test_running:
                    batch_duration = time.perf_counter() - batch_start_time
                    sleep_time = interval_between_batches - batch_duration
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)

        async def send_low_rate(session, mailbox_user):
            """Send one email at a time on a pre-computed schedule."""
            logger.info(f"[{mailbox_user}] Low-rate mode: Sequential sending")
            interval_between_emails = 1 / emails_per_second if emails_per_second > 0 else 1
            next_send_time = time.perf_counter()  # Pre-schedule to avoid drift

            for _ in range(emails_per_mailbox):
                if not self.test_running:  # Allow manual stop
                    break

                # Wait until scheduled send time
                sleep_time = next_send_time - time.perf_counter()
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)

                success, message = await self.send_email(session, mailbox_user)

                # Schedule next send relative to the plan, not to "now", so
                # request latency does not accumulate as drift.
                next_send_time += interval_between_emails
                # If we have fallen behind the plan, restart it from the current time
                now = time.perf_counter()
                if next_send_time < now:
                    next_send_time = now

                # Progress reporting every 10 emails for low volume
                await record_result(success, message, 10)

        async def send_from_mailbox(mailbox_user):
            """Send this mailbox's share of emails over a dedicated aiohttp session."""
            connector = aiohttp.TCPConnector(limit_per_host=10, limit=20)
            async with aiohttp.ClientSession(connector=connector) as session:
                if high_volume:
                    await send_high_volume(session, mailbox_user)
                else:
                    await send_low_rate(session, mailbox_user)

        # Each mailbox sends independently in parallel; within a mailbox the
        # semaphore (high-volume mode) controls request-level concurrency.
        logger.info(f"Starting {len(self.mailbox_users)} concurrent sender tasks...")
        await asyncio.gather(*(send_from_mailbox(mailbox) for mailbox in self.mailbox_users))

        # Calculate and log final statistics
        total_time = time.time() - stats["start_time"]
        actual_rate = stats["sent"] / (total_time / 60) if total_time > 0 else 0
        logger.info("=" * 70)
        logger.info("RATE LIMIT TEST RESULTS")
        logger.info("=" * 70)
        logger.info(f"Test Duration: {total_time:.1f} seconds ({total_time/60:.2f} minutes)")
        logger.info(f"Emails Sent: {stats['sent']}")
        logger.info(f"Emails Failed: {stats['failed']}")
        logger.info(f"Rate Limited Count: {stats['rate_limited']}")
        if stats['rate_limit_times']:
            logger.info(f"First Rate Limit At: {stats['rate_limit_times'][0]:.1f} seconds")
        logger.info(f"Target Rate Per Mailbox: {emails_per_minute} emails/minute")
        logger.info(f"Total Target Rate: {emails_per_minute * len(self.mailbox_users)} emails/minute")
        logger.info(f"Actual Aggregate Rate: {actual_rate:.1f} emails/minute")
        logger.info(f"Sender Mailboxes Used: {len(self.mailbox_users)}")
        logger.info(f"Recipient: {self.recipient_email}")
        logger.info("=" * 70)

        self.test_running = False
        if progress_callback:
            progress_callback("Test completed", stats, total_time, total_emails)
        return stats

    def stop_test(self):
        """Request a stop; sender loops check the flag before each email or batch."""
        self.test_running = False
        logger.info("Stopping test...")
class MailTesterGUI:
    """
    Tkinter GUI for the Graph Mail API Rate Limit Tester.

    Provides:
    - Configuration management (save/load to JSON)
    - Multi-mailbox support (up to 5 sender mailboxes)
    - Rate preset buttons for quick testing (30, 100, 500, 1000/min)
    - Real-time progress tracking and statistics
    - API log viewer
    """

    CONFIG_FILE = "mail_tester_config.json"
    API_LOG_FILE = "graph_api_log.txt"
    MAX_MAILBOXES = 5

    def __init__(self, root):
        """Build all widgets on ``root``, hook logging into the log pane and load any saved config."""
        self.root = root
        self.root.title("Microsoft Graph Mail API Rate Limit Tester")
        self.root.geometry("900x700")
        self.tester = None
        self.test_thread = None

        # Create main frame
        main_frame = ttk.Frame(root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        self._build_config_section(main_frame)
        self._build_params_section(main_frame)
        self._build_control_buttons(main_frame)
        self._build_progress_section(main_frame)
        log_frame = self._build_log_section(main_frame)

        # Configure grid weights so the log pane absorbs extra space
        main_frame.rowconfigure(4, weight=1)
        main_frame.columnconfigure(0, weight=1)
        log_frame.rowconfigure(0, weight=1)
        log_frame.columnconfigure(0, weight=1)

        # Redirect logging to GUI
        self.setup_logging()
        # Try to load config
        self.load_config()

    @staticmethod
    def _parse_mailboxes(raw_text):
        """Split multi-line text into trimmed, non-empty mailbox addresses."""
        return [line.strip() for line in raw_text.split('\n') if line.strip()]

    @staticmethod
    def _progress_percent(stats, total_emails):
        """Return processed/total as a percentage, or 0.0 when there is nothing to send."""
        if total_emails <= 0:
            return 0.0
        processed = stats["sent"] + stats["failed"] + stats["rate_limited"]
        return processed / total_emails * 100

    def _build_config_section(self, parent):
        """Create the credential, recipient and sender-mailbox inputs."""
        config_frame = ttk.LabelFrame(parent, text="Configuration", padding="10")
        config_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=5)

        # Tenant ID
        ttk.Label(config_frame, text="Tenant ID:").grid(row=0, column=0, sticky=tk.W, pady=2)
        self.tenant_id_var = tk.StringVar(value="")
        ttk.Entry(config_frame, textvariable=self.tenant_id_var, width=50).grid(row=0, column=1, pady=2, padx=5)

        # Client ID
        ttk.Label(config_frame, text="Client ID:").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.client_id_var = tk.StringVar(value="")
        ttk.Entry(config_frame, textvariable=self.client_id_var, width=50).grid(row=1, column=1, pady=2, padx=5)

        # Client Secret (masked on screen)
        ttk.Label(config_frame, text="Client Secret:").grid(row=2, column=0, sticky=tk.W, pady=2)
        self.client_secret_var = tk.StringVar(value="")
        secret_entry = ttk.Entry(config_frame, textvariable=self.client_secret_var, width=50, show="*")
        secret_entry.grid(row=2, column=1, pady=2, padx=5)

        # Recipient Email
        ttk.Label(config_frame, text="Recipient Email:").grid(row=3, column=0, sticky=tk.W, pady=2)
        self.recipient_var = tk.StringVar(value="")
        ttk.Entry(config_frame, textvariable=self.recipient_var, width=50).grid(row=3, column=1, pady=2, padx=5)

        # Sender Mailboxes
        ttk.Label(config_frame, text="Sender Mailboxes:").grid(row=4, column=0, sticky=tk.W, pady=2)
        ttk.Label(config_frame, text="(one per line, max 5)", font=('', 8, 'italic')).grid(row=5, column=0, sticky=tk.W)
        self.sender_mailboxes_text = tk.Text(config_frame, height=5, width=38)
        self.sender_mailboxes_text.grid(row=4, column=1, rowspan=2, pady=2, padx=5)

    def _build_params_section(self, parent):
        """Create the rate/duration inputs and the quick-preset buttons."""
        params_frame = ttk.LabelFrame(parent, text="Test Parameters", padding="10")
        params_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=5)

        # Emails per minute
        ttk.Label(params_frame, text="Target Emails/Minute (per mailbox):").grid(row=0, column=0, sticky=tk.W, pady=2)
        self.emails_per_min_var = tk.StringVar(value="30")
        ttk.Entry(params_frame, textvariable=self.emails_per_min_var, width=20).grid(row=0, column=1, pady=2, padx=5, sticky=tk.W)

        # Rate presets
        preset_frame = ttk.Frame(params_frame)
        preset_frame.grid(row=0, column=2, padx=10, sticky=tk.W)
        ttk.Label(preset_frame, text="Quick presets:", font=('', 8)).grid(row=0, column=0, padx=(0, 5))
        for column, rate in enumerate(("30", "100", "500", "1000"), start=1):
            # ``rate=rate`` binds the value now; a plain closure would see the
            # last loop value on every button.
            ttk.Button(
                preset_frame,
                text=f"{rate}/min",
                command=lambda rate=rate: self.emails_per_min_var.set(rate),
                width=8,
            ).grid(row=0, column=column, padx=2)

        # Test duration
        ttk.Label(params_frame, text="Test Duration (minutes):").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.duration_var = tk.StringVar(value="5")
        ttk.Entry(params_frame, textvariable=self.duration_var, width=20).grid(row=1, column=1, pady=2, padx=5, sticky=tk.W)

        # Info label
        info_label = ttk.Label(params_frame,
                               text="Graph API limit: 10,000 requests/10min per mailbox = 1,000/min max",
                               font=('', 8, 'italic'),
                               foreground='blue')
        info_label.grid(row=2, column=0, columnspan=3, sticky=tk.W, pady=(5, 0))

    def _build_control_buttons(self, parent):
        """Create the start/stop/save/load/view-log button row."""
        button_frame = ttk.Frame(parent)
        button_frame.grid(row=2, column=0, columnspan=2, pady=10)

        self.start_button = ttk.Button(button_frame, text="Start Test", command=self.start_test)
        self.start_button.grid(row=0, column=0, padx=5)
        self.stop_button = ttk.Button(button_frame, text="Stop Test", command=self.stop_test, state=tk.DISABLED)
        self.stop_button.grid(row=0, column=1, padx=5)
        ttk.Button(button_frame, text="Save Config", command=self.save_config).grid(row=0, column=2, padx=5)
        ttk.Button(button_frame, text="Load Config", command=self.load_config).grid(row=0, column=3, padx=5)
        ttk.Button(button_frame, text="View API Log", command=self.open_api_log).grid(row=0, column=4, padx=5)

    def _build_progress_section(self, parent):
        """Create the status line, progress bar and live counters."""
        progress_frame = ttk.LabelFrame(parent, text="Progress", padding="10")
        progress_frame.grid(row=3, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)

        self.progress_var = tk.StringVar(value="Ready to start test...")
        ttk.Label(progress_frame, textvariable=self.progress_var, wraplength=800).grid(row=0, column=0, sticky=tk.W)
        self.progress_bar = ttk.Progressbar(progress_frame, mode='determinate', length=800)
        self.progress_bar.grid(row=1, column=0, pady=5)

        # Stats display
        stats_frame = ttk.Frame(progress_frame)
        stats_frame.grid(row=2, column=0, pady=5)

        ttk.Label(stats_frame, text="Sent:").grid(row=0, column=0, padx=5)
        self.sent_var = tk.StringVar(value="0")
        ttk.Label(stats_frame, textvariable=self.sent_var, font=('', 10, 'bold')).grid(row=0, column=1, padx=5)

        ttk.Label(stats_frame, text="Failed:").grid(row=0, column=2, padx=5)
        self.failed_var = tk.StringVar(value="0")
        ttk.Label(stats_frame, textvariable=self.failed_var, font=('', 10, 'bold')).grid(row=0, column=3, padx=5)

        ttk.Label(stats_frame, text="Rate Limited:").grid(row=0, column=4, padx=5)
        self.rate_limited_var = tk.StringVar(value="0")
        ttk.Label(stats_frame, textvariable=self.rate_limited_var, font=('', 10, 'bold')).grid(row=0, column=5, padx=5)

        ttk.Label(stats_frame, text="Actual Rate:").grid(row=0, column=6, padx=5)
        self.actual_rate_var = tk.StringVar(value="0.0/min")
        ttk.Label(stats_frame, textvariable=self.actual_rate_var, font=('', 10, 'bold')).grid(row=0, column=7, padx=5)

    def _build_log_section(self, parent):
        """Create the scrolling log pane and return its frame for grid weighting."""
        log_frame = ttk.LabelFrame(parent, text="Log", padding="10")
        log_frame.grid(row=4, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=5)
        self.log_text = scrolledtext.ScrolledText(log_frame, height=15, width=100)
        self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        return log_frame

    def open_api_log(self):
        """Open the API log file with the platform's default handler."""
        log_file = self.API_LOG_FILE
        if not os.path.exists(log_file):
            messagebox.showinfo("No Log File", "No API log file found. Start a test to generate API logs.")
            return

        try:
            if os.name == 'nt':  # Windows
                os.startfile(log_file)
            elif os.name == 'posix':  # macOS/Linux
                import subprocess
                opener = 'open' if sys.platform == 'darwin' else 'xdg-open'
                # Popen returns immediately; waiting for the opener here would
                # freeze the Tk event loop for as long as it runs.
                subprocess.Popen([opener, log_file])
        except OSError as e:
            # e.g. the opener executable is missing or no handler is registered
            messagebox.showerror("Error", f"Failed to open API log: {str(e)}")
            logger.error(f"Failed to open API log: {str(e)}")
            return
        logger.info(f"Opened API log file: {log_file}")

    def setup_logging(self):
        """Attach a handler to the module logger that appends records to the log pane."""

        class TextHandler(logging.Handler):
            """Logging handler that writes formatted records into a Tk text widget."""

            def __init__(self, text_widget):
                super().__init__()
                self.text_widget = text_widget

            def emit(self, record):
                try:
                    msg = self.format(record)

                    def append():
                        self.text_widget.insert(tk.END, msg + '\n')
                        self.text_widget.see(tk.END)

                    # Records may originate on the worker thread; the widget
                    # update itself is deferred to the Tk event loop.
                    self.text_widget.after(0, append)
                except Exception:
                    # Standard logging contract: a failing handler must not
                    # raise into the code that logged the message.
                    self.handleError(record)

        text_handler = TextHandler(self.log_text)
        text_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logger.addHandler(text_handler)

    def save_config(self):
        """
        Save the current form values to mail_tester_config.json.

        NOTE: the client secret is written to disk in plain text; treat the
        config file as sensitive.
        """
        try:
            # Built inside the try so a non-numeric rate/duration is reported
            # through the error dialog instead of escaping the Tk callback.
            config = {
                "tenant_id": self.tenant_id_var.get(),
                "client_id": self.client_id_var.get(),
                "client_secret": self.client_secret_var.get(),
                "recipient_email": self.recipient_var.get(),
                "sender_mailboxes": self._parse_mailboxes(self.sender_mailboxes_text.get("1.0", tk.END)),
                "emails_per_minute": int(self.emails_per_min_var.get()),
                "test_duration_minutes": int(self.duration_var.get())
            }
            with open(self.CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump(config, f, indent=2)
            messagebox.showinfo("Success", "Configuration saved to mail_tester_config.json")
            logger.info("Configuration saved successfully")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to save configuration: {str(e)}")
            logger.error(f"Failed to save configuration: {str(e)}")

    def load_config(self):
        """Populate the form from mail_tester_config.json; a missing file is not an error."""
        try:
            with open(self.CONFIG_FILE, "r", encoding="utf-8") as f:
                config = json.load(f)
            self.tenant_id_var.set(config.get("tenant_id", ""))
            self.client_id_var.set(config.get("client_id", ""))
            self.client_secret_var.set(config.get("client_secret", ""))
            self.recipient_var.set(config.get("recipient_email", ""))

            sender_mailboxes = config.get("sender_mailboxes", [])
            self.sender_mailboxes_text.delete("1.0", tk.END)
            self.sender_mailboxes_text.insert("1.0", '\n'.join([mb for mb in sender_mailboxes if mb]))

            self.emails_per_min_var.set(str(config.get("emails_per_minute", 30)))
            self.duration_var.set(str(config.get("test_duration_minutes", 5)))
            logger.info("Configuration loaded successfully")
        except FileNotFoundError:
            logger.info("No saved configuration found")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to load configuration: {str(e)}")
            logger.error(f"Failed to load configuration: {str(e)}")

    def update_progress(self, message, stats, elapsed, total_emails):
        """
        Schedule a refresh of the status line, counters and progress bar.

        Args:
            message: Text for the status line
            stats: Dict with "sent", "failed" and "rate_limited" counters
            elapsed: Seconds since the test started
            total_emails: Planned number of emails (0 leaves the bar at 0%)
        """
        def update():
            self.progress_var.set(message)
            self.sent_var.set(str(stats["sent"]))
            self.failed_var.set(str(stats["failed"]))
            self.rate_limited_var.set(str(stats["rate_limited"]))
            actual_rate = stats["sent"] / (elapsed / 60) if elapsed > 0 else 0
            self.actual_rate_var.set(f"{actual_rate:.1f}/min")
            self.progress_bar['value'] = self._progress_percent(stats, total_emails)

        self.root.after(0, update)

    def start_test(self):
        """Validate the form, confirm with the user and run the test on a worker thread."""
        # Validate inputs
        if not all([self.tenant_id_var.get(), self.client_id_var.get(),
                    self.client_secret_var.get(), self.recipient_var.get()]):
            messagebox.showerror("Error", "Please fill in all configuration fields")
            return

        sender_mailboxes = self._parse_mailboxes(self.sender_mailboxes_text.get("1.0", tk.END))
        if not sender_mailboxes:
            messagebox.showerror("Error", "Please provide at least one sender mailbox")
            return
        if len(sender_mailboxes) > self.MAX_MAILBOXES:
            messagebox.showerror("Error", "Maximum 5 sender mailboxes allowed")
            return

        try:
            emails_per_min = int(self.emails_per_min_var.get())
            duration = int(self.duration_var.get())
        except ValueError:
            messagebox.showerror("Error", "Invalid test parameters")
            return
        # Zero or negative values would plan an empty test and break the
        # rate and percentage calculations downstream.
        if emails_per_min <= 0 or duration <= 0:
            messagebox.showerror("Error", "Emails per minute and test duration must be greater than zero")
            return

        # Calculate and show aggregate target
        aggregate_rate = emails_per_min * len(sender_mailboxes)
        total_emails = aggregate_rate * duration

        # Confirm with user
        confirm_msg = (f"Starting test with {len(sender_mailboxes)} mailboxes:\n\n"
                       f"• Target rate per mailbox: {emails_per_min} emails/minute\n"
                       f"• Aggregate target rate: {aggregate_rate} emails/minute\n"
                       f"• Test duration: {duration} minutes\n"
                       f"• Total emails to send: {total_emails}\n\n"
                       f"Click OK to proceed.")
        if not messagebox.askokcancel("Confirm Test", confirm_msg):
            return

        # Reset stats
        self.sent_var.set("0")
        self.failed_var.set("0")
        self.rate_limited_var.set("0")
        self.actual_rate_var.set("0.0/min")
        self.progress_bar['value'] = 0
        self.log_text.delete("1.0", tk.END)

        # Log API log file location
        api_log_path = os.path.abspath(self.API_LOG_FILE)
        logger.info(f"API request/response log: {api_log_path}")
        logger.info("Click 'View API Log' button to see detailed Graph API calls")

        # Create tester
        self.tester = GraphMailTester(
            tenant_id=self.tenant_id_var.get(),
            client_id=self.client_id_var.get(),
            client_secret=self.client_secret_var.get(),
            recipient_email=self.recipient_var.get(),
            num_mailboxes=len(sender_mailboxes),
            sender_mailboxes=sender_mailboxes
        )

        # Disable start button, enable stop button
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)

        def run_test():
            """Thread target: run the async test on a private event loop."""
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(
                    self.tester.run_rate_limit_test(
                        emails_per_minute=emails_per_min,
                        test_duration_minutes=duration,
                        progress_callback=self.update_progress
                    )
                )
            except Exception:
                # Surface unexpected failures in the GUI log instead of
                # letting the thread die with only a stderr traceback.
                logger.exception("Test aborted by an unexpected error")
            finally:
                loop.close()
                # Restore button states on the Tk event loop
                self.root.after(0, lambda: self.start_button.config(state=tk.NORMAL))
                self.root.after(0, lambda: self.stop_button.config(state=tk.DISABLED))

        # Run test in separate thread so the GUI stays responsive
        self.test_thread = threading.Thread(target=run_test, daemon=True)
        self.test_thread.start()

    def stop_test(self):
        """Ask the running tester (if any) to stop and disable the stop button."""
        if self.tester:
            self.tester.stop_test()
        self.stop_button.config(state=tk.DISABLED)
async def main():
    """
    Run one rate-limit test from hard-coded settings, without the GUI.

    Edit the placeholder values below before use. The coroutine builds a
    GraphMailTester from them and awaits a single test run.
    """
    # Sender mailboxes (up to 5) - uncomment or add entries as needed
    sender_mailboxes = [
        "sender1@yourdomain.com",
        # "sender2@yourdomain.com",
        # "sender3@yourdomain.com",
        # "sender4@yourdomain.com",
        # "sender5@yourdomain.com",
    ]

    # Connection settings - update these values
    connection = {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",
        "recipient_email": "recipient@yourdomain.com",
    }

    # Test parameters: 30/min is the expected Exchange Online limit
    run_options = {
        "emails_per_minute": 30,
        "test_duration_minutes": 5,
    }

    # Create and run the tester
    tester = GraphMailTester(
        num_mailboxes=len(sender_mailboxes),
        sender_mailboxes=sender_mailboxes,
        **connection,
    )
    await tester.run_rate_limit_test(**run_options)
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the tester GUI to
    # it, and hand control to the Tk event loop until the window is closed.
    window = tk.Tk()
    gui = MailTesterGUI(window)
    window.mainloop()