CORE is available as a Python package for direct integration into your own pipelines and tooling.
pip install cdisc-rules-engine

This installs the engine underlying the CLI and executable, but does not include core.py or the CLI entrypoints. If you need the full CLI, use the executable or source code instead.
Installing the package alone is not enough to run validations. You also need:
- The rules cache: download the contents of resources/cache/ from the repository and store them somewhere in your project. Keep this in sync with the package version you're using.
- A CDISC Library API key: required for controlled terminology and library metadata. See update-cache for how to obtain one.
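Both prerequisites can be checked before any validation code runs. The sketch below is a hypothetical helper, not part of the package: the name `check_prerequisites` is made up for illustration, and the environment variable name `CDISC_LIBRARY_API_KEY` is an assumption that should be confirmed against the update-cache documentation.

```python
import os
from pathlib import Path


def check_prerequisites(cache_dir, env=None, key_name="CDISC_LIBRARY_API_KEY"):
    """Return a list of problems; an empty list means both prerequisites look fine.

    key_name is an assumed environment variable name, not confirmed by this guide.
    """
    env = os.environ if env is None else env
    problems = []
    cache_path = Path(cache_dir)
    if not cache_path.is_dir():
        problems.append(f"rules cache directory not found: {cache_path}")
    elif not any(p.is_file() for p in cache_path.iterdir()):
        problems.append(f"rules cache directory is empty: {cache_path}")
    if not env.get(key_name):
        problems.append(f"environment variable {key_name} is not set")
    return problems
```

Calling `check_prerequisites("path/to/rules/cache")` at startup and stopping when the returned list is non-empty gives a clearer failure than an error raised deep inside the engine.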
The package also includes the USDM and Dataset-JSON schemas, available if you use the dataset reader classes in cdisc_rules_engine/services/data_readers or the metadata readers in cdisc_rules_engine/services.
| | Option A: Business Rules Engine | Option B: RulesEngine Class |
|---|---|---|
| Interface | Low-level, rule-by-rule | High-level, dataset-oriented |
| Data input | pandas DataFrame | XPT or other file-based datasets |
| Setup | Minimal | More configuration required |
| Best for | Simple in-memory validation | Full multi-domain validation pipelines |
Both options start by loading the cache:
import os
import pathlib
import pickle
from multiprocessing.managers import SyncManager
from cdisc_rules_engine.services.cache import InMemoryCacheService
class CacheManager(SyncManager):
    pass

CacheManager.register("InMemoryCacheService", InMemoryCacheService)

def load_rules_cache(path_to_rules_cache):
    cache_path = pathlib.Path(path_to_rules_cache)
    manager = CacheManager()
    manager.start()
    cache = manager.InMemoryCacheService()
    files = next(os.walk(cache_path), (None, None, []))[2]
    for fname in files:
        with open(cache_path / fname, "rb") as f:
            cache.add_all(pickle.load(f))
    return cache

Retrieve rules for a standard and version:
from cdisc_rules_engine.utilities.utils import get_rules_cache_key
cache = load_rules_cache("path/to/rules/cache")
# Note: version uses dashes, not dots
rule_keys = cache.get_all_by_prefix(get_rules_cache_key("sdtmig", "3-4"))
rules = [cache.get(key) for key in rule_keys[0]]

get_all_by_prefix returns a nested list of cache keys, not rule objects directly. Fetch the actual rule dicts by calling cache.get() on each key.
Each rule is a dict with keys: core_id, domains, author, reference, sensitivity, executability, description, authorities, standards, classes, rule_type, conditions, actions, output_variables.
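Because rules are plain dicts, they can be inspected with ordinary Python before any validation is run. The following is a minimal sketch that counts rules per rule_type and per included domain. The `sample_rules` list and its values are hypothetical stand-ins for the dicts returned by cache.get(), and `summarize_rules` is an illustrative helper, not part of the package.

```python
from collections import Counter

# Hypothetical rule dicts carrying only a few of the keys listed above
sample_rules = [
    {"core_id": "CORE-000001", "rule_type": "Record Data", "domains": {"Include": ["AE"]}},
    {"core_id": "CORE-000002", "rule_type": "Record Data", "domains": {"Include": ["ALL"]}},
    {"core_id": "CORE-000003", "rule_type": "Dataset Metadata Check", "domains": {"Include": ["DM"]}},
]


def summarize_rules(rules):
    """Count rules per rule_type and per domain named in domains.Include."""
    by_type = Counter(rule.get("rule_type", "unknown") for rule in rules)
    by_domain = Counter(
        domain
        for rule in rules
        for domain in (rule.get("domains") or {}).get("Include", [])
    )
    return by_type, by_domain


by_type, by_domain = summarize_rules(sample_rules)
print(dict(by_type))
print(dict(by_domain))
```

A summary like this is a quick way to confirm that the cache actually contains rules for the domains you plan to validate.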
If you have rules in raw CDISC metadata format, convert them first:
from cdisc_rules_engine.models.rule import Rule
rule_dict = Rule.from_cdisc_metadata(rule_metadata)
rule_obj = Rule(rule_dict)

Option A (Business Rules Engine) is the minimal setup, good for validating a single domain against an in-memory DataFrame.
from cdisc_rules_engine.models.dataset.pandas_dataset import PandasDataset
from cdisc_rules_engine.models.dataset_variable import DatasetVariable
from cdisc_rules_engine.models.sdtm_dataset_metadata import SDTMDatasetMetadata
pandas_dataset = PandasDataset(data=df)
dataset_metadata = SDTMDatasetMetadata(
    name="AE",
    label="Adverse Events",
    first_record=df.iloc[0].to_dict() if not df.empty else None,
)
dataset_variable = DatasetVariable(
    pandas_dataset,
    column_prefix_map={"--": dataset_metadata.domain},
)

DatasetVariable accepts additional optional arguments for richer validation:
dataset_variable = DatasetVariable(
    pandas_dataset,
    column_prefix_map={"--": dataset_metadata.domain},
    value_level_metadata=value_level_metadata,
    column_codelist_map=variable_codelist_map,
    codelist_term_maps=codelist_term_maps,
)

from business_rules.engine import run
from cdisc_rules_engine.models.actions import COREActions
ae_rules = [
    r for r in rules
    if "AE" in r.get("domains", {}).get("Include", [])
    or "ALL" in r.get("domains", {}).get("Include", [])
]
all_results = []
for rule in ae_rules:
    results = []
    core_actions = COREActions(
        output_container=results,
        variable=dataset_variable,
        dataset_metadata=dataset_metadata,
        rule=rule,
        value_level_metadata=None,
    )
    try:
        was_triggered = run(rule=rule, defined_variables=dataset_variable, defined_actions=core_actions)
        if was_triggered:
            all_results.extend(results)
    except Exception as e:
        print(f"Error in {rule.get('core_id')}: {e}")

was_triggered is True if issues were found. Each result in all_results looks like:
{
    'executionStatus': 'success',
    'domain': 'AE',
    'variables': ['AESLIFE'],
    'message': 'AESLIFE is completed, but not equal to "N" or "Y"',
    'errors': [{'value': {'AESLIFE': 'Maybe'}, 'row': 1}]
}

Option B (RulesEngine class) requires more setup, but handles dataset reading, preprocessing, and multi-domain validation. The source code in cdisc_rules_engine/rules_engine.py and the existing CLI implementation in core.py are the best reference for wiring this together: the initializer arguments map closely to the CLI flags documented in the CLI Reference.
import os
import pyreadstat
from cdisc_rules_engine.models.sdtm_dataset_metadata import SDTMDatasetMetadata
def create_dataset_metadata(file_path):
    data, meta = pyreadstat.read_xport(file_path)
    first_record = data.iloc[0].to_dict() if not data.empty else None
    return SDTMDatasetMetadata(
        name=os.path.basename(file_path).split('.')[0].upper(),
        label=meta.file_label if hasattr(meta, 'file_label') else "",
        filename=os.path.basename(file_path),
        full_path=file_path,
        file_size=os.path.getsize(file_path),
        record_count=len(data),
        first_record=first_record,
    )

# directory is the path to the folder that contains your XPT files
datasets = [
    create_dataset_metadata(os.path.join(directory, f))
    for f in os.listdir(directory)
    if f.lower().endswith('.xpt')
]

You don't need to manually create PandasDataset or DatasetVariable objects for Option B: the engine handles this internally.
from cdisc_rules_engine.models.library_metadata_container import LibraryMetadataContainer
from cdisc_rules_engine.utilities.utils import (
    get_library_variables_metadata_cache_key,
    get_model_details_cache_key_from_ig,
    get_standard_details_cache_key,
    get_variable_codelist_map_cache_key,
)
standard = "sdtmig"
standard_version = "3-4"
standard_substandard = None
standard_metadata = cache.get(get_standard_details_cache_key(standard, standard_version, standard_substandard))
model_metadata = cache.get(get_model_details_cache_key_from_ig(standard_metadata)) if standard_metadata else {}
ct_packages = ["sdtmct-2021-12-17"] # replace with your CT package versions
ct_package_metadata = {pkg: cache.get(pkg) for pkg in ct_packages}
library_metadata = LibraryMetadataContainer(
    standard_metadata=standard_metadata,
    model_metadata=model_metadata,
    variables_metadata=cache.get(get_library_variables_metadata_cache_key(standard, standard_version, standard_substandard)),
    variable_codelist_map=cache.get(get_variable_codelist_map_cache_key(standard, standard_version, standard_substandard)),
    ct_package_metadata=ct_package_metadata,
)

from cdisc_rules_engine.config import config as default_config
from cdisc_rules_engine.services.data_services import DataServiceFactory
max_dataset_size = max(datasets, key=lambda x: x.file_size).file_size
# Paths of the datasets to validate, taken from the metadata objects built earlier
dataset_paths = [dataset.full_path for dataset in datasets]
# Set max_dataset_size=0 to force Dask processing for all datasets
data_service_factory = DataServiceFactory(
    config=default_config,
    cache_service=cache,
    standard=standard,
    standard_version=standard_version,
    standard_substandard=standard_substandard,
    library_metadata=library_metadata,
    max_dataset_size=max_dataset_size,
)
data_service = data_service_factory.get_data_service(dataset_paths)

from cdisc_rules_engine.rules_engine import RulesEngine
rules_engine = RulesEngine(
    cache=cache,
    data_service=data_service,
    config_obj=default_config,
    external_dictionaries=None,
    standard=standard,
    standard_version=standard_version,
    standard_substandard=None,
    library_metadata=library_metadata,
    max_dataset_size=max_dataset_size,
    dataset_paths=dataset_paths,
    ct_packages=ct_packages,
    define_xml_path="path/to/define.xml",  # optional
    validate_xml=False,
)

Note the ConditionCompositeFactory conversion step, which is required before passing rules to validate_single_rule:
import time
from cdisc_rules_engine.models.rule_conditions import ConditionCompositeFactory
from cdisc_rules_engine.models.rule_validation_result import RuleValidationResult
start_time = time.time()
validation_results = []
for rule in rules:
    try:
        if isinstance(rule["conditions"], dict):
            rule["conditions"] = ConditionCompositeFactory.get_condition_composite(rule["conditions"])
        results = rules_engine.validate_single_rule(rule, datasets)
        flattened = [r for domain_results in results.values() for r in domain_results]
        validation_results.append(RuleValidationResult(rule, flattened))
    except Exception as e:
        print(f"Error validating rule {rule.get('core_id')}: {e}")
elapsed_time = time.time() - start_time

Simple text output:
import json
with open("validation_results.txt", "w") as f:
    for result in validation_results:
        rule_id = result.rule.get("core_id", "Unknown")
        f.write(f"Rule: {rule_id}\n")
        if hasattr(result, 'violations') and result.violations:
            f.write(f"Found {len(result.violations)} violations\n")
            for violation in result.violations:
                f.write(f" - {json.dumps(violation, default=str)}\n")
        else:
            f.write(" No violations found\n")
        f.write("\n")

For structured output, use ReportFactory:
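# Import path as used by the CLI source; confirm it against your installed version
from cdisc_rules_engine.services.reporting import ReportFactory

# args is not defined in this guide: the CLI passes its validation arguments object here (see core.py)
reporting_factory = ReportFactory(
    datasets=datasets,
    validation_results=validation_results,
    elapsed_time=elapsed_time,
    args=args,
    data_service=data_service,
)
reporting_services = reporting_factory.get_report_services()

Cache key format: always use dashes in version strings (3-4, not 3.4).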
column_prefix_map — maps the -- variable prefix to the dataset domain (e.g. {"--": "AE"}), resolving placeholders like --SEQ → AESEQ.
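To make the placeholder substitution concrete, here is a small standalone sketch of the idea. It is an illustration only, not the engine's own implementation, and `resolve_placeholders` is a hypothetical name.

```python
def resolve_placeholders(variable_names, column_prefix_map):
    """Replace a placeholder prefix (such as "--") at the start of each name."""
    resolved = []
    for name in variable_names:
        for placeholder, domain in column_prefix_map.items():
            if name.startswith(placeholder):
                # Swap the placeholder for the domain code, keep the rest of the name
                name = domain + name[len(placeholder):]
                break
        resolved.append(name)
    return resolved


print(resolve_placeholders(["--SEQ", "--TERM", "USUBJID"], {"--": "AE"}))
# prints ['AESEQ', 'AETERM', 'USUBJID']
```

Names without the placeholder, such as USUBJID, pass through unchanged, which is why a wrong domain in the map only affects the generic `--` variables.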
External dictionaries — pass an ExternalDictionariesContainer to RulesEngine if validating rules that require MedDRA, WHODrug, LOINC, UNII, MedRT, or SNOMED. See the External Dictionary Reference.
Dask — set max_dataset_size=0 when initializing DataServiceFactory to force Dask processing for all datasets.
Windows compatibility — add freeze_support() for multiprocessing:
from multiprocessing import freeze_support
if __name__ == "__main__":
    freeze_support()
    main()

- Ensure the DataFrame contains all required columns for the rules being run
- column_prefix_map must correctly map "--" to the domain (e.g. {"--": "AE"})
- The dataset object must be a PandasDataset instance, not a raw pandas DataFrame
- full_path must be set in SDTMDatasetMetadata when using the RulesEngine approach
- The rule's domains.Include must match your dataset's domain
- standard_version format must be consistent throughout (3-4, not 3.4)
- CT package metadata must be present in the cache if validating against controlled terminology
- When using define.xml, the file must be named define.xml and the path must be valid
- If using external dictionaries, verify all file paths are correct and accessible
- Don't forget the ConditionCompositeFactory conversion before calling validate_single_rule (Option B)
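Several of the checks above are mechanical and can be automated before the engine is constructed. The sketch below covers three of them: the version format, the dataset paths, and the define.xml name and location. `preflight_problems` is a hypothetical helper written for this guide, not part of the package, and it uses only the standard library.

```python
from pathlib import Path


def preflight_problems(standard_version, dataset_paths, define_xml_path=None):
    """Return a list of configuration problems; an empty list means none were found."""
    problems = []
    # Version strings must use dashes (3-4), never dots (3.4)
    if "." in standard_version:
        problems.append(f"standard_version must use dashes, got {standard_version!r}")
    # Every dataset path handed to the engine should point at an existing file
    for path in dataset_paths:
        if not Path(path).is_file():
            problems.append(f"dataset not found: {path}")
    # The define file is optional, but when given it must be named define.xml
    if define_xml_path is not None:
        define_path = Path(define_xml_path)
        if define_path.name != "define.xml":
            problems.append(f"define file must be named define.xml, got {define_path.name}")
        elif not define_path.is_file():
            problems.append(f"define.xml not found: {define_path}")
    return problems
```

Running this once before building the data service turns several of the silent failure modes above into explicit messages.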