Status: Draft Version: 0.1 Target Release: DataJoint 2.2 Authors: Dimitri Yatsenko, Claude
Thread-safe mode enables DataJoint to operate in multi-tenant environments (web applications, serverless functions, multi-threaded services) where multiple users or requests share the same Python process. When enabled, global mutable state is disabled, and all connection-specific configuration becomes scoped to individual Connection objects.
Traditional DataJoint usage relies on global state:
dj.config— singleton configuration objectdj.conn()— singleton database connection
This model works well for single-user scripts and notebooks but creates problems in:
- Web applications — concurrent requests from different users/tenants
- Serverless functions — shared runtime across invocations
- Multi-threaded workers — parallel processing with different credentials
- Agentic workflows — AI agents managing multiple database contexts
- Deployment-time decision — Thread-safe mode is set via environment or config file, not programmatically
- Explicit over implicit — All connection parameters must be explicitly provided
- No hidden global state — Connection behavior is fully determined by its configuration
- Backward compatible — Existing code works unchanged when
thread_safe=False
In thread-safe mode, dj.config access is blocked except for thread_safe itself:
| Setting | thread_safe=False |
thread_safe=True |
|---|---|---|
thread_safe |
Read-only (set via env var or config file only) | Read-only |
| All other settings | Read/write | Raises ThreadSafetyError |
All settings become connection-scoped and are accessed via conn.config (read/write):
| Setting | Type | Default | Description |
|---|---|---|---|
safemode |
bool | True | Require confirmation for destructive ops |
database_prefix |
str | "" | Schema name prefix |
stores |
dict | {} | Blob storage configuration |
cache |
Path | None | Local cache directory |
query_cache |
Path | None | Query cache directory |
reconnect |
bool | True | Auto-reconnect on lost connection |
display_limit |
int | 12 | Max rows to display |
display_width |
int | 14 | Column width for display |
show_tuple_count |
bool | True | Show tuple count in repr |
loglevel |
str | "INFO" | Logging level |
filepath_checksum_size_limit |
int | None | Max file size for checksum |
Connection parameters (set at creation, read-only after):
| Setting | Type | Default | Description |
|---|---|---|---|
host |
str | "localhost" | Database hostname |
port |
int | 3306/5432 | Database port |
user |
str | required | Database username |
password |
str | required | Database password |
backend |
str | "mysql" | Database backend |
use_tls |
bool/dict | None | TLS configuration |
Thread-safe mode is read-only after initialization and can only be set via environment variable or config file:
# Method 1: Environment variable
DJ_THREAD_SAFE=true python app.py// Method 2: Config file (datajoint.json)
{ "thread_safe": true }Programmatic setting is not allowed:
dj.config.thread_safe = True # Raises ThreadSafetyError# With DJ_THREAD_SAFE=true set in environment
# Only thread_safe is accessible
dj.config.thread_safe # OK (returns True)
# Everything else raises ThreadSafetyError
dj.config.database.host # Raises ThreadSafetyError
dj.config.display.width # Raises ThreadSafetyError
dj.config.safemode # Raises ThreadSafetyError# Thread-safe connection creation
conn = dj.Connection.from_config(
host="db.example.com",
user="tenant_user",
password="tenant_password",
port=3306,
backend="mysql",
safemode=False,
stores={
"raw": {
"protocol": "s3",
"endpoint": "s3.amazonaws.com",
"bucket": "tenant-data",
"access_key": "...",
"secret_key": "...",
}
},
)
# Or from a configuration dict
tenant_config = {
"host": "db.example.com",
"user": request.tenant.db_user,
"password": request.tenant.db_password,
"stores": request.tenant.stores,
}
conn = dj.Connection.from_config(tenant_config)Settings are accessed through connection.config. The connection is available on:
Connectionobjects directlySchema.connectionTable.connection(any table class: Manual, Lookup, Imported, Computed)
conn = dj.Connection.from_config(...)
schema = dj.Schema("my_pipeline", connection=conn)
@schema
class Subject(dj.Manual):
definition = "subject_id : int"
# All of these access the same connection-scoped config:
conn.config.safemode # Via connection directly
schema.connection.config.safemode # Via schema
Subject.connection.config.safemode # Via table class
# Read settings
conn.config.safemode # True (default)
conn.config.database_prefix # ""
conn.config.stores # {}
conn.config.display_limit # 12
# Modify settings for this connection
conn.config.safemode = False
conn.config.display_limit = 25
conn.config.stores = {"raw": {"protocol": "file", "location": "/data"}}conn = dj.Connection.from_config(tenant_config)
# Explicit connection binding
schema = dj.Schema("my_pipeline", connection=conn)
@schema
class Subject(dj.Manual):
definition = """
subject_id : int
"""
# All operations use connection-scoped settings
Subject.insert([{"subject_id": 1}]) # Uses conn.config.safemode
# Access config through schema or table
schema.connection.config.display_limit = 50
Subject.connection.config.safemode # Same as conn.config.safemode| API | thread_safe=False |
thread_safe=True |
|---|---|---|
dj.conn() |
Works | Raises ThreadSafetyError |
dj.config.thread_safe |
Read/write | Read-only |
dj.config.* (all else) |
Read/write | Raises ThreadSafetyError |
Schema() without connection |
Works | Raises ThreadSafetyError |
Connection.from_config() |
Works | Works |
conn.config.* |
Read/write (forwards to global) | Read/write (connection-scoped) |
The new API (Connection.from_config() and conn.config) is the universal API that works in both modes.
Existing code continues to work unchanged when thread_safe=False:
import datajoint as dj
# Global config access - works
dj.config["database.host"] = "localhost"
dj.config["database.user"] = "root"
dj.config["database.password"] = "secret"
# Singleton connection - works
conn = dj.conn()
# Schema without explicit connection - works
schema = dj.Schema("my_schema") # Uses dj.conn()The new API works identically whether thread_safe is on or off:
import datajoint as dj
# Works with thread_safe=False OR thread_safe=True
conn = dj.Connection.from_config(
host="localhost",
user="root",
password="secret",
safemode=False,
stores={"raw": {...}},
)
# Access settings through connection - works in both modes
conn.config.safemode # False
conn.config.stores # {"raw": {...}}
conn.config.database_prefix # ""
# Schema with explicit connection - works in both modes
schema = dj.Schema("my_schema", connection=conn)The behavior of conn.config depends on which API created the connection, not on the thread_safe setting:
New API (Connection.from_config()) — Uses explicit values or defaults. Never accesses global config. Works identically with thread_safe on or off:
dj.config.safemode = False # Set in global config
dj.config.database_prefix = "dev_"
conn = dj.Connection.from_config(
host="localhost",
user="root",
password="secret",
# safemode not specified - uses default, NOT global config
)
conn.config.safemode # True (default, not global)
conn.config.database_prefix # "" (default, not global)Legacy API (dj.conn()) — Forwards unset values to global config for backward compatibility:
dj.config.safemode = False
dj.config.database_prefix = "dev_"
conn = dj.conn() # Legacy API
conn.config.safemode # False (from dj.config)
conn.config.database_prefix # "dev_" (from dj.config)This design ensures that code using Connection.from_config() is portable and behaves identically whether thread_safe is enabled or not.
Migration is immediate — adopt the new API and your code works in both modes:
# Before (legacy API - only works with thread_safe=False)
dj.config["database.host"] = "localhost"
dj.config["database.user"] = "root"
dj.config["database.password"] = "secret"
conn = dj.conn()
schema = dj.Schema("pipeline")
# After (new API - works with thread_safe=False AND thread_safe=True)
conn = dj.Connection.from_config(
host="localhost",
user="root",
password="secret",
)
schema = dj.Schema("pipeline", connection=conn)Once migrated to the new API, enabling thread_safe=True requires no code changes.
class Config(BaseSettings):
def __getattribute__(self, name):
# Allow private attributes
if name.startswith("_"):
return object.__getattribute__(self, name)
# Allow Pydantic internals
if name.startswith("model_"):
return object.__getattribute__(self, name)
# Always allow checking thread_safe itself
if name == "thread_safe":
return object.__getattribute__(self, name)
# Block everything else in thread-safe mode
if object.__getattribute__(self, "thread_safe"):
raise ThreadSafetyError(
f"Setting '{name}' is connection-scoped in thread-safe mode. "
"Access it via connection.config instead."
)
return object.__getattribute__(self, name)
def __setattr__(self, name, value):
# Allow private attributes
if name.startswith("_"):
return object.__setattr__(self, name, value)
# thread_safe is read-only after initialization
if name == "thread_safe":
try:
object.__getattribute__(self, "thread_safe")
# If we get here, thread_safe already exists - block the set
raise ThreadSafetyError(
"thread_safe cannot be set programmatically. "
"Set DJ_THREAD_SAFE=true in environment or datajoint.json."
)
except AttributeError:
pass # First time setting during __init__ - allow it
return object.__setattr__(self, name, value)
# Block everything else in thread-safe mode
if object.__getattribute__(self, "thread_safe"):
raise ThreadSafetyError(
"Global config is inaccessible in thread-safe mode. "
"Use Connection.from_config() with explicit configuration."
)
return object.__setattr__(self, name, value)class Connection:
def __init__(self, host, user, password, port=None,
use_tls=None, backend=None, *, _config=None):
# ... existing connection setup ...
# Connection-scoped configuration
# Legacy API (dj.conn()) uses global fallback for backward compatibility
self.config = _config if _config is not None else ConnectionConfig(_use_global_fallback=True)
@classmethod
def from_config(cls, cfg=None, *, host=None, user=None, password=None,
port=None, backend=None, safemode=None, stores=None,
database_prefix=None, cache=None, query_cache=None,
reconnect=None, use_tls=None) -> "Connection":
"""
Create connection with explicit configuration.
Works in both thread_safe=False and thread_safe=True modes.
"""
# ... merge cfg dict with kwargs ...
# ... validate required fields (host, user, password) ...
# Build ConnectionConfig - new API never falls back to global config
conn_config = ConnectionConfig(
_use_global_fallback=False,
**({"safemode": safemode} if safemode is not None else {}),
**({"stores": stores} if stores is not None else {}),
**({"database_prefix": database_prefix} if database_prefix is not None else {}),
**({"cache": cache} if cache is not None else {}),
**({"query_cache": query_cache} if query_cache is not None else {}),
**({"reconnect": reconnect} if reconnect is not None else {}),
)
return cls(
host=effective_host,
user=effective_user,
password=effective_password,
port=effective_port,
use_tls=effective_use_tls,
backend=effective_backend,
_config=conn_config,
)class ConnectionConfig:
"""
Connection-scoped configuration (read/write).
Behavior depends on how the connection was created:
- New API (from_config): Uses explicit values or defaults. Never accesses global config.
- Legacy API (dj.conn): Forwards unset values to global dj.config.
"""
_DEFAULTS = {
"safemode": True,
"database_prefix": "",
"stores": {},
"cache": None,
"query_cache": None,
"reconnect": True,
"display_limit": 12,
"display_width": 14,
"show_tuple_count": True,
"loglevel": "INFO",
"filepath_checksum_size_limit": None,
}
def __init__(self, **explicit_values):
self._values = {} # Mutable storage for this connection
# If True, forward unset values to global config (legacy API behavior)
# If False, use defaults only (new API behavior)
self._use_global_fallback = explicit_values.pop("_use_global_fallback", False)
self._values.update(explicit_values)
def __getattr__(self, name):
if name.startswith("_"):
return object.__getattribute__(self, name)
# If set on this connection, return that value
if name in self._values:
return self._values[name]
# Legacy API: forward to global config for backward compatibility
if self._use_global_fallback:
from .settings import config
return getattr(config, name, self._DEFAULTS.get(name))
# New API: use defaults only (no global config access)
return self._DEFAULTS.get(name)
def __setattr__(self, name, value):
if name.startswith("_"):
return object.__setattr__(self, name, value)
# Store in connection-local values
self._values[name] = value
def get_store_spec(self, store_name: str) -> dict:
"""Get store specification by name."""
stores = self.stores
if store_name not in stores:
raise DataJointError(f"Store '{store_name}' is not configured.")
return stores[store_name]class ThreadSafetyError(DataJointError):
"""
Raised when global state is accessed in thread-safe mode.
This error indicates that code is attempting to use global
configuration or connections that are not thread-safe.
"""# Reading blocked config
dj.config.safemode
ThreadSafetyError: Setting 'safemode' is connection-scoped in thread-safe mode.
Access it via connection.config instead.
# Writing blocked config
dj.config.display_limit = 20
ThreadSafetyError: Setting 'display_limit' is connection-scoped in thread-safe mode.
Modify it via connection.config instead.
# Using dj.conn()
dj.conn()
ThreadSafetyError: dj.conn() is disabled in thread-safe mode.
Use Connection.from_config() with explicit configuration.
# Setting thread-safe mode programmatically
dj.config.thread_safe = True
ThreadSafetyError: thread_safe cannot be set programmatically.
Set DJ_THREAD_SAFE=true in environment or datajoint.json.
# Schema without connection
dj.Schema("my_schema")
ThreadSafetyError: Schema requires explicit connection in thread-safe mode.
Use Schema('name', connection=conn).-
Global config in thread-safe mode
- Verify only
thread_safeis accessible (read-only) - Verify all other settings raise ThreadSafetyError (read and write)
- Verify thread_safe cannot be set programmatically (only via env var or config file)
- Verify only
-
Connection.from_config()
- Verify all parameters are accepted
- Verify defaults are applied correctly
- Verify cfg dict merging with kwargs
- Verify works in both thread_safe modes
-
ConnectionConfig
- Verify read/write access to all settings
- Verify forwarding to global config when thread_safe=False
- Verify defaults used when thread_safe=True
- Verify store spec resolution
-
Multi-tenant simulation
- Create multiple connections with different configs
- Verify isolation between connections
- Verify correct store resolution per connection
-
Schema binding
- Verify schemas use connection's config
- Verify safemode behavior per connection
- Connection pooling — Pool of connections per tenant configuration
- Async support — Async connection management for async frameworks
- Context managers — Temporary connection context for specific operations
- Thread-local storage — Rejected in favor of explicit connection passing
- Automatic credential rotation — Application responsibility
- Multi-database transactions — Not supported by underlying backends