Commit 10efc7b

feat: add catalogs support
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent a205884 commit 10efc7b

10 files changed

Lines changed: 121 additions & 33 deletions

README.md

Lines changed: 23 additions & 0 deletions
@@ -66,6 +66,7 @@ The following variables can be set in the environment.
 | `MCPSERVER_TOKEN` | Token to use for testing | unset |


+
 ## Usage

 ### Start the Server
@@ -272,6 +273,27 @@ mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8
 mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --archetype hpc
 ```

+### Providers Interface
+
+When you bring up a hub and workers, by default we will use the [resource-secretary](https://github.com/converged-computing/resource-secretary) library to create real and/or mock providers to add to it. If you do not have this installed it will still come up, but without these tools for the agent. In addition, you can add your own catalogs of tools for your hierarchy. Add them to the mcpserver.yaml. Here is a simple example:
+
+```yaml
+tools:
+  - path: flux_mcp.validate.flux_validate_jobspec
+  - path: hpc_mcp.filesystem.filesystem_write_file
+catalogs:
+  - path: snakemake_agent.catalog.SnakemakeCatalog
+    name: snakemake
+```
+
+A catalog is a provider that is not probed for automatically, and they can be provisioned by the library here (as the example above) or externally. They typically are not probed for because the user should request using it. As an example, snakemake is going to clone snakemake-wrappers, which is a system change even if just writing to a temporary directory. You would never want to do that automatically.
+
+Adding this catalog will expose interfaces for your worker to provide Snakemake functions, more specifically wrappers, to use. If you want to implement a catalog, you can make a basic class with `probe` that needs to return True/False to determine if it should be used, and then appropriate functions you want to expose. We support the following decorators for the Worker Secretary Agent:
+
+- `workflow_tool`: A tool intended for use when the workflow agent is running a workflow
+- `dispatch_tool`: revealed when a secretary agent is deciding to dispatch work.
+- `secretary_tool`: revealed when a secretary agent is negotiating (e.g., "Can I satisfy this request?")
+
 ### Mocking a Hub

 If you are doing experiments, you can bring up a hub the same way:
@@ -430,6 +452,7 @@ Here are a few design choices (subject to change, of course). I am starting with
 - [ ] need way to "pass forward" an error from a worker that, for example, API key not set.
 - [ ] I want to have the equivalent of a satisfy endpoint, checking for the negotiate but not dispatch.
 - [ ] I also want an equivalent "just submit to this cluster" endpoint.
+- [ ] likely we want an ability to one off disabling probing or specific providers

 Idea:
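
The catalog interface described under "Providers Interface" in the README hunk above could look roughly like the sketch below. It is a self-contained illustration, not the project's code: the `workflow_tool` decorator is re-implemented locally as a stand-in that only sets an `is_tool` attribute (the attribute that `register_catalog` in this commit checks), and `ExampleCatalog`, `list_wrappers`, `helper`, and `exposed_tools` are hypothetical names.

```python
import inspect


def workflow_tool(func):
    """Stand-in decorator (assumption): flag a method as an exposed tool."""
    func.is_tool = True
    return func


class ExampleCatalog:
    """Hypothetical catalog: a provider that must be requested explicitly."""

    def __init__(self, available: bool = True):
        self.available = available

    def probe(self) -> bool:
        # Return True/False to say whether this catalog should be used.
        return self.available

    @workflow_tool
    def list_wrappers(self) -> list:
        """Return the names of available wrappers (hard-coded for the sketch)."""
        return ["bwa/mem", "samtools/sort"]

    def helper(self) -> str:
        # Not decorated, so it is not exposed as a tool.
        return "internal"


def exposed_tools(instance) -> list:
    """Names of bound methods that carry the is_tool flag."""
    return [
        name
        for name, method in inspect.getmembers(instance, predicate=inspect.ismethod)
        if getattr(method, "is_tool", False)
    ]
```

Calling `exposed_tools(ExampleCatalog())` lists only the decorated method. Where the real decorators are imported from is not shown in this commit, so that part is left out.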

examples/catalogs/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Catalogs
2+
3+
A catalog is a provider that needs to be explicitly added, and then will expose multiple different functions for an agent.
4+
Let's first prepare some snakemake data:
5+
6+
```bash
7+
wget https://github.com/snakemake/snakemake-tutorial-data/archive/v5.4.5.tar.gz
8+
tar --wildcards -xf v5.4.5.tar.gz --strip 1 "*/data"
9+
```
10+
11+
Setup your conda:
12+
13+
```bash
14+
conda config --add channels defaults
15+
conda config --add channels bioconda
16+
conda config --add channels conda-forge
17+
conda config --set channel_priority strict
18+
pip install snakemake-wrapper-utils
19+
```
20+
21+
Export envars so the snakemake catalog tools know EXACTLY where the data is (read only) and where to write (should be an empty directory for read/write)
22+
23+
```bash
24+
mkdir -p ./workdir
25+
export RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR=$(pwd)/workdir
26+
export RESOURCE_SECRETARY_SNAKEMAKE_INPUT=$(pwd)/data
27+
```
28+
29+
```bash
30+
mcpserver start --config ./snakemake.yaml --dual --port 8089
31+
```
32+
33+
And now run fractale (and export needed tokens):
34+
35+
```bash
36+
fractale run --database json ./plan.yaml
37+
```
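
The example README above exports two environment variables: a read-only input directory and an empty, writable working directory. A sketch of how a catalog might validate them before doing any work follows. `resolve_snakemake_dirs` is a hypothetical helper, and the specific checks are assumptions rather than confirmed behavior of resource-secretary.

```python
import os
from pathlib import Path
from typing import Mapping, Tuple


def resolve_snakemake_dirs(env: Mapping[str, str] = os.environ) -> Tuple[Path, Path]:
    """Return (input_dir, work_dir) after basic sanity checks."""
    try:
        input_dir = Path(env["RESOURCE_SECRETARY_SNAKEMAKE_INPUT"])
        work_dir = Path(env["RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR"])
    except KeyError as err:
        raise RuntimeError(f"missing environment variable: {err.args[0]}") from err

    if not input_dir.is_dir():
        raise RuntimeError(f"input directory not found: {input_dir}")
    if not work_dir.is_dir():
        raise RuntimeError(f"work directory not found: {work_dir}")
    # The README asks for an empty directory for read/write.
    if any(work_dir.iterdir()):
        raise RuntimeError(f"work directory is not empty: {work_dir}")
    if not os.access(work_dir, os.W_OK):
        raise RuntimeError(f"work directory is not writable: {work_dir}")
    return input_dir, work_dir
```

Failing early with a clear message is one way to avoid a workflow writing into an unexpected location.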

examples/catalogs/plan.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
name: Snakemake Tutorial Workflow
2+
agents:
3+
- path: fractale_agents.hpc.workflow.SnakemakeWorkflowAgent
4+
steps:
5+
- name: tutorial
6+
type: agent
7+
tool: snakemake-workflow
8+
inputs:
9+
goal: |
10+
Map E. coli sequencing reads from samples A, B, and C to the reference genome,
11+
sort and index the resulting alignments, then call genomic variants across all
12+
three samples jointly. The reference genome and reads are in the INPUT_DIR.
13+
Write all outputs to WORK_DIR.

examples/catalogs/snakemake.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
catalogs:
2+
- path: resource_secretary.providers.workflow.SnakemakeProvider
3+
name: snakemake

mcpserver/cli/args.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def populate_start_args(start):
3838
"--event", action="append", help="Direct event stream to import.", default=[]
3939
)
4040
start.add_argument("--tool", action="append", help="Direct tool to import.", default=[])
41+
start.add_argument("--catalog", action="append", help="Direct catalog to import.", default=[])
4142
start.add_argument("--resource", action="append", help="Direct resource to import.", default=[])
4243
start.add_argument("--prompt", action="append", help="Direct prompt to import.", default=[])
4344
start.add_argument("--include", help="Include tags", action="append", default=None)
@@ -111,13 +112,6 @@ def populate_start_args(start):
111112
action="store_true",
112113
default=False,
113114
)
114-
worker_group.add_argument(
115-
"--label",
116-
action="append",
117-
dest="labels",
118-
help="Custom labels in key=value format (e.g., --label gpu=h100). Can be used multiple times.",
119-
)
120-
121115
# const=True is what we get if the flag is present but no value is given
122116
# default=False is what we get if the flag is totally absent
123117
# THe user can also ask for an archetype (hpc, cloud, standalone)

mcpserver/cli/manager.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ def get_manager(mcp, cfg: MCPConfig):
2525
# A repeated function will return None
2626
if not endpoint:
2727
continue
28-
print(f" {emoji} Registered: {endpoint.name}")
28+
29+
# Catalog can include a listing of tools
30+
if isinstance(endpoint, list):
31+
for e in endpoint:
32+
print(f" {emoji} Registered: {e.name}")
33+
else:
34+
print(f" {emoji} Registered: {endpoint.name}")
2935

3036
# Handle SSL
3137
if cfg.server.ssl_keyfile is not None and cfg.server.ssl_certfile is not None:
@@ -45,6 +51,7 @@ def register_explicit_capabilities(mcp, cfg: MCPConfig):
4551
# Map configuration lists to the manager's registration methods
4652
registries = [
4753
(cfg.tools, manager.register_tool, "✅"),
54+
(cfg.catalogs, manager.register_catalog, "📖"),
4855
(cfg.prompts, manager.register_prompt, "💬"),
4956
(cfg.resources, manager.register_resource, "⛰️"),
5057
(cfg.events, manager.register_event, "📡"),
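
Because `register_catalog` returns a list of tools while the other registration methods return a single endpoint (or `None` for a repeat), the first hunk above branches on the result type. One alternative is to normalize the result into a list before printing. It is sketched here as an assumption and is not what this commit does. `Endpoint`, `as_endpoints`, and `registration_lines` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Iterable, List, Union


@dataclass
class Endpoint:
    """Hypothetical stand-in for a registered tool, prompt, or resource."""
    name: str


def as_endpoints(result: Union[None, Endpoint, Iterable[Endpoint]]) -> List[Endpoint]:
    """Normalize None, one endpoint, or a list of endpoints to a flat list."""
    if result is None:
        return []
    if isinstance(result, (list, tuple)):
        return list(result)
    return [result]


def registration_lines(result, emoji: str) -> List[str]:
    """Build the 'Registered' messages for whatever a register_* call returned."""
    return [f"  {emoji} Registered: {e.name}" for e in as_endpoints(result)]
```

With either approach, a catalog that yields no tools produces no output: in the committed code an empty list is falsy and hits the `continue`, and here the normalized list is simply empty.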

mcpserver/core/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class MCPConfig:
5151
discovery: List[str] = field(default_factory=list)
5252
tools: List[Capability] = field(default_factory=list)
5353
events: List[Capability] = field(default_factory=list)
54+
catalogs: List[Capability] = field(default_factory=list)
5455
prompts: List[Capability] = field(default_factory=list)
5556
resources: List[Capability] = field(default_factory=list)
5657

@@ -92,6 +93,7 @@ def make_caps(key):
9293
discovery=data.get("discovery", []),
9394
tools=make_caps("tools"),
9495
events=make_caps("events"),
96+
catalogs=make_caps("catalogs"),
9597
prompts=make_caps("prompts"),
9698
resources=make_caps("resources"),
9799
)
@@ -114,7 +116,8 @@ def from_args(cls, args):
114116
exclude=args.exclude,
115117
discovery=args.tool_module or [],
116118
tools=[Capability(path=t) for t in (args.tool or [])],
117-
events=[Capability(path=e) for t in (args.event or [])],
119+
events=[Capability(path=e) for e in (args.event or [])],
120+
catalogs=[Capability(path=c) for c in (args.catalog or [])],
118121
prompts=[Capability(path=p) for p in (args.prompt or [])],
119122
resources=[Capability(path=r) for r in (args.resource or [])],
120123
)
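
The `events=` change in the last hunk above also fixes a loop-variable mismatch: the old comprehension iterated with `t` but built each item from `e`. The sketch below reproduces the effect with a stand-in `Capability` dataclass. The stand-in is an assumption, since the real class in `mcpserver/core/config.py` may carry more fields, and the three function names are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Capability:
    """Stand-in for the real class; only the `path` field is assumed."""
    path: str


def events_before(event_args: Optional[List[str]]) -> List[Capability]:
    # Old form: iterates with `t` but reads `e`, which is never bound here.
    return [Capability(path=e) for t in (event_args or [])]  # noqa: F821


def events_after(event_args: Optional[List[str]]) -> List[Capability]:
    # Fixed form: the loop variable and the name used agree.
    return [Capability(path=e) for e in (event_args or [])]


def old_form_fails(event_args: List[str]) -> bool:
    """True if the old comprehension raises NameError for this input."""
    try:
        events_before(event_args)
    except NameError:
        return True
    return False
```

With no `--event` flags the old expression never evaluates `e` and returns an empty list, so in this sketch the problem only surfaces once at least one event is passed.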

mcpserver/core/hub.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,7 @@ async def fetch_all_statuses(self) -> dict:
291291

292292
async def status_handler(wid, sess):
293293
info = self.workers[wid]
294-
base_metadata = {
295-
"labels": info.get("labels", {}),
296-
"url": info["url"],
297-
}
294+
base_metadata = {"url": info["url"]}
298295

299296
mcp_result = await sess.call_tool("get_status", {})
300297
raw_text = mcp_result.content[0].text
@@ -329,7 +326,6 @@ async def register(request: Request):
329326
self.workers[wid] = {
330327
"url": wurl,
331328
"client": Client(wurl),
332-
"labels": data.get("labels", {}),
333329
}
334330

335331
# Discover tools in the background

mcpserver/core/worker.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ def __init__(
2323
secret: str,
2424
worker_id: Optional[str] = None,
2525
public_url: Optional[str] = None,
26-
labels: Optional[list] = None,
2726
mock: Optional[bool] = False,
2827
verbose: Optional[bool] = False,
2928
):
@@ -39,9 +38,6 @@ def __init__(
3938
# Static Manifest for the worker
4039
self.manifest = self.build_manifest()
4140

42-
# Note from vsoch: not sure if this will be useful / what we should use for.
43-
self.labels = self.parse_labels(labels)
44-
4541
# Register MCP Tools automatically
4642
self.register_agent_tools()
4743

@@ -65,19 +61,6 @@ def build_manifest(self) -> Dict[str, Any]:
6561
manifest[category] = {inst.name: inst.metadata for inst in instances}
6662
return manifest
6763

68-
def parse_labels(self, label_list: Optional[list]) -> dict:
69-
"""
70-
Converts ['key=val', 'key2=val2'] to a dictionary.
71-
"""
72-
labels = {}
73-
if not label_list:
74-
return labels
75-
for item in label_list:
76-
if "=" in item:
77-
k, v = item.split("=", 1)
78-
labels[k.strip()] = v.strip()
79-
return labels
80-
8164
async def run_registration(self):
8265
"""
8366
Registers the worker with the Hub.
@@ -88,7 +71,6 @@ async def run_registration(self):
8871
payload = {
8972
"id": self.worker_id,
9073
"url": self.public_url,
91-
"labels": self.labels,
9274
"manifest": self.manifest,
9375
}
9476
headers = {"X-MCP-Token": self.secret}
@@ -119,6 +101,5 @@ def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]:
119101
secret=args.join_secret,
120102
worker_id=args.worker_id,
121103
public_url=public_url,
122-
labels=args.labels,
123104
verbose=args.verbose,
124105
)

mcpserver/tools/manager.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,37 @@ def register_tool(self, mcp, tool_path: str, name: str = None):
118118
self.registered_keys.add(f"tool:{actual_name}")
119119
return endpoint
120120

121+
def register_catalog(self, mcp, tool_path: str, name: str = None):
122+
"""
123+
Register a catalog (a provider) with tools
124+
"""
125+
126+
module_name, class_name = tool_path.rsplit(".", 1)
127+
module = importlib.import_module(module_name)
128+
129+
tools = []
130+
for attr_name, obj in inspect.getmembers(module, inspect.isclass):
131+
if attr_name != class_name:
132+
continue
133+
134+
# Instantiate the class so methods are bound and self is resolved
135+
instance = obj()
136+
if hasattr(instance, "probe"):
137+
instance.probe()
138+
139+
for method_name, method in inspect.getmembers(instance, predicate=inspect.ismethod):
140+
if (
141+
hasattr(method, "is_tool")
142+
and method.is_tool
143+
and f"tool:{method_name}" not in self.registered_keys
144+
):
145+
endpoint = Tool.from_function(method, name=method_name)
146+
mcp.add_tool(endpoint)
147+
self.registered_keys.add(f"tool:{method_name}")
148+
tools.append(endpoint)
149+
150+
return tools
151+
121152
def register_resource(self, mcp, tool_path: str, name: str = None):
122153
"""
123154
Register a resource.
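
`register_catalog` above calls `probe()` but does not use its return value, whereas the README change in this commit says `probe` should return True/False to decide whether the catalog is used. A minimal sketch of a loader that honors that result follows. It is an assumption about the intended behavior, not the committed implementation. It collects plain bound methods instead of calling `Tool.from_function`, and `load_catalog_tools`, `DemoCatalog`, and the in-memory `demo_catalog` module are hypothetical names used only for the demo.

```python
import importlib
import inspect
import sys
import types


def load_catalog_tools(path: str, registered: set) -> dict:
    """Import `module.Class`, gate on probe(), and return flagged bound methods."""
    module_name, class_name = path.rsplit(".", 1)
    module = importlib.import_module(module_name)
    instance = getattr(module, class_name)()

    # Assumption: a falsy probe() result means "do not expose this catalog".
    probe = getattr(instance, "probe", None)
    if callable(probe) and not probe():
        return {}

    tools = {}
    for name, method in inspect.getmembers(instance, predicate=inspect.ismethod):
        key = f"tool:{name}"
        if getattr(method, "is_tool", False) and key not in registered:
            registered.add(key)
            tools[name] = method
    return tools


# Demo: build a throwaway module in memory so the sketch runs on its own.
class DemoCatalog:
    ready = True

    def probe(self) -> bool:
        return self.ready

    def run_rule(self, rule: str) -> str:
        return f"ran {rule}"

    run_rule.is_tool = True  # what a decorator such as workflow_tool might set


demo = types.ModuleType("demo_catalog")
demo.DemoCatalog = DemoCatalog
sys.modules["demo_catalog"] = demo

seen = set()
found = load_catalog_tools("demo_catalog.DemoCatalog", seen)
```

Looking the class up with `getattr` replaces the scan over every class in the module. Whether skipping on a failed probe is desirable depends on how catalogs are meant to report problems, which this commit does not specify.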
