⚠️ BREAKING CHANGE: SSL verification default changed fromFalsetoTruein v0.9.0. If using self-signed certificates, you must either add them to your system trust store or setverify_ssl=Falseexplicitly.
The Advanced Search module provides access to Darktrace's advanced search functionality for querying logs and events.
RESOLVED: POST requests to the Advanced Search API now work correctly! This was resolved in v0.8.3 by fixing JSON formatting inconsistencies in the authentication system.
- Darktrace 6.1+: POST requests are recommended for advanced queries
- Earlier versions: GET requests continue to work as before
- Both methods supported: You can choose between GET and POST based on your needs
from darktrace import DarktraceClient
client = DarktraceClient(
host="https://your-darktrace-instance",
public_token="YOUR_PUBLIC_TOKEN",
private_token="YOUR_PRIVATE_TOKEN"
)
# Access the advanced_search module
advanced_search = client.advanced_searchPerform advanced search queries on Darktrace logs and events.
# Basic search query structure
query = {
"search": "@type:\"ssl\" AND @fields.dest_port:\"443\"",
"fields": [],
"offset": 0,
"timeframe": "3600" # 1 hour in seconds
}
# GET request (traditional method, works with all Darktrace versions)
results = advanced_search.search(query)
# or explicitly
results = advanced_search.search(query, post_request=False)
# POST request (recommended for Darktrace 6.1+)
results = advanced_search.search(query, post_request=True)# Search for SSL connections with custom timeframe
ssl_query = {
"search": "@type:\"ssl\" AND @fields.dest_port:\"443\"",
"fields": ["@fields.source_ip", "@fields.dest_ip", "@fields.cipher"],
"offset": 0,
"timeframe": "7200" # 2 hours
}
results = advanced_search.search(ssl_query, post_request=True)
# Search with custom time range
custom_time_query = {
"search": "@type:\"conn\" AND @fields.proto:\"tcp\"",
"fields": [],
"offset": 0,
"timeframe": "custom",
"from": "2025-07-01T09:00:00",
"to": "2025-07-01T10:00:00"
}
results = advanced_search.search(custom_time_query, post_request=True)query(Dict[str, Any]): Dictionary containing the search query parameterspost_request(bool, optional): If True, attempts to use POST method. Defaults to False (GET method)
The query dictionary can include:
offset(int): Starting offset for resultscount(int): Number of results to returnquery(str): Search query stringtimeframe(str): Time frame for the search (e.g., "1 hour", "24 hours")
Analyze field data from search results.
# Analyze a specific field
query = {
"query": "*",
"timeframe": "1 hour"
}
analysis = advanced_search.analyze(
field="source_ip",
analysis_type="count",
query=query
)field(str): The field to analyzeanalysis_type(str): Type of analysis to perform (e.g., "count", "unique")query(Dict[str, Any]): Search query parameters
Get graph data for visualization.
# Get graph data
query = {
"query": "*",
"timeframe": "1 hour"
}
graph_data = advanced_search.graph(
graph_type="timeseries",
interval=300, # 5 minutes in seconds
query=query
)graph_type(str): Type of graph to generate (e.g., "timeseries")interval(int): Time interval in seconds for graph data pointsquery(Dict[str, Any]): Search query parameters
from darktrace import DarktraceClient
client = DarktraceClient(
host="https://your-darktrace-instance.com",
public_token="your_public_token",
private_token="your_private_token"
)
# Perform a basic search
query = {
"offset": 0,
"count": 50,
"query": "source_ip:192.168.1.100",
"timeframe": "2 hours"
}
try:
results = client.advanced_search.search(query)
print(f"Found {len(results.get('events', []))} events")
for event in results.get('events', [])[:5]:
print(f"Event: {event}")
except Exception as e:
print(f"Search failed: {e}")# Analyze source IPs
query = {
"query": "*",
"timeframe": "24 hours"
}
analysis = client.advanced_search.analyze(
field="source_ip",
analysis_type="count",
query=query
)
print(f"Top source IPs: {analysis}")# Generate time series graph data
graph_data = client.advanced_search.graph(
graph_type="timeseries",
interval=600, # 10-minute intervals
query=query
)
print(f"Graph data points: {len(graph_data.get('data', []))}")try:
results = client.advanced_search.search(query, post_request=True)
# Process the data
except requests.exceptions.HTTPError as e:
print(f"HTTP error occurred: {e}")
except Exception as e:
print(f"An error occurred: {e}")- All queries are automatically base64-encoded before being sent to the API
- POST requests now supported (v0.8.3+) for Darktrace 6.1+ installations
- GET requests continue to work for all Darktrace versions
- Time intervals for graphs are specified in seconds
- Query syntax follows Darktrace's advanced search format
Perform advanced search queries on Darktrace data.
# Basic search query
query = {
"search": "@type:\"conn\" AND @fields.proto:\"tcp\"",
"fields": [],
"offset": 0,
"timeframe": "3600", # 1 hour in seconds
"time": {"user_interval": 0}
}
# Execute search (GET request - traditional method)
results = advanced_search.search(query)
# Execute search (POST request - recommended for Darktrace 6.1+)
results = advanced_search.search(query, post_request=True)query(dict): Search query dictionary containing:search(str): The search query stringfields(list): Fields to include in responseoffset(int): Starting offset for paginationtimeframe(str): Time range in secondstime(dict): Time configuration
post_request(bool, optional): If True, uses POST method (supported in v0.8.3+)
{
"hits": {
"total": 1234,
"hits": [
{
"_source": {
"@timestamp": "2023-01-01T12:00:00.000Z",
"@type": "conn",
"@fields": {
"proto": "tcp",
"dest_port": 443
}
}
}
]
}
}Analyze field data using aggregations.
# Analyze destination ports for DNS traffic
analyze_query = {
"search": "@type:\"dns\"",
"fields": [],
"offset": 0,
"timeframe": "3600",
"time": {"user_interval": 0}
}
results = advanced_search.analyze("@fields.dest_port", "terms", analyze_query)Generate graph data over time intervals.
# Get connection counts over time with 5-minute intervals
graph_query = {
"search": "@type:\"conn\"",
"fields": [],
"offset": 0,
"timeframe": "14400", # 4 hours
"time": {"user_interval": 0}
}
results = advanced_search.graph("count", 300000, graph_query) # 5-minute intervals# Search for HTTPS connections in the last hour
query = {
"search": "@type:\"conn\" AND @fields.dest_port:\"443\"",
"fields": [],
"offset": 0,
"timeframe": "3600",
"time": {"user_interval": 0}
}
results = client.advanced_search.search(query)
print(f"Found {results['hits']['total']} HTTPS connections")# Analyze top destination ports
query = {
"search": "@type:\"conn\"",
"fields": [],
"offset": 0,
"timeframe": "3600",
"time": {"user_interval": 0}
}
analysis = client.advanced_search.analyze("@fields.dest_port", "terms", query)
for bucket in analysis['aggregations']['terms']['buckets'][:5]:
print(f"Port {bucket['key']}: {bucket['doc_count']} connections")try:
results = client.advanced_search.search(query, post_request=True)
# Process the data
except requests.exceptions.HTTPError as e:
print(f"HTTP error occurred: {e}")
except Exception as e:
print(f"An error occurred: {e}")