Websockets #2865
Open
doomedraven wants to merge 13 commits into staging from websockets
b0a714e: Add real-time task status and log updates via WebSockets (doomedraven)
dfa47e9: Update changelog and database config for async and psycopg3 (doomedraven)
176a5ee: Switch to psycopg3 and add websockets/redis support (doomedraven)
9aed109: Apply suggestion from @gemini-code-assist[bot] (doomedraven)
9acb064: Apply suggestion from @gemini-code-assist[bot] (doomedraven)
514d216: Ensure libvirt connection is properly closed in index view (doomedraven)
37f1eb1: Update consumers.py (doomedraven)
1bce791: Update mongo_provider.py (doomedraven)
e239bcd: sync (doomedraven)
5f1b5a8: Update mongo_async.py (doomedraven)
61364b5: Update views.py (doomedraven)
4b23ea5: Update views.py (doomedraven)
eaf3ef1: Merge branch 'staging' into websockets (doomedraven)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,259 @@ | ||
| import logging | ||
| import itertools | ||
| from typing import List, Optional, Sequence | ||
|
|
||
| try: | ||
| # Native PyMongo Async support (Requires pymongo >= 4.9) | ||
| from pymongo.asynchronous import AsyncMongoClient | ||
| HAVE_PYMONGO_ASYNC = True | ||
| except ImportError: | ||
| HAVE_PYMONGO_ASYNC = False | ||
|
|
||
| from lib.cuckoo.common.config import Config | ||
|
|
||
| log = logging.getLogger(__name__) | ||
| repconf = Config("reporting") | ||
| mdb_name = repconf.mongodb.get("db", "cuckoo") | ||
|
|
||
| # Async Client Singleton | ||
| _async_client = None | ||
| _async_db = None | ||
|
|
||
| def get_async_db(): | ||
| """Returns the native async PyMongo database instance, initializing it if necessary.""" | ||
| global _async_client, _async_db | ||
|
|
||
| if not HAVE_PYMONGO_ASYNC: | ||
| raise ImportError("Native PyMongo Async API is not available. Please upgrade pymongo: pip install 'pymongo>=4.9'") | ||
|
|
||
| if _async_db is None: | ||
| try: | ||
| _async_client = AsyncMongoClient( | ||
| host=repconf.mongodb.get("host", "127.0.0.1"), | ||
| port=repconf.mongodb.get("port", 27017), | ||
| username=repconf.mongodb.get("username"), | ||
| password=repconf.mongodb.get("password"), | ||
| authSource=repconf.mongodb.get("authsource", "cuckoo"), | ||
| tlsCAFile=repconf.mongodb.get("tlscafile", None), | ||
| serverSelectionTimeoutMS=5000, | ||
| ) | ||
| _async_db = _async_client[mdb_name] | ||
| log.info("Native Async MongoDB connection initialized.") | ||
| except Exception as e: | ||
| log.error("Failed to initialize Native Async MongoDB connection: %s", e) | ||
| raise | ||
|
|
||
| return _async_db | ||
|
|
||
| async def mongo_find_one_async(collection: str, query: dict, projection: dict = None, sort: list = None) -> Optional[dict]: | ||
| """Async wrapper for find_one.""" | ||
| db = get_async_db() | ||
| if sort is None: | ||
| sort = [("_id", -1)] | ||
| try: | ||
| return await db[collection].find_one(query, projection, sort=sort) | ||
| except Exception as e: | ||
| log.error("Error in mongo_find_one_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_find_async(collection: str, query: dict, projection: dict = None, sort: list = None, limit: int = 0) -> List[dict]: | ||
| """Async wrapper for find (returns a list).""" | ||
| db = get_async_db() | ||
| if sort is None: | ||
| sort = [("_id", -1)] | ||
| try: | ||
| cursor = db[collection].find(query, projection, sort=sort) | ||
| if limit > 0: | ||
| cursor.limit(limit) | ||
| return await cursor.to_list(length=limit if limit else None) | ||
| except Exception as e: | ||
| log.error("Error in mongo_find_async: %s", e) | ||
| return [] | ||
|
|
||
| async def mongo_count_async(collection: str, query: dict) -> int: | ||
| """Async wrapper for count_documents.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].count_documents(query) | ||
| except Exception as e: | ||
| log.error("Error in mongo_count_async: %s", e) | ||
| return 0 | ||
|
|
||
| async def mongo_insert_one_async(collection: str, doc: dict): | ||
| """Async wrapper for insert_one.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].insert_one(doc) | ||
| except Exception as e: | ||
| log.error("Error in mongo_insert_one_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_bulk_write_async(collection: str, requests: list, **kwargs): | ||
| """Async wrapper for bulk_write.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].bulk_write(requests, **kwargs) | ||
| except Exception as e: | ||
| log.error("Error in mongo_bulk_write_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_create_index_async(collection: str, index, background: bool = True, name: str = None): | ||
| """Async wrapper for create_index.""" | ||
| db = get_async_db() | ||
| try: | ||
| if name: | ||
| await db[collection].create_index(index, background=background, name=name) | ||
| else: | ||
| await db[collection].create_index(index, background=background) | ||
| except Exception as e: | ||
| log.error("Error in mongo_create_index_async: %s", e) | ||
|
|
||
| async def mongo_delete_one_async(collection: str, query: dict): | ||
| """Async wrapper for delete_one.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].delete_one(query) | ||
| except Exception as e: | ||
| log.error("Error in mongo_delete_one_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_delete_many_async(collection: str, query: dict): | ||
| """Async wrapper for delete_many.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].delete_many(query) | ||
| except Exception as e: | ||
| log.error("Error in mongo_delete_many_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_update_many_async(collection: str, query: dict, update: dict): | ||
| """Async wrapper for update_many.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].update_many(query, update) | ||
| except Exception as e: | ||
| log.error("Error in mongo_update_many_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_update_one_async(collection: str, query: dict, update: dict, upsert: bool = False, bypass_document_validation: bool = False): | ||
| """Async wrapper for update_one.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db[collection].update_one(query, update, upsert=upsert, bypass_document_validation=bypass_document_validation) | ||
| except Exception as e: | ||
| log.error("Error in mongo_update_one_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_aggregate_async(collection: str, pipeline: list): | ||
| """Async wrapper for aggregate.""" | ||
| db = get_async_db() | ||
| try: | ||
| cursor = db[collection].aggregate(pipeline) | ||
| return await cursor.to_list(length=None) | ||
| except Exception as e: | ||
| log.error("Error in mongo_aggregate_async: %s", e) | ||
| return [] | ||
|
|
||
| async def mongo_collection_names_async() -> list: | ||
| """Async wrapper for list_collection_names.""" | ||
| db = get_async_db() | ||
| try: | ||
| return await db.list_collection_names() | ||
| except Exception as e: | ||
| log.error("Error in mongo_collection_names_async: %s", e) | ||
| return [] | ||
|
|
||
| async def mongo_find_one_and_update_async(collection: str, query: dict, update: dict, projection: dict = None): | ||
| """Async wrapper for find_one_and_update.""" | ||
| db = get_async_db() | ||
| if projection is None: | ||
| projection = {"_id": 1} | ||
| try: | ||
| return await db[collection].find_one_and_update(query, update, projection=projection) | ||
| except Exception as e: | ||
| log.error("Error in mongo_find_one_and_update_async: %s", e) | ||
| return None | ||
|
|
||
| async def mongo_drop_database_async(database: str): | ||
| """Async wrapper for drop_database.""" | ||
| # drop_database is a method on the client, not on the Database object that | ||
| # get_async_db() returns, so use the module-level client here. | ||
|
|
||
| global _async_client | ||
| if _async_client is None: | ||
| get_async_db() # ensure init | ||
|
|
||
| try: | ||
| if _async_client: | ||
| await _async_client.drop_database(database) | ||
| except Exception as e: | ||
| log.error("Error in mongo_drop_database_async: %s", e) | ||
|
|
||
| # Complex helpers (ported from mongodb.py logic) | ||
|
|
||
| async def mongo_delete_calls_async(task_ids: Sequence[int] | None) -> None: | ||
| """Async version of mongo_delete_calls.""" | ||
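| log.info("attempting to delete calls for %d tasks (async)", len(task_ids) if task_ids else 0) | ||
| # task_ids may be None or empty; list(None) below would raise TypeError | ||
| if not task_ids: | ||
| return | ||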
|
|
||
| query = {"info.id": {"$in": list(task_ids)}} | ||
| projection = {"behavior.processes.calls": 1} | ||
| tasks = await mongo_find_async("analysis", query, projection=projection) | ||
|
|
||
| if not tasks: | ||
| return | ||
|
|
||
| delete_target_ids = [] | ||
|
|
||
| def get_call_ids_from_task(task: dict) -> list: | ||
| processes = task.get("behavior", {}).get("processes", []) | ||
| calls = [proc.get("calls", []) for proc in processes] | ||
| return list(itertools.chain.from_iterable(calls)) | ||
|
|
||
| for task in tasks: | ||
| delete_target_ids.extend(get_call_ids_from_task(task)) | ||
|
|
||
| delete_target_ids = list(set(delete_target_ids)) | ||
| chunk_size = 1000 | ||
| for idx in range(0, len(delete_target_ids), chunk_size): | ||
| await mongo_delete_many_async("calls", {"_id": {"$in": delete_target_ids[idx : idx + chunk_size]}}) | ||
|
|
||
| async def mongo_delete_data_async(task_ids: int | Sequence[int]) -> None: | ||
| """Async version of mongo_delete_data.""" | ||
| try: | ||
| if isinstance(task_ids, int): | ||
| task_ids = [task_ids] | ||
|
|
||
| if task_ids: | ||
| await mongo_delete_calls_async(task_ids=task_ids) | ||
| await mongo_delete_many_async("analysis", {"info.id": {"$in": list(task_ids)}}) | ||
| # Hooks are not run in the async path yet; add them here if needed | ||
| except Exception as e: | ||
| log.exception(e) | ||
|
|
||
| async def mongo_delete_calls_by_task_id_in_range_async(*, range_start: int = 0, range_end: int = 0) -> None: | ||
| """Async version of mongo_delete_calls_by_task_id_in_range.""" | ||
| task_id_query = {} | ||
| if range_start > 0: | ||
| task_id_query["$gte"] = range_start | ||
| if range_end > 0: | ||
| task_id_query["$lt"] = range_end | ||
| if task_id_query: | ||
| await mongo_delete_many_async("calls", {"task_id": task_id_query}) | ||
|
|
||
| async def mongo_delete_data_range_async(*, range_start: int = 0, range_end: int = 0) -> None: | ||
| """Async version of mongo_delete_data_range.""" | ||
| INFO_ID = "info.id" | ||
| try: | ||
| info_id_query = {} | ||
| if range_start > 0: | ||
| info_id_query["$gte"] = range_start | ||
| if range_end > 0: | ||
| info_id_query["$lt"] = range_end | ||
| if info_id_query: | ||
| await mongo_delete_calls_by_task_id_in_range_async(range_start=range_start, range_end=range_end) | ||
| await mongo_delete_many_async("analysis", {INFO_ID: info_id_query}) | ||
| except Exception as e: | ||
| log.exception(e) | ||
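The two range helpers above build a half-open `[range_start, range_end)` filter, and `mongo_delete_calls_async` deletes call documents in batches of 1000. A minimal sketch of both ideas as pure functions, with no database involved, might look like the following. The names `build_range_query` and `chunked` are hypothetical and do not appear in the PR.

```python
from typing import Dict, Iterator, List, Sequence


def build_range_query(range_start: int = 0, range_end: int = 0) -> Dict[str, int]:
    """Build a half-open [range_start, range_end) filter.

    A bound that is 0 or negative is treated as "not set", mirroring the
    checks in the diff above.
    """
    query: Dict[str, int] = {}
    if range_start > 0:
        query["$gte"] = range_start
    if range_end > 0:
        query["$lt"] = range_end
    return query


def chunked(ids: Sequence[int], size: int = 1000) -> Iterator[List[int]]:
    """Yield consecutive slices of ids, each with at most `size` items."""
    for idx in range(0, len(ids), size):
        yield list(ids[idx : idx + size])


if __name__ == "__main__":
    print(build_range_query(range_start=5, range_end=10))
    print([len(chunk) for chunk in chunked(list(range(2500)))])
```

With both bounds left at 0 the filter is empty, which is why the helpers in the diff skip the delete entirely rather than pass an empty filter to `delete_many`.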
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| import logging | ||
| from asgiref.sync import sync_to_async | ||
| from django.conf import settings | ||
|
|
||
| # Default configuration | ||
| USE_ASYNC_MONGO = getattr(settings, "USE_ASYNC_MONGO", False) | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| # Fallback: Sync Implementation (wrapped) | ||
| from dev_utils import mongodb as sync_mongo | ||
|
|
||
| # Try Async Implementation | ||
| try: | ||
| from dev_utils import mongo_async as async_mongo | ||
| HAVE_ASYNC_IMPL = async_mongo.HAVE_PYMONGO_ASYNC | ||
| except ImportError: | ||
| HAVE_ASYNC_IMPL = False | ||
|
|
||
| if USE_ASYNC_MONGO and HAVE_ASYNC_IMPL: | ||
| log.info("Using Native Async MongoDB Provider") | ||
|
|
||
| # Direct alias to async functions | ||
| mongo_find_one = async_mongo.mongo_find_one_async | ||
| mongo_find = async_mongo.mongo_find_async | ||
| mongo_insert_one = async_mongo.mongo_insert_one_async | ||
| mongo_update_one = async_mongo.mongo_update_one_async | ||
| mongo_update_many = async_mongo.mongo_update_many_async | ||
| mongo_delete_one = async_mongo.mongo_delete_one_async | ||
| mongo_delete_many = async_mongo.mongo_delete_many_async | ||
| mongo_aggregate = async_mongo.mongo_aggregate_async | ||
| mongo_count = async_mongo.mongo_count_async | ||
| mongo_find_one_and_update = async_mongo.mongo_find_one_and_update_async | ||
| mongo_bulk_write = async_mongo.mongo_bulk_write_async | ||
| mongo_create_index = async_mongo.mongo_create_index_async | ||
|
|
||
| # Complex helpers | ||
| mongo_delete_data = async_mongo.mongo_delete_data_async | ||
| mongo_delete_data_range = async_mongo.mongo_delete_data_range_async | ||
|
|
||
| else: | ||
| if USE_ASYNC_MONGO: | ||
| log.warning("Async MongoDB requested but dependencies missing. Falling back to Sync wrapper.") | ||
| else: | ||
| log.info("Using Sync-to-Async MongoDB Wrapper") | ||
|
|
||
| # Wrap synchronous functions to make them awaitable | ||
| mongo_find_one = sync_to_async(sync_mongo.mongo_find_one) | ||
| mongo_find = sync_to_async(sync_mongo.mongo_find) | ||
| mongo_insert_one = sync_to_async(sync_mongo.mongo_insert_one) | ||
| mongo_update_one = sync_to_async(sync_mongo.mongo_update_one) | ||
| mongo_update_many = sync_to_async(sync_mongo.mongo_update_many) | ||
| mongo_delete_one = sync_to_async(sync_mongo.mongo_delete_one) | ||
| mongo_delete_many = sync_to_async(sync_mongo.mongo_delete_many) | ||
| mongo_aggregate = sync_to_async(sync_mongo.mongo_aggregate) | ||
|
|
||
| # mongodb.py may not export a count helper, so wrap count_documents on | ||
| # results_db directly. | ||
| def _sync_count(collection, query): | ||
| return getattr(sync_mongo.results_db, collection).count_documents(query) | ||
|
|
||
| mongo_count = sync_to_async(_sync_count) | ||
|
|
||
| mongo_find_one_and_update = sync_to_async(sync_mongo.mongo_find_one_and_update) | ||
| mongo_bulk_write = sync_to_async(sync_mongo.mongo_bulk_write) | ||
| mongo_create_index = sync_to_async(sync_mongo.mongo_create_index) | ||
|
|
||
| # Complex helpers | ||
| mongo_delete_data = sync_to_async(sync_mongo.mongo_delete_data) | ||
| mongo_delete_data_range = sync_to_async(sync_mongo.mongo_delete_data_range) |
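The provider module above exposes one set of awaitable names and picks the backend once at import time. The sketch below illustrates that selection pattern using only the standard library. It swaps in `asyncio.to_thread` for asgiref's `sync_to_async`, and `make_awaitable`, `select_count`, `sync_count`, `native_async_count` and the in-memory `_FAKE_DB` are all hypothetical stand-ins, not part of the PR.

```python
import asyncio
import functools

# Hypothetical in-memory stand-in for a MongoDB database
_FAKE_DB = {"analysis": [{"info": {"id": 1}}, {"info": {"id": 2}}]}


def make_awaitable(func):
    """Stand-in for sync_to_async: run a blocking function in a worker thread."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


def sync_count(collection: str, query: dict) -> int:
    """Blocking backend (the query is ignored in this toy example)."""
    return len(_FAKE_DB.get(collection, []))


async def native_async_count(collection: str, query: dict) -> int:
    """Native coroutine backend (the query is ignored in this toy example)."""
    return len(_FAKE_DB.get(collection, []))


def select_count(use_async: bool, have_async_impl: bool):
    """Return an awaitable count function, falling back to the wrapped sync one."""
    if use_async and have_async_impl:
        return native_async_count
    return make_awaitable(sync_count)


async def main() -> list:
    results = []
    for use_async in (True, False):
        mongo_count = select_count(use_async, have_async_impl=True)
        # The call site is identical regardless of which backend was chosen
        results.append(await mongo_count("analysis", {}))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because both branches return something that is awaited the same way, callers such as Django Channels consumers would not need to know which backend is active.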
Using a broad `except Exception as e:` can hide underlying issues and make debugging more difficult. It's better to catch more specific exceptions that you expect `motor` to raise, such as `pymongo.errors.ConnectionFailure`. This will make the error handling more robust and debugging easier.