| name | update-codeql-query-dataflow-python |
|---|---|
| description | Update CodeQL queries for Python from legacy v1 dataflow API to modern v2 shared dataflow API. Use this skill when migrating Python queries to use DataFlow::ConfigSig modules, ensuring query results remain equivalent through TDD. |
This skill guides you through migrating Python CodeQL queries from the legacy v1 (language-specific) dataflow API to the modern v2 (shared) dataflow API while ensuring query results remain equivalent.
- Migrating Python queries using deprecated
DataFlow::ConfigurationorTaintTracking::Configurationclasses - Updating queries to use
DataFlow::ConfigSigmodules - Modernizing Python queries to use the shared dataflow library
- Ensuring query result equivalence during dataflow API migration
- Existing Python CodeQL query using v1 dataflow API that you want to migrate
- Existing unit tests for the query
- Understanding of the query's detection purpose
- Access to CodeQL Development MCP Server tools
v1 (Legacy):
class MyConfig extends TaintTracking::Configuration {
MyConfig() { this = "MyConfig" }
override predicate isSource(DataFlow::Node source) { ... }
override predicate isSink(DataFlow::Node sink) { ... }
override predicate isSanitizer(DataFlow::Node node) { ... }
override predicate isAdditionalTaintStep(DataFlow::Node n1, DataFlow::Node n2) { ... }
}v2 (Modern):
module MyConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) { ... }
predicate isSink(DataFlow::Node sink) { ... }
predicate isBarrier(DataFlow::Node node) { ... }
predicate isAdditionalFlowStep(DataFlow::Node n1, DataFlow::Node n2) { ... }
}
module MyFlow = TaintTracking::Global<MyConfig>;| v1 API | v2 API | Purpose |
|---|---|---|
DataFlow::Configuration |
DataFlow::ConfigSig |
Configuration signature |
isSanitizer |
isBarrier |
Stop data flow propagation |
isAdditionalTaintStep |
isAdditionalFlowStep |
Custom flow steps |
this.hasFlow(source, sink) |
MyFlow::flow(source, sink) |
Query flow paths |
Python dataflow uses multiple node representations:
ExprNode: AST expression nodes (function calls, attribute access)CfgNode: Control-flow graph nodes (more precise than AST)CallCfgNode: CFG nodes representing function/method callsParameterNode: Function parameter nodesLocalSourceNode: API graph modeling for tracking method chains
Critical: Before any code changes, capture current query behavior.
Use codeql_test_run to establish baseline:
{
"testPath": "<query-pack>/test/{QueryName}",
"searchPath": ["<query-pack>"]
}Save the output - this is your reference for query result equivalence.
Create a reference file with current results:
cp <query-pack>/test/{QueryName}/{QueryName}.expected \
<query-pack>/test/{QueryName}/{QueryName}.expected.v1-baselineThis ensures you can verify equivalence after migration.
Review the query for v1 API usage:
class X extends DataFlow::Configurationorclass X extends TaintTracking::ConfigurationisSanitizerpredicatesisAdditionalTaintSteppredicatesthis.hasFlow(source, sink)queries
Identify how the query uses Python dataflow constructs:
- RemoteFlowSource: Predefined sources for HTTP requests, user input
- CFG vs AST nodes:
getCfgNode(),asExpr()conversions - API graphs:
semmle.python.ApiGraphsfor library usage tracking - Python sources: Django/Flask requests,
sys.argv,input(), file operations - Python sinks:
eval(),exec(),subprocesscalls, SQL operations
Before:
class CommandInjectionConfig extends TaintTracking::Configuration {
CommandInjectionConfig() { this = "CommandInjectionConfig" }
override predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource
}
override predicate isSink(DataFlow::Node sink) {
exists(DataFlow::CallCfgNode call |
call.getFunction().(DataFlow::AttrRead).getAttributeName() in ["system", "popen"] and
call.getFunction().(DataFlow::AttrRead).getObject().asCfgNode().(NameNode).getId() = "os" and
sink = call.getArg(0)
)
}
override predicate isSanitizer(DataFlow::Node node) {
node = any(SanitizationCall c).getResult()
}
}
from CommandInjectionConfig cfg, DataFlow::PathNode source, DataFlow::PathNode sink
where cfg.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "Command injection from $@", source.getNode(), "user input"After:
module CommandInjectionConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource
}
predicate isSink(DataFlow::Node sink) {
exists(DataFlow::CallCfgNode call |
call.getFunction().(DataFlow::AttrRead).getAttributeName() in ["system", "popen"] and
call.getFunction().(DataFlow::AttrRead).getObject().asCfgNode().(NameNode).getId() = "os" and
sink = call.getArg(0)
)
}
predicate isBarrier(DataFlow::Node node) {
node = any(SanitizationCall c).getResult()
}
}
module CommandInjectionFlow = TaintTracking::Global<CommandInjectionConfig>;
from CommandInjectionFlow::PathNode source, CommandInjectionFlow::PathNode sink
where CommandInjectionFlow::flowPath(source, sink)
select sink.getNode(), source, sink, "Command injection from $@", source.getNode(), "user input"isSanitizer→isBarrier: Change method name only, logic unchangedisAdditionalTaintStep→isAdditionalFlowStep: Change method name only
Replace cfg.hasFlow(source, sink) with MyFlow::flow(source, sink):
- Remove configuration variable from
fromclause - Use module flow predicate directly
- For path queries, use
MyFlow::PathNodeandMyFlow::flowPath(source, sink)
Ensure proper node type handling with Python's multiple dataflow representations:
// v1 and v2 both support these conversions
DataFlow::Node n;
Expr e = n.asExpr(); // AST expression
CfgNode cfg = n.asCfgNode(); // CFG node
ControlFlowNode cfn = n.getCfgNode(); // Another way to get CFG nodeImportant: Python has multiple dataflow nodes per expression due to CFG splitting. The v2 API handles this identically to v1.
RemoteFlowSource works identically in v1 and v2:
predicate isSource(DataFlow::Node source) {
source instanceof RemoteFlowSource or
// Django request parameters
exists(DataFlow::AttrRead attr |
attr.getAttributeName() in ["GET", "POST", "FILES"] and
attr.getObject().(DataFlow::ParameterNode).getParameter().getName() = "request" and
source = attr
) or
// Flask request access
exists(DataFlow::ModuleVariableNode request |
request.getName() = "request" and
exists(DataFlow::AttrRead attr |
attr.getObject() = request and
attr.getAttributeName() in ["args", "form", "json", "files"] and
source = attr
)
)
}For tracking library usage patterns with API graphs:
predicate isAdditionalFlowStep(DataFlow::Node n1, DataFlow::Node n2) {
// Track method chaining through API graphs
exists(API::CallNode call |
n1 = call.getArg(0) and
n2 = call.getReturn()
) or
// Track attribute reads
exists(DataFlow::AttrRead attr |
n1 = attr.getObject() and
n2 = attr
)
}Track flows through Django-specific constructs:
predicate isAdditionalFlowStep(DataFlow::Node n1, DataFlow::Node n2) {
// Django QuerySet methods
exists(DataFlow::CallCfgNode call |
call.getFunction().(DataFlow::AttrRead).getAttributeName() in ["raw", "extra"] and
n1 = call.getArg(_) and
n2 = call
) or
// Django template rendering
exists(DataFlow::CallCfgNode render |
render.getFunction().(DataFlow::AttrRead).getAttributeName() in ["render_template_string", "Template"] and
n1 = render.getArg(_) and
n2 = render
)
}Use codeql_query_compile to check for errors:
{
"queryPath": "<query-pack>/src/{QueryName}/{QueryName}.ql",
"searchPath": ["<query-pack>"]
}Fix any compilation errors before testing.
Use codeql_test_run on migrated query:
{
"testPath": "<query-pack>/test/{QueryName}",
"searchPath": ["<query-pack>"]
}Critical: Results MUST match baseline from Phase 1.
Compare results line-by-line:
diff <query-pack>/test/{QueryName}/{QueryName}.expected.v1-baseline \
<query-pack>/test/{QueryName}/{QueryName}.expectedSuccess: Empty diff (identical results) Failure: Any differences require investigation and fixes
If baseline tests pass, add more test cases to ensure robustness:
Create additional test files covering:
- Django-specific patterns (ORM, templates, middleware)
- Flask route handlers and request processing
- FastAPI dependency injection and async operations
- Dynamic code execution (
eval(),exec(),compile()) - Attribute access patterns (
getattr,setattr) - Data science libraries (pandas, numpy operations with user input)
For each new test:
- Add test code to
test2.py,test3.py, etc. - Update
.expectedfile with anticipated results - Re-extract test database with
codeql_test_extract - Run tests to verify
Run query on realistic database and monitor performance:
{
"query": "<query-pack>/src/{QueryName}/{QueryName}.ql",
"database": "<path-to-realistic-python-database>",
"searchPath": ["<query-pack>"]
}If performance degrades significantly, consider:
- Caching expensive predicates with
cached - Using local flow instead of global flow where possible
- Limiting scope with additional constraints
- Leveraging API graphs more efficiently
Ensure query metadata reflects v2 API usage:
/**
* @name Command Injection via Untrusted Data
* @description Executes system commands with user-controllable data
* @kind path-problem
* @id py/command-injection
* @tags security external/cwe/cwe-078
* @precision high
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import DataFlow::PathGraph- Remove v1 baseline files after verification
- Add migration notes in query comments if helpful
- Format query with
codeql_query_format
predicate isSource(DataFlow::Node source) {
// Django request object attributes
exists(DataFlow::AttrRead attr |
attr.getObject().(DataFlow::ParameterNode).getParameter().getName() = "request" and
attr.getAttributeName() in ["GET", "POST", "FILES", "META", "COOKIES"] and
source = attr
) or
// Django form fields
exists(DataFlow::CallCfgNode call |
call.getFunction().(DataFlow::AttrRead).getAttributeName() = "cleaned_data" and
source = call.getArg(_)
)
}predicate isSource(DataFlow::Node source) {
// Flask request object
exists(API::Node request |
request = API::moduleImport("flask").getMember("request") and
source = request.getMember(["args", "form", "json", "files", "headers"]).getAUse()
) or
// FastAPI path/query parameters via dependency injection
exists(DataFlow::ParameterNode param |
(param.getParameter().getAnnotation().toString().matches("%Path%") or
param.getParameter().getAnnotation().toString().matches("%Query%")) and
source = param
)
}predicate isSink(DataFlow::Node sink) {
// eval, exec, compile functions
exists(DataFlow::CallCfgNode call |
call.getFunction().asCfgNode().(NameNode).getId() in ["eval", "exec", "compile"] and
sink = call.getArg(0)
) or
// subprocess operations
exists(API::CallNode call |
call = API::moduleImport("subprocess").getMember(["run", "call", "Popen", "check_output"]).getACall() and
sink = call.getArg(0)
) or
// os.system and os.popen
exists(API::CallNode call |
call = API::moduleImport("os").getMember(["system", "popen"]).getACall() and
sink = call.getArg(0)
)
}predicate isSink(DataFlow::Node sink) {
// Django raw SQL
exists(DataFlow::CallCfgNode call |
call.getFunction().(DataFlow::AttrRead).getAttributeName() in ["raw", "extra", "execute"] and
sink = call.getArg(0)
) or
// SQLAlchemy text() function
exists(API::CallNode call |
call = API::moduleImport("sqlalchemy").getMember("text").getACall() and
sink = call.getArg(0)
) or
// sqlite3 execute
exists(API::CallNode call |
call = API::moduleImport("sqlite3").getMember("Connection").getMember("execute").getACall() and
sink = call.getArg(0)
)
}predicate isSink(DataFlow::Node sink) {
// Jinja2 Template constructor with string
exists(API::CallNode call |
call = API::moduleImport("jinja2").getMember("Template").getACall() and
sink = call.getArg(0)
) or
// Django render_template_string
exists(DataFlow::CallCfgNode call |
call.getFunction().asCfgNode().(NameNode).getId() = "render_template_string" and
sink = call.getArg(0)
) or
// Flask render_template_string
exists(API::CallNode call |
call = API::moduleImport("flask").getMember("render_template_string").getACall() and
sink = call.getArg(0)
)
}predicate isSink(DataFlow::Node sink) {
// pickle.loads with untrusted data
exists(API::CallNode call |
call = API::moduleImport("pickle").getMember(["loads", "load", "Unpickler"]).getACall() and
sink = call.getArg(0)
)
}codeql_test_run: Run tests and compare with expected resultscodeql_test_extract: Extract test databases from Python source codecodeql_query_compile: Compile queries and check for errorscodeql_query_run: Run queries for analysiscodeql_bqrs_decode: Decode binary query resultscodeql_query_format: Format query files for consistencycodeql_pack_install: Install query pack dependencies
❌ Don't:
- Skip baseline test establishment before migration
- Change query logic alongside API migration (separate concerns)
- Accept test results without verifying equivalence
- Remove v1 baseline until migration is confirmed successful
- Ignore performance regressions
- Forget to update imports (
import DataFlow::PathGraph) - Confuse CFG nodes and AST nodes in Python
✅ Do:
- Establish test baseline BEFORE any changes
- Make purely mechanical API changes first
- Verify exact result equivalence after migration
- Keep v1 baseline for comparison during migration
- Test edge cases specific to Python (dynamic typing, frameworks, CFG splitting)
- Document any intentional behavior changes separately
- Understand Python's multiple-nodes-per-expression model
If results differ after migration:
- Check node type conversions: Ensure
asExpr(),asCfgNode(),getCfgNode()usage is correct - Verify predicate renames: Confirm
isBarriervsisSanitizerlogic is identical - Review flow predicates: Check
isAdditionalFlowStepmirrorsisAdditionalTaintStep - CFG splitting: Understand Python's control-flow splits may create multiple nodes per expression
- Debug with partial flow: Use flow exploration to find missing edges
- API graph issues: Verify API graph usage patterns are correctly translated
- New dataflow API for writing custom CodeQL queries - Official v2 API announcement
- Analyzing data flow in Python - Python dataflow guide
- CodeQL Python Library Reference - Standard library documentation
- Create CodeQL Query TDD Generic - TDD workflow for queries
Your dataflow migration is successful when:
- ✅ Test baseline established before migration
- ✅ Query compiles without errors using v2 API
- ✅ All configuration classes converted to modules
- ✅ All
isSanitizerrenamed toisBarrier - ✅ All
isAdditionalTaintSteprenamed toisAdditionalFlowStep - ✅ All
cfg.hasFlow()calls replaced with module flow predicates - ✅ Test results EXACTLY match v1 baseline (zero diff)
- ✅ No performance regressions
- ✅ Query metadata updated appropriately
- ✅ Python-specific patterns (CFG nodes, API graphs, frameworks) handled correctly