|
| 1 | +# Copyright © 2018 Clarity Movement Co. All rights reserved. |
| 2 | +from pymongo import monitoring |
| 3 | +from aws_xray_sdk.core import xray_recorder |
| 4 | + |
| 5 | + |
| 6 | +class XrayCommandListener(monitoring.CommandListener): |
| 7 | + """ |
| 8 | + A listener that traces all pymongo db commands to AWS Xray. |
| 9 | + Creates a subsegment for each mongo db conmmand. |
| 10 | +
|
| 11 | + name: 'mydb@127.0.0.1:27017' |
| 12 | + records all available information provided by pymongo, |
| 13 | + except for `command` and `reply`. They may contain business secrets. |
| 14 | + If you insist to record them, specify `record_full_documents=True`. |
| 15 | + """ |
| 16 | + |
| 17 | + def __init__(self, record_full_documents): |
| 18 | + super(XrayCommandListener, self).__init__() |
| 19 | + self.record_full_documents = record_full_documents |
| 20 | + |
| 21 | + def started(self, event): |
| 22 | + host, port = event.connection_id |
| 23 | + host_and_port_str = f'{host}:{port}' |
| 24 | + |
| 25 | + subsegment = xray_recorder.begin_subsegment( |
| 26 | + f'{event.database_name}@{host_and_port_str}', 'remote') |
| 27 | + subsegment.put_annotation('mongodb_command_name', event.command_name) |
| 28 | + subsegment.put_annotation('mongodb_connection_id', host_and_port_str) |
| 29 | + subsegment.put_annotation('mongodb_database_name', event.database_name) |
| 30 | + subsegment.put_annotation('mongodb_operation_id', event.operation_id) |
| 31 | + subsegment.put_annotation('mongodb_request_id', event.request_id) |
| 32 | + if self.record_full_documents: |
| 33 | + subsegment.put_metadata('mongodb_command', event.command) |
| 34 | + |
| 35 | + def succeeded(self, event): |
| 36 | + subsegment = xray_recorder.current_subsegment() |
| 37 | + subsegment.put_annotation('mongodb_duration_micros', event.duration_micros) |
| 38 | + if self.record_full_documents: |
| 39 | + subsegment.put_metadata('mongodb_reply', event.reply) |
| 40 | + xray_recorder.end_subsegment() |
| 41 | + |
| 42 | + def failed(self, event): |
| 43 | + subsegment = xray_recorder.current_subsegment() |
| 44 | + subsegment.add_fault_flag() |
| 45 | + subsegment.put_annotation('mongodb_duration_micros', event.duration_micros) |
| 46 | + subsegment.put_metadata('failure', event.failure) |
| 47 | + xray_recorder.end_subsegment() |
| 48 | + |
| 49 | + |
| 50 | +def patch(record_full_documents=False): |
| 51 | + # ensure `patch()` is idempotent |
| 52 | + if hasattr(monitoring, '_xray_enabled'): |
| 53 | + return |
| 54 | + setattr(monitoring, '_xray_enabled', True) |
| 55 | + monitoring.register(XrayCommandListener(record_full_documents)) |
0 commit comments