This document provides technical details for developers working with the ResourceStatusSystem (RSS) internals.
The ResourceStatusDB contains two sets of tables (Site and Resource), each with three table types.
Current state of all elements. Primary key: (Name, StatusType, VO)
| Field | Type | Default | Description |
|---|---|---|---|
| Name | VARCHAR(64) | | Element name (e.g., site name, SE name) |
| StatusType | VARCHAR(128) | 'all' | Status type (all, ReadAccess, WriteAccess, etc.) |
| VO | VARCHAR(64) | 'all' | Virtual Organization |
| Status | VARCHAR(8) | | Current status (Active, Degraded, Probing, Banned, Unknown, Error) |
| Reason | VARCHAR(512) | 'Unspecified' | Reason for current status |
| DateEffective | DATETIME | | When status became effective |
| TokenExpiration | DATETIME | 9999-12-31 23:59:59 | When lock expires |
| ElementType | VARCHAR(32) | | Type (CE, SE, etc.) |
| LastCheckTime | DATETIME | 1000-01-01 00:00:00 | Last policy check |
| TokenOwner | VARCHAR(16) | 'rs_svc' | Who owns the lock |
Tables: SiteStatus, ResourceStatus
All status changes with auto-increment ID. Primary key: ID
Same fields as Status tables plus:
- ID (BIGINT, AUTO_INCREMENT) - Unique identifier
Tables: SiteLog, ResourceLog
Archived logs with duplicates removed. Same schema as Log tables.
Tables: SiteHistory, ResourceHistory
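One way to picture "duplicates removed" is to collapse consecutive log rows that repeat the same status for an element. The sketch below assumes that interpretation. The function name `collapse_duplicates`, the row layout, and the sample site name are illustrative and are not taken from SummarizeLogsAgent.

```python
from itertools import groupby


def collapse_duplicates(log_rows):
    """Keep the first row of each run of consecutive rows with the same status."""
    def key(row):
        return (row["Name"], row["StatusType"], row["Status"])

    # groupby only merges neighbouring rows, so a later return to an earlier
    # status (Active -> Banned -> Active) is preserved as a separate entry.
    return [next(rows) for _, rows in groupby(log_rows, key=key)]


log = [
    {"ID": 1, "Name": "LCG.CERN.ch", "StatusType": "all", "Status": "Active"},
    {"ID": 2, "Name": "LCG.CERN.ch", "StatusType": "all", "Status": "Active"},
    {"ID": 3, "Name": "LCG.CERN.ch", "StatusType": "all", "Status": "Banned"},
    {"ID": 4, "Name": "LCG.CERN.ch", "StatusType": "all", "Status": "Banned"},
    {"ID": 5, "Name": "LCG.CERN.ch", "StatusType": "all", "Status": "Active"},
]
history = collapse_duplicates(log)
print([row["ID"] for row in history])  # [1, 3, 5]
```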
Cache tables for metrics used by policies.
| Table | Purpose |
|---|---|
| DowntimeCache | Scheduled downtimes from GOCDB (DowntimeID, StartDate, EndDate, Severity) |
| GGUSTicketsCache | GGUS support tickets (Tickets JSON, OpenTickets count) |
| JobCache | Job efficiency metrics per site (Efficiency, MaskStatus) |
| PilotCache | Pilot efficiency per CE (Site, CE, Efficiency, Status) |
| PolicyResult | Policy evaluation results (Element, Name, PolicyName, Status, Reason) |
| SpaceTokenOccupancyCache | Storage space usage (Endpoint, Token, Free, Total) — values stored in MB |
| TransferCache | Transfer quality metrics (SourceName, DestinationName, Metric, Value) |
- insert()
- Insert new records
- select()
- Query records with flexible conditions
- delete()
- Remove records
- addOrModify()
- Insert if not exists, update if exists (by primary key)
- addIfNotThere()
- Insert only if record doesn't exist
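The difference between addOrModify() and addIfNotThere() is easiest to see on a toy table. The following is a minimal in-memory sketch, assuming the (Name, StatusType, VO) primary key of the Status tables. The class `ToyStatusTable` and its method names are hypothetical stand-ins, not the real database API.

```python
class ToyStatusTable:
    """In-memory stand-in for a Status table keyed by (Name, StatusType, VO)."""

    def __init__(self):
        self.rows = {}

    @staticmethod
    def _key(row):
        return (row["Name"], row["StatusType"], row["VO"])

    def add_or_modify(self, row):
        """Insert the row, or update the existing row with the same primary key."""
        self.rows.setdefault(self._key(row), {}).update(row)

    def add_if_not_there(self, row):
        """Insert the row only when no row with the same primary key exists."""
        key = self._key(row)
        if key in self.rows:
            return False
        self.rows[key] = dict(row)
        return True


table = ToyStatusTable()
row = {"Name": "CERN-USER", "StatusType": "ReadAccess", "VO": "all", "Status": "Active"}

table.add_or_modify(row)                                        # inserts
inserted = table.add_if_not_there({**row, "Status": "Banned"})  # ignored, key exists
table.add_or_modify({**row, "Status": "Degraded"})              # updates in place

print(inserted, table.rows[("CERN-USER", "ReadAccess", "all")]["Status"])
```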
Inspector Agent
↓
PEP (Policy Enforcement Point) ← Entry point
↓
PDP (Policy Decision Point) ← Discovers & runs policies
↓
Policies → Commands → Metrics
↓
StateMachine ← Combines results
↓
Actions ← Execute changes
Valid states ordered by priority (lowest number = highest priority):
| State | Level | Description |
|---|---|---|
| Unknown | 5 | No information available |
| Active | 4 | Fully operational |
| Degraded | 3 | Operational but suboptimal |
| Probing | 2 | Under testing (transition from Banned) |
| Banned | 1 | Not usable |
| Error | 0 | Error state |
Transition Rule: Transitions from Banned to Active/Degraded/Unknown must go through Probing first.
Location: PolicySystem/StateMachine.py:28-44
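The transition rule can be sketched in a few lines. This is an illustration of the rule as stated above, not the code in StateMachine.py; the function name `next_state` is hypothetical.

```python
# Levels from the table above (lowest number = highest priority).
LEVELS = {"Unknown": 5, "Active": 4, "Degraded": 3, "Probing": 2, "Banned": 1, "Error": 0}


def next_state(current, candidate):
    """Return the state to move to, routing exits from Banned through Probing."""
    if candidate not in LEVELS:
        raise ValueError(f"unknown state: {candidate}")
    if current == "Banned" and candidate in ("Active", "Degraded", "Unknown"):
        return "Probing"
    return candidate


print(next_state("Banned", "Active"))   # Probing
print(next_state("Probing", "Active"))  # Active
print(next_state("Active", "Banned"))   # Banned
```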
Policies inherit from PolicyBase and implement evaluate().
Example: Job Efficiency Policy
# Location: Policy/JobEfficiencyPolicy.py
class JobEfficiencyPolicy(PolicyBase):
    def evaluate(self):
        # Execute command to get job stats
        jobStats = self.command.doCommand()
        if not jobStats['Total']:
            # Illustrative guard: avoid dividing by zero when no jobs are recorded
            return {'Status': 'Unknown', 'Reason': 'No job statistics available'}
        efficiency = jobStats['Done'] / jobStats['Total']

        # Apply thresholds
        if efficiency > 0.9:
            return {'Status': 'Active', 'Reason': f'Efficiency: {efficiency:.2%}'}
        elif efficiency > 0.7:
            return {'Status': 'Degraded', 'Reason': f'Low efficiency: {efficiency:.2%}'}
        return {'Status': 'Banned', 'Reason': f'Very low efficiency: {efficiency:.2%}'}

The FreeDiskSpacePolicy (Policy/FreeDiskSpacePolicy.py) evaluates SE occupancy using
configurable thresholds. Thresholds are passed through as command arguments so they propagate
from the CS configuration all the way to the policy evaluation:
1. Configurations.py reads Unit, Banned_threshold and Degraded_threshold from the Operations CS via Operations().getValue("ResourceStatus/Policies/FreeDiskSpace/Banned_threshold", 0.1) and stores them in the policy args dict.
2. FreeDiskSpaceCommand reads these values from self.args in _prepareCommand() and returns them alongside Free and Total in both doNew() and doCache().
3. FreeDiskSpacePolicy._evaluate() reads Banned_threshold and Degraded_threshold from the command result dict (with safe defaults) and applies the comparison.
This design keeps thresholds fully configurable per deployment without code changes. See :ref:`rss_advanced_configuration` for the available CS keys.
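The last step of that chain might look roughly like the sketch below. It assumes the thresholds are absolute amounts of free space expressed in Unit and compared directly against Free. The function name `evaluate_free_space`, the Degraded default of 5, and the default unit "TB" are placeholders for illustration; only the Banned default of 0.1 comes from the getValue() call quoted above.

```python
def evaluate_free_space(command_result):
    """Map a FreeDiskSpace-style result dict to a status using its thresholds."""
    free = command_result["Free"]

    # Fall back to defaults when the command result carries no thresholds.
    banned = command_result.get("Banned_threshold", 0.1)
    degraded = command_result.get("Degraded_threshold", 5)  # assumed default
    unit = command_result.get("Unit", "TB")                 # assumed default

    if free < banned:
        status = "Banned"
    elif free < degraded:
        status = "Degraded"
    else:
        status = "Active"
    return {"Status": status, "Reason": f"Free space: {free:.2f} {unit}"}


print(evaluate_free_space({"Free": 0.05, "Total": 100}))
print(evaluate_free_space({"Free": 3, "Total": 100, "Degraded_threshold": 2}))
```

Because the thresholds travel inside the result dict, changing them in the CS changes the outcome without touching this function.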
Commands inherit from Command and implement doCommand().
Example: Downtime Command
# Location: Command/DowntimeCommand.py
from DIRAC import S_OK
from DIRAC.ResourceStatusSystem.Command.Command import Command

class DowntimeCommand(Command):
    def doCommand(self):
        # Query GOCDB API
        gocdb = GOCDB()
        downtimes = gocdb.getDowntimes(elementName=self.args['name'])

        # Store in cache
        for dt in downtimes:
            self.clients['ResourceManagementClient'].addOrModifyDowntimeCache(
                name=dt['Name'], downtimeID=dt['ID'],
                startDate=dt['StartDate'], endDate=dt['EndDate'],
                severity=dt['Severity']
            )
        return S_OK(downtimes)

High-level API with caching (Singleton pattern).
Location: Client/ResourceStatus.py
- getElementStatus(elementName, elementType, statusType=None, vO=None)
Get status for Storage/Computing Elements, FTS, Catalogs
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus

rs = ResourceStatus()

# Get all status types for a SE
rs.getElementStatus('CERN-USER', 'StorageElement')
# Returns: {'CERN-USER': {'ReadAccess': 'Active', 'WriteAccess': 'Active', ...}}

# Get specific status type
rs.getElementStatus('CERN-USER', 'StorageElement', 'ReadAccess')
# Returns: {'CERN-USER': {'ReadAccess': 'Active'}}

# Get multiple SEs
rs.getElementStatus(['CERN-USER', 'PIC-USER'], 'StorageElement', 'ReadAccess')
- setElementStatus(elementName, elementType, statusType, status, reason=None, tokenOwner=None)
Set element status (acquires 1-day token)
rs.setElementStatus('CERN-USER', 'StorageElement', 'ReadAccess', 'Banned', reason='Maintenance')
Caching: Internal RSSCache with configurable lifetime, refreshed automatically on cache miss.
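The caching behaviour described here, a lifetime plus a refresh on miss, can be illustrated with a small stand-alone class. This is a sketch of the general pattern only; `ToyStatusCache`, its constructor arguments, and the injected clock are hypothetical and do not mirror the RSSCache interface.

```python
import time


class ToyStatusCache:
    """Cache that reloads all entries on a miss or once its lifetime has passed."""

    def __init__(self, lifetime, loader, clock=time.monotonic):
        self.lifetime = lifetime  # seconds the loaded data stays valid
        self.loader = loader      # callable returning a fresh {key: value} dict
        self.clock = clock        # injectable so the example can fake time
        self._data = {}
        self._expires_at = 0.0

    def get(self, key):
        now = self.clock()
        if key not in self._data or now >= self._expires_at:
            self._data = self.loader()
            self._expires_at = now + self.lifetime
        return self._data.get(key)


calls = []


def load_from_db():
    calls.append(1)
    return {"CERN-USER": "Active"}


fake_now = [0.0]
cache = ToyStatusCache(lifetime=300, loader=load_from_db, clock=lambda: fake_now[0])

cache.get("CERN-USER")  # miss: loads from the "database"
cache.get("CERN-USER")  # hit: served from memory
fake_now[0] = 301.0
cache.get("CERN-USER")  # lifetime exceeded: reloads
print(len(calls))       # 2
```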
Simplified interface for site operations (Singleton pattern).
Location: Client/SiteStatus.py
- getSiteStatuses(siteNames=None)
- Get status for one or more sites
- isUsableSite(siteName, statusType='all')
- Returns True if site is Active or Degraded
- getUsableSites(statusType='all')
- Returns list of all usable sites
- setSiteStatus(siteName, status, reason=None)
- Set site status manually
Check elements periodically based on their status.
Check Frequency:
- Active/Degraded
- Every 20 minutes
- Probing/Banned/Unknown
- Every 20 minutes
- Error
- Every 5 minutes
Implementation Pattern:
# Location: Agent/SiteInspectorAgent.py, Agent/ElementInspectorAgent.py
from concurrent.futures import ThreadPoolExecutor

from DIRAC.ResourceStatusSystem.PolicySystem.PEP import PEP

def execute(self):
    # Get elements needing check (LastCheckTime + lifetime < now)
    elements = self._getElementsToBeChecked()

    # Process in parallel (15 threads)
    with ThreadPoolExecutor(max_workers=15) as executor:
        for element in elements:
            executor.submit(self._execute, element)

def _execute(self, element):
    pep = PEP(clients=self.clients)
    pep.enforce({
        'element': 'Site',  # or 'Resource'
        'name': element['Name'],
        'statusType': element['StatusType'],
        'status': element['Status'],
        'elementType': element['ElementType']
    })

Skip Condition: Elements with TokenOwner != 'rs_svc' are skipped (manual override active).
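The selection rule (LastCheckTime + lifetime < now) and the skip condition can be combined into one filter for illustration. The sketch below is not the agents' code: `elements_to_check` and the `LIFETIME_MINUTES` mapping are hypothetical names, with the minute values copied from the check frequencies listed above. In the documented flow the token check is performed when enforcing, so folding it into the selection is a simplification.

```python
from datetime import datetime, timedelta

# Check lifetime per status, in minutes, as listed under "Check Frequency".
LIFETIME_MINUTES = {
    "Active": 20, "Degraded": 20,
    "Probing": 20, "Banned": 20, "Unknown": 20,
    "Error": 5,
}


def elements_to_check(elements, now):
    """Return elements that are due for a check and not under manual override."""
    due = []
    for element in elements:
        if element["TokenOwner"] != "rs_svc":
            continue  # manual override active, leave the element alone
        lifetime = timedelta(minutes=LIFETIME_MINUTES[element["Status"]])
        if element["LastCheckTime"] + lifetime < now:
            due.append(element)
    return due


now = datetime(2024, 1, 1, 12, 0)
elements = [
    {"Name": "A", "Status": "Active", "TokenOwner": "rs_svc", "LastCheckTime": datetime(2024, 1, 1, 11, 30)},
    {"Name": "B", "Status": "Active", "TokenOwner": "rs_svc", "LastCheckTime": datetime(2024, 1, 1, 11, 50)},
    {"Name": "C", "Status": "Error", "TokenOwner": "rs_svc", "LastCheckTime": datetime(2024, 1, 1, 11, 50)},
    {"Name": "D", "Status": "Banned", "TokenOwner": "admin", "LastCheckTime": datetime(2024, 1, 1, 10, 0)},
]
print([e["Name"] for e in elements_to_check(elements, now)])  # ['A', 'C']
```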
- SiteInspectorAgent
- Evaluates policies for Site elements (PollingTime: 300s, maxThreads: 15)
- ElementInspectorAgent
- Evaluates policies for Resource elements (PollingTime: 300s, elementType: Resource)
- CacheFeederAgent
- Populates cache tables with metrics (PollingTime: 900s, commands: Downtime, GOCDBSync, FreeDiskSpace, Pilot)
- TokenAgent
- Manages token expiration, notifies owners 12h before expiry (PollingTime: 3600s)
- EmailAgent
- Sends aggregated status change notifications (PollingTime: 1800s)
- SummarizeLogsAgent
- Archives logs to history tables (PollingTime: 300s, keeps 36 months)
- RucioRSSAgent
- Synchronizes with Rucio RSE status (PollingTime: 120s)
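As an illustration of the TokenAgent's 12-hour notice, the check below picks out manually owned tokens that expire within that window. It is a sketch under assumptions: the function name `tokens_needing_notice`, the row layout, and the owner names are invented for the example, and the real agent may select rows differently.

```python
from datetime import datetime, timedelta

NOTIFY_WINDOW = timedelta(hours=12)


def tokens_needing_notice(elements, now):
    """Return elements whose manually owned token expires within the next 12 hours."""
    return [
        e for e in elements
        if e["TokenOwner"] != "rs_svc"
        and now <= e["TokenExpiration"] <= now + NOTIFY_WINDOW
    ]


now = datetime(2024, 1, 1, 12, 0)
elements = [
    {"Name": "SE-1", "TokenOwner": "alice", "TokenExpiration": now + timedelta(hours=6)},
    {"Name": "SE-2", "TokenOwner": "bob", "TokenExpiration": now + timedelta(days=2)},
    {"Name": "SE-3", "TokenOwner": "rs_svc", "TokenExpiration": datetime(9999, 12, 31, 23, 59, 59)},
]
print([e["Name"] for e in tokens_needing_notice(elements, now)])  # ['SE-1']
```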
Complete flow from element check to database update:
1. Inspector Agent
↓ Query elements where LastCheckTime + lifetime < now
2. Get Current Status from DB
↓ (Name, StatusType, Status, TokenOwner, etc.)
3. PEP.enforce(decisionParams)
↓ Skip if TokenOwner != 'rs_svc'
4. PDP.setup(decisionParams)
↓ Discover applicable policies from CS
5. PDP.takeDecision()
↓ Run each policy via PolicyCaller
6. Policy.evaluate()
↓ Execute Command to gather metrics
7. Command.doCommand()
↓ Fetch from cache or external API
8. StateMachine.orderPolicyResults()
↓ Sort by severity (Banned > Active)
9. PDP discovers applicable actions
↓ LogStatusAction, EmailAction, etc.
10. PEP validates element unchanged
↓ Check DB to prevent race conditions
11. Execute Actions
↓
12. LogStatusAction
↓ Update Status table (current state)
↓ Insert Log table (history)
↓ Set TokenOwner='rs_svc', LastCheckTime=now
13. EmailAction (optional)
↓ Insert into ResourceStatusCache
↓ EmailAgent aggregates and sends later
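The core of this flow (skip on manual token, run policies, take the most restrictive result, re-check the element before acting) can be condensed into a toy function. Everything in the sketch is a stand-in: `enforce`, `combine`, the two sample policies, and the `read_current_status` callback are hypothetical, and the Banned-to-Probing transition rule is left out for brevity.

```python
LEVELS = {"Unknown": 5, "Active": 4, "Degraded": 3, "Probing": 2, "Banned": 1, "Error": 0}


def combine(policy_results):
    """Pick the result with the lowest level, i.e. the most restrictive status."""
    return min(policy_results, key=lambda result: LEVELS[result["Status"]])


def enforce(element, policies, read_current_status):
    """Return the status update to apply, or None when nothing should change."""
    if element["TokenOwner"] != "rs_svc":
        return None  # step 3: manual override active
    results = [policy(element) for policy in policies]  # steps 5-7
    if not results:
        return None
    decision = combine(results)  # step 8
    if read_current_status(element["Name"]) != element["Status"]:
        return None  # step 10: element changed while policies ran
    return {"Name": element["Name"], "Status": decision["Status"], "Reason": decision["Reason"]}


def always_active(element):
    return {"Status": "Active", "Reason": "Efficiency fine"}


def in_downtime(element):
    return {"Status": "Banned", "Reason": "Scheduled downtime"}


site = {"Name": "LCG.CERN.ch", "Status": "Active", "TokenOwner": "rs_svc"}
update = enforce(site, [always_active, in_downtime], lambda name: "Active")
print(update["Status"], "-", update["Reason"])  # Banned - Scheduled downtime
```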
1. Create policy file:
MyExtension/ResourceStatusSystem/Policy/MyCustomPolicy.py
from DIRAC.ResourceStatusSystem.Policy.PolicyBase import PolicyBase

class MyCustomPolicy(PolicyBase):
    def evaluate(self):
        # Execute command to get metrics
        result = self.command.doCommand()

        # Placeholder cut-off; pick a value that suits your metric
        threshold = 0.8

        # Apply custom logic
        if result['metric'] > threshold:
            return {'Status': 'Active', 'Reason': 'All good'}
        return {'Status': 'Degraded', 'Reason': 'Metric at or below threshold'}

2. Configure in CS:
Operations/Defaults/ResourceStatus/Policies
{
  Site
  {
    all
    {
      policyName = MyCustomPolicy
    }
  }
}

1. Create command file:
MyExtension/ResourceStatusSystem/Command/MyCustomCommand.py
from DIRAC import S_OK
from DIRAC.ResourceStatusSystem.Command.Command import Command

class MyCustomCommand(Command):
    def doCommand(self):
        # Fetch data from external source
        data = self.fetchFromAPI()

        # Store in cache (optional)
        self.clients['ResourceManagementClient'].addOrModifyCustomCache(data)
        return S_OK(data)

2. Use in policy:
Policies automatically discover and execute commands based on naming convention.
1. Create action file:
MyExtension/ResourceStatusSystem/PolicySystem/Actions/MyCustomAction.py
from DIRAC import S_OK
from DIRAC.ResourceStatusSystem.PolicySystem.Actions.BaseAction import BaseAction

class MyCustomAction(BaseAction):
    def run(self):
        element = self.decisionParams['name']
        newStatus = self.policyCombinedResult['Status']

        # Execute custom action (e.g., call external API)
        self.notifyExternalSystem(element, newStatus)
        return S_OK()

2. Configure in CS:
Operations/Defaults/ResourceStatus/PolicyActions += MyCustomAction

- PEP
- Policy Enforcement Point - PolicySystem/PEP.py:73-160
- PDP
- Policy Decision Point - PolicySystem/PDP.py
- RSSMachine
- State machine - PolicySystem/StateMachine.py:28-44
- ResourceStatus
- High-level API (Singleton) - Client/ResourceStatus.py:42-98
- SiteStatus
- Site-specific API (Singleton) - Client/SiteStatus.py
- ResourceStatusDB
- Status database access - DB/ResourceStatusDB.py
- ResourceManagementDB
- Cache database access - DB/ResourceManagementDB.py
- Unknown (Level 5)
- Active (Level 4)
- Degraded (Level 3)
- Probing (Level 2)
- Banned (Level 1)
- Error (Level 0)
Lower level numbers indicate higher priority when combining policy results.
- Policies
- /Operations/Defaults/ResourceStatus/Policies/<Element>/<StatusType>/policyName
- Actions
- /Operations/Defaults/ResourceStatus/PolicyActions
- Services
- /Systems/ResourceStatus/Services/
- Agents
- /Systems/ResourceStatus/Agents/