Organize multi-schema pipelines for team collaboration.
A production DataJoint pipeline typically involves:
- Multiple schemas — Organized by experimental modality or processing stage
- Team of users — With different roles and access levels
- Shared infrastructure — Database server, object storage, code repository
- Coordination — Between code, database, and storage permissions
This guide covers practical project organization. For conceptual background on pipeline architecture and the DAG structure, see Data Pipelines.
For a fully managed solution, request a DataJoint Platform account.
Use a modern Python project layout with source code under src/:
```
my_pipeline/
├── datajoint.json          # Shared settings (committed)
├── .secrets/               # Local credentials (gitignored)
│   ├── database.password
│   └── storage.credentials
├── .gitignore
├── pyproject.toml          # Package metadata and dependencies
├── README.md
├── src/
│   └── my_pipeline/
│       ├── __init__.py
│       ├── subject.py      # subject schema
│       ├── session.py      # session schema
│       ├── ephys.py        # ephys schema
│       ├── imaging.py      # imaging schema
│       ├── analysis.py     # analysis schema
│       └── utils/
│           └── __init__.py
├── tests/
│   ├── conftest.py
│   └── test_ephys.py
└── docs/
    └── ...
```
Each module defines and binds to its schema:
```python
# src/my_pipeline/ephys.py
import datajoint as dj

from . import session  # Import dependency

schema = dj.Schema('ephys')


@schema
class Probe(dj.Lookup):
    definition = """
    probe_type : varchar(32)
    ---
    num_channels : int32
    """


@schema
class Recording(dj.Imported):
    definition = """
    -> session.Session
    -> Probe
    ---
    recording_path : varchar(255)
    """
```

Module imports reflect the schema DAG:
```python
# src/my_pipeline/analysis.py
import datajoint as dj

# analysis depends on both ephys and imaging
from . import ephys
from . import imaging

schema = dj.Schema('analysis')


@schema
class MultiModalAnalysis(dj.Computed):
    definition = """
    -> ephys.Recording
    -> imaging.Scan
    ---
    correlation : float64
    """
```

Store non-secret configuration in `datajoint.json` at the project root:
`datajoint.json` (committed):

```json
{
    "database": {
        "host": "db.example.com",
        "port": 3306
    },
    "stores": {
        "main": {
            "protocol": "s3",
            "endpoint": "s3.example.com",
            "bucket": "my-org-data",
            "location": "my_pipeline"
        }
    }
}
```

Credentials are stored locally and never committed:
Option 1: `.secrets/` directory

```
.secrets/
├── database.user
├── database.password
├── storage.access_key
└── storage.secret_key
```
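With the file-based option, the pipeline can read these files at startup. A minimal sketch of the loading step, assuming one secret per file named as in the layout above (how the values are then handed to DataJoint is up to your configuration):

```python
# Read every file in .secrets/ into a dict keyed by file name.
# Illustrative only: assumes one value per file.
from pathlib import Path


def load_secrets(secrets_dir: str = ".secrets") -> dict:
    """Return {file_name: stripped_contents} for each file in the directory."""
    return {
        p.name: p.read_text().strip()
        for p in Path(secrets_dir).iterdir()
        if p.is_file()
    }
```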
Option 2: Environment variables

```shell
export DJ_USER=alice
export DJ_PASS=alice_password
export DJ_STORES__MAIN__ACCESS_KEY=...
export DJ_STORES__MAIN__SECRET_KEY=...
```

A typical `.gitignore`:

```
# Credentials
.secrets/

# Python
__pycache__/
*.pyc
*.egg-info/
dist/
build/

# Environment
.env
.venv/

# IDE
.idea/
.vscode/
```

And `pyproject.toml`:

```toml
[project]
name = "my-pipeline"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = [
    "datajoint>=2.0",
    "numpy",
]

[project.optional-dependencies]
dev = ["pytest", "pytest-cov"]

[tool.setuptools.packages.find]
where = ["src"]
```

Multi-user database access requires:
- User accounts — Individual credentials per team member
- Schema permissions — Which users can access which schemas
- Operation permissions — SELECT, INSERT, UPDATE, DELETE, CREATE, DROP
- Role hierarchy — Admin, developer, analyst, viewer
- Audit trail — Who modified what and when
```sql
-- Create user
CREATE USER 'alice'@'%' IDENTIFIED BY 'password';

-- Grant read-only on a specific schema
GRANT SELECT ON ephys.* TO 'alice'@'%';

-- Grant read-write on a specific schema
GRANT SELECT, INSERT, UPDATE, DELETE ON analysis.* TO 'alice'@'%';

-- Grant full access (developers); % matches any schema with this prefix
GRANT ALL PRIVILEGES ON `my_pipeline_%`.* TO 'bob'@'%';
```

| Role | Permissions | Typical Use |
|---|---|---|
| Viewer | SELECT | Browse data, run queries |
| Analyst | SELECT, INSERT on analysis | Add analysis results |
| Operator | SELECT, INSERT, DELETE on data schemas | Run pipeline |
| Developer | ALL on development schemas | Schema changes |
| Admin | ALL + GRANT | User management |
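One way to realize this hierarchy is with MySQL 8 roles rather than per-user grants; the role and schema names below are illustrative:

```sql
-- Define reusable roles
CREATE ROLE 'viewer', 'analyst', 'developer';
GRANT SELECT ON ephys.* TO 'viewer';
GRANT 'viewer' TO 'analyst';                        -- analysts inherit read access
GRANT SELECT, INSERT ON analysis.* TO 'analyst';
GRANT ALL PRIVILEGES ON dev_ephys.* TO 'developer';

-- Assign a role to a user and activate it on login
GRANT 'analyst' TO 'alice'@'%';
SET DEFAULT ROLE ALL TO 'alice'@'%';
```

Roles keep grants in one place: changing what an analyst may do is a single `GRANT`/`REVOKE` on the role instead of one statement per user.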
- Users need SELECT on parent schemas to INSERT into child schemas (FK validation)
- Cascading deletes require DELETE on all dependent schemas
- Schema creation requires CREATE privilege
- Coordinating permissions across many schemas becomes complex
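For example, letting a user populate ephys tables that reference session entries takes both grants below (schema names as in this project); without read access to the parent schema, inserts into the child schema cannot be validated:

```sql
GRANT SELECT ON session.* TO 'alice'@'%';           -- read parents for FK validation
GRANT SELECT, INSERT ON ephys.* TO 'alice'@'%';     -- write child tables
```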
Object storage permissions must align with database permissions:
- Bucket/prefix policies — Map to schema access
- Read vs write — Match SELECT vs INSERT/UPDATE
- Credential distribution — Per-user or shared service accounts
- Cross-schema objects — When computed tables reference multiple inputs
A DataJoint project creates a structured storage pattern:
```
📁 project_name/
├── 📁 schema_name1/
├── 📁 schema_name2/
├── 📁 schema_name3/
│   ├── objects/
│   │   └── table1/
│   │       └── key1-value1/
│   └── fields/
│       └── table1-field1/
└── ...
```
An example S3 bucket policy scoping access by schema prefix:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::my-lab-data/datajoint/ephys/*"
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::my-lab-data/datajoint/analysis/*"
        }
    ]
}
```

- Object paths include the schema name: `{project}/{schema}/{table}/...`
- Users need read access to fetch blobs from upstream schemas
- Content-addressed storage (`<blob@>`) shares objects across tables
- Garbage collection requires coordinated delete permissions
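The path layout above can be made concrete with a small sketch; the exact key format here (primary-key attributes as `key-value` path segments) is an assumption for illustration, not DataJoint's internal scheme:

```python
def object_key(project: str, schema: str, table: str, primary_key: dict) -> str:
    """Build an object-store key following {project}/{schema}/objects/{table}/...
    with primary-key attributes encoded as key-value path segments (assumed format)."""
    key_part = "/".join(f"{k}-{v}" for k, v in sorted(primary_key.items()))
    return f"{project}/{schema}/objects/{table}/{key_part}"


print(object_key("my_pipeline", "ephys", "recording",
                 {"subject_id": "M001", "session_idx": 3}))
# → my_pipeline/ephys/objects/recording/session_idx-3/subject_id-M001
```

Because the schema name is the second path component, prefix-based bucket policies like the one above line up naturally with schema-level database grants.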
Initialize schemas in dependency order:
```python
# src/my_pipeline/__init__.py
from . import subject    # No dependencies
from . import session    # Depends on subject
from . import ephys      # Depends on session
from . import imaging    # Depends on session
from . import analysis   # Depends on ephys, imaging


def initialize():
    """Create all schemas in dependency order."""
    # Schemas are created when modules are imported
    # and tables are first accessed.
    subject.Subject()
    session.Session()
    ephys.Recording()
    imaging.Scan()
    analysis.MultiModalAnalysis()
```

Track schema versions with your code:
```python
# src/my_pipeline/version.py
__version__ = "1.2.0"

SCHEMA_VERSIONS = {
    'subject': '1.0.0',
    'session': '1.1.0',
    'ephys': '1.2.0',
    'imaging': '1.2.0',
    'analysis': '1.2.0',
}
```

Keep development and production schemas separate:

```
┌─────────────────┐      ┌─────────────────┐
│  Development    │      │  Production     │
├─────────────────┤      ├─────────────────┤
│  dev_subject    │      │  subject        │
│  dev_session    │      │  session        │
│  dev_ephys      │      │  ephys          │
└─────────────────┘      └─────────────────┘
        │                        │
        │    Schema promotion    │
        └────────────────────────┘
```
Branch-based development follows the same pattern:

```
main ────────────────────────────────────▶
   │                    │
   │ feature/           │ hotfix/
   ▼                    ▼
ephys-v2          fix-recording
   │                    │
   └─────────────┴──▶ main
```
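To catch drift between deployed schemas and code, the `SCHEMA_VERSIONS` mapping shown earlier can drive a startup check. A minimal sketch; where the deployed versions come from (e.g. a metadata table you maintain) is assumed:

```python
def version_mismatches(deployed: dict, expected: dict) -> list:
    """Return (schema, deployed_version, expected_version) for each mismatch."""
    return [
        (name, deployed.get(name), version)
        for name, version in expected.items()
        if deployed.get(name) != version
    ]


expected = {'subject': '1.0.0', 'session': '1.1.0', 'ephys': '1.2.0'}
print(version_mismatches(
    {'subject': '1.0.0', 'session': '1.0.0', 'ephys': '1.2.0'}, expected))
# → [('session', '1.0.0', '1.1.0')]
```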
Managing a team pipeline requires coordinating:
| Component | Challenges |
|---|---|
| Code | Module dependencies, version control, deployment |
| Database | User accounts, schema permissions, role hierarchy |
| Object Storage | Bucket policies, credential distribution, path alignment |
| Compute | Worker deployment, job distribution, resource allocation |
| Monitoring | Progress tracking, error alerting, audit logging |
These challenges grow with team size and pipeline complexity. The DataJoint Platform provides integrated management for all these concerns.
- Deploy to Production — Production mode and environment configuration
- Data Pipelines — Conceptual overview and architecture
- Configure Object Storage — Storage setup
- Distributed Computing — Multi-worker pipelines
- Model Relationships — Foreign key patterns