Skip to content

Commit 70acf56

Browse files
vsilentCopilot
andcommitted
feat(pipe): Phase 1 — complete CLI commands, agent commands, and REST endpoint
Pipe feature Phase 1 implementation: CLI commands (src/bin/stacker.rs, src/console/commands/cli/pipe.rs): - pipe list: queries API, displays table with ID/source/target/status/triggers/errors - pipe create: interactive flow — scans both apps, lets user pick endpoints, auto-matches fields, creates template + instance via API - pipe activate: sets status to active, sends activate_pipe agent command with full pipe config (endpoints, field mapping, trigger type) - pipe deactivate: sets status to paused, sends deactivate_pipe agent command - pipe trigger: one-shot execution, sends trigger_pipe with optional input data Client methods (src/cli/stacker_client.rs): - 6 new methods: list_pipe_instances, get_pipe_instance, create_pipe_template, create_pipe_instance, update_pipe_status, list_pipe_templates - 4 new structs: PipeTemplateInfo, PipeInstanceInfo, CreatePipeTemplateApiRequest, CreatePipeInstanceApiRequest Agent command types (src/forms/status_panel.rs): - activate_pipe, deactivate_pipe, trigger_pipe command request/report structs - Full parameter validation and result validation - 9 unit tests for all new command types REST endpoint (src/routes/pipe/update.rs): - PUT /api/v1/pipes/instances/{id}/status — update pipe instance status Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 7fa0a05 commit 70acf56

7 files changed

Lines changed: 1270 additions & 15 deletions

File tree

src/bin/stacker.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,48 @@ enum PipeCommands {
581581
#[arg(long)]
582582
deployment: Option<String>,
583583
},
584+
/// Activate a pipe instance (start listening for triggers)
585+
Activate {
586+
/// Pipe instance ID (UUID)
587+
pipe_id: String,
588+
/// Trigger type: webhook, poll, or manual
589+
#[arg(long, default_value = "webhook")]
590+
trigger: String,
591+
/// Poll interval in seconds (only for --trigger=poll)
592+
#[arg(long, default_value = "300")]
593+
poll_interval: u32,
594+
/// Output in JSON format
595+
#[arg(long)]
596+
json: bool,
597+
/// Deployment hash
598+
#[arg(long)]
599+
deployment: Option<String>,
600+
},
601+
/// Deactivate a pipe instance (stop listening)
602+
Deactivate {
603+
/// Pipe instance ID (UUID)
604+
pipe_id: String,
605+
/// Output in JSON format
606+
#[arg(long)]
607+
json: bool,
608+
/// Deployment hash
609+
#[arg(long)]
610+
deployment: Option<String>,
611+
},
612+
/// Trigger a pipe instance manually (one-shot execution)
613+
Trigger {
614+
/// Pipe instance ID (UUID)
615+
pipe_id: String,
616+
/// Optional JSON input data to feed into the pipe
617+
#[arg(long)]
618+
data: Option<String>,
619+
/// Output in JSON format
620+
#[arg(long)]
621+
json: bool,
622+
/// Deployment hash
623+
#[arg(long)]
624+
deployment: Option<String>,
625+
},
584626
}
585627

586628
#[derive(Debug, Subcommand)]
@@ -1134,6 +1176,15 @@ fn get_command(
11341176
PipeCommands::List { json, deployment } => Box::new(
11351177
pipe::PipeListCommand::new(json, deployment),
11361178
),
1179+
PipeCommands::Activate { pipe_id, trigger, poll_interval, json, deployment } => Box::new(
1180+
pipe::PipeActivateCommand::new(pipe_id, trigger, poll_interval, json, deployment),
1181+
),
1182+
PipeCommands::Deactivate { pipe_id, json, deployment } => Box::new(
1183+
pipe::PipeDeactivateCommand::new(pipe_id, json, deployment),
1184+
),
1185+
PipeCommands::Trigger { pipe_id, data, json, deployment } => Box::new(
1186+
pipe::PipeTriggerCommand::new(pipe_id, data, json, deployment),
1187+
),
11371188
}
11381189
},
11391190
StackerCommands::Agent { command: agent_cmd } => {

src/cli/stacker_client.rs

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,93 @@ pub struct DeploymentStatusInfo {
186186
pub updated_at: String,
187187
}
188188

189+
/// Pipe template info from `/api/v1/pipes/templates`
190+
#[derive(Debug, Clone, Serialize, Deserialize)]
191+
pub struct PipeTemplateInfo {
192+
pub id: String,
193+
pub name: String,
194+
#[serde(default)]
195+
pub description: Option<String>,
196+
pub source_app_type: String,
197+
pub source_endpoint: serde_json::Value,
198+
pub target_app_type: String,
199+
pub target_endpoint: serde_json::Value,
200+
#[serde(default)]
201+
pub target_external_url: Option<String>,
202+
pub field_mapping: serde_json::Value,
203+
#[serde(default)]
204+
pub config: Option<serde_json::Value>,
205+
#[serde(default)]
206+
pub is_public: Option<bool>,
207+
pub created_by: String,
208+
pub created_at: String,
209+
pub updated_at: String,
210+
}
211+
212+
/// Pipe instance info from `/api/v1/pipes/instances`
213+
#[derive(Debug, Clone, Serialize, Deserialize)]
214+
pub struct PipeInstanceInfo {
215+
pub id: String,
216+
#[serde(default)]
217+
pub template_id: Option<String>,
218+
pub deployment_hash: String,
219+
pub source_container: String,
220+
#[serde(default)]
221+
pub target_container: Option<String>,
222+
#[serde(default)]
223+
pub target_url: Option<String>,
224+
#[serde(default)]
225+
pub field_mapping_override: Option<serde_json::Value>,
226+
#[serde(default)]
227+
pub config_override: Option<serde_json::Value>,
228+
pub status: String,
229+
#[serde(default)]
230+
pub last_triggered_at: Option<String>,
231+
#[serde(default)]
232+
pub trigger_count: i64,
233+
#[serde(default)]
234+
pub error_count: i64,
235+
pub created_by: String,
236+
pub created_at: String,
237+
pub updated_at: String,
238+
}
239+
240+
/// Request body for creating a pipe template
241+
#[derive(Debug, Clone, Serialize)]
242+
pub struct CreatePipeTemplateApiRequest {
243+
pub name: String,
244+
#[serde(skip_serializing_if = "Option::is_none")]
245+
pub description: Option<String>,
246+
pub source_app_type: String,
247+
pub source_endpoint: serde_json::Value,
248+
pub target_app_type: String,
249+
pub target_endpoint: serde_json::Value,
250+
#[serde(skip_serializing_if = "Option::is_none")]
251+
pub target_external_url: Option<String>,
252+
pub field_mapping: serde_json::Value,
253+
#[serde(skip_serializing_if = "Option::is_none")]
254+
pub config: Option<serde_json::Value>,
255+
#[serde(skip_serializing_if = "Option::is_none")]
256+
pub is_public: Option<bool>,
257+
}
258+
259+
/// Request body for creating a pipe instance
260+
#[derive(Debug, Clone, Serialize)]
261+
pub struct CreatePipeInstanceApiRequest {
262+
pub deployment_hash: String,
263+
pub source_container: String,
264+
#[serde(skip_serializing_if = "Option::is_none")]
265+
pub target_container: Option<String>,
266+
#[serde(skip_serializing_if = "Option::is_none")]
267+
pub target_url: Option<String>,
268+
#[serde(skip_serializing_if = "Option::is_none")]
269+
pub template_id: Option<String>,
270+
#[serde(skip_serializing_if = "Option::is_none")]
271+
pub field_mapping_override: Option<serde_json::Value>,
272+
#[serde(skip_serializing_if = "Option::is_none")]
273+
pub config_override: Option<serde_json::Value>,
274+
}
275+
189276
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
190277
// StackerClient — HTTP client for the Stacker server
191278
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@@ -1426,6 +1513,224 @@ impl StackerClient {
14261513
Ok((json, hash))
14271514
}
14281515

1516+
// ── Pipe management ─────────────────────────────
1517+
1518+
/// List pipe instances for a deployment.
1519+
///
1520+
/// `GET /api/v1/pipes/instances/{deployment_hash}`
1521+
pub async fn list_pipe_instances(
1522+
&self,
1523+
deployment_hash: &str,
1524+
) -> Result<Vec<PipeInstanceInfo>, CliError> {
1525+
let url = format!(
1526+
"{}/api/v1/pipes/instances/{}",
1527+
self.base_url, deployment_hash
1528+
);
1529+
let resp = self
1530+
.http
1531+
.get(&url)
1532+
.bearer_auth(&self.token)
1533+
.send()
1534+
.await
1535+
.map_err(|e| CliError::ConfigValidation(format!("Failed to list pipes: {}", e)))?;
1536+
1537+
if !resp.status().is_success() {
1538+
let status = resp.status().as_u16();
1539+
let body = resp.text().await.unwrap_or_default();
1540+
return Err(CliError::ConfigValidation(format!(
1541+
"List pipes failed ({}): {}",
1542+
status, body
1543+
)));
1544+
}
1545+
1546+
let api: ApiResponse<PipeInstanceInfo> =
1547+
resp.json().await.map_err(|e| CliError::ConfigValidation(format!("Invalid pipe list response: {}", e)))?;
1548+
1549+
Ok(api.list.unwrap_or_default())
1550+
}
1551+
1552+
/// Get a pipe instance by ID.
1553+
///
1554+
/// `GET /api/v1/pipes/instances/detail/{instance_id}`
1555+
pub async fn get_pipe_instance(
1556+
&self,
1557+
instance_id: &str,
1558+
) -> Result<Option<PipeInstanceInfo>, CliError> {
1559+
let url = format!(
1560+
"{}/api/v1/pipes/instances/detail/{}",
1561+
self.base_url, instance_id
1562+
);
1563+
let resp = self
1564+
.http
1565+
.get(&url)
1566+
.bearer_auth(&self.token)
1567+
.send()
1568+
.await
1569+
.map_err(|e| CliError::ConfigValidation(format!("Failed to get pipe: {}", e)))?;
1570+
1571+
if resp.status().as_u16() == 404 {
1572+
return Ok(None);
1573+
}
1574+
if !resp.status().is_success() {
1575+
let status = resp.status().as_u16();
1576+
let body = resp.text().await.unwrap_or_default();
1577+
return Err(CliError::ConfigValidation(format!(
1578+
"Get pipe failed ({}): {}",
1579+
status, body
1580+
)));
1581+
}
1582+
1583+
let api: ApiResponse<PipeInstanceInfo> =
1584+
resp.json().await.map_err(|e| CliError::ConfigValidation(format!("Invalid pipe response: {}", e)))?;
1585+
1586+
Ok(api.item)
1587+
}
1588+
1589+
/// Create a pipe template.
1590+
///
1591+
/// `POST /api/v1/pipes/templates`
1592+
pub async fn create_pipe_template(
1593+
&self,
1594+
request: &CreatePipeTemplateApiRequest,
1595+
) -> Result<PipeTemplateInfo, CliError> {
1596+
let url = format!("{}/api/v1/pipes/templates", self.base_url);
1597+
let resp = self
1598+
.http
1599+
.post(&url)
1600+
.bearer_auth(&self.token)
1601+
.json(request)
1602+
.send()
1603+
.await
1604+
.map_err(|e| CliError::ConfigValidation(format!("Failed to create pipe template: {}", e)))?;
1605+
1606+
if !resp.status().is_success() {
1607+
let status = resp.status().as_u16();
1608+
let body = resp.text().await.unwrap_or_default();
1609+
return Err(CliError::ConfigValidation(format!(
1610+
"Create pipe template failed ({}): {}",
1611+
status, body
1612+
)));
1613+
}
1614+
1615+
let api: ApiResponse<PipeTemplateInfo> =
1616+
resp.json().await.map_err(|e| CliError::ConfigValidation(format!("Invalid template response: {}", e)))?;
1617+
1618+
api.item.ok_or_else(|| CliError::ConfigValidation("Empty template response".to_string()))
1619+
}
1620+
1621+
/// Create a pipe instance.
1622+
///
1623+
/// `POST /api/v1/pipes/instances`
1624+
pub async fn create_pipe_instance(
1625+
&self,
1626+
request: &CreatePipeInstanceApiRequest,
1627+
) -> Result<PipeInstanceInfo, CliError> {
1628+
let url = format!("{}/api/v1/pipes/instances", self.base_url);
1629+
let resp = self
1630+
.http
1631+
.post(&url)
1632+
.bearer_auth(&self.token)
1633+
.json(request)
1634+
.send()
1635+
.await
1636+
.map_err(|e| CliError::ConfigValidation(format!("Failed to create pipe instance: {}", e)))?;
1637+
1638+
if !resp.status().is_success() {
1639+
let status = resp.status().as_u16();
1640+
let body = resp.text().await.unwrap_or_default();
1641+
return Err(CliError::ConfigValidation(format!(
1642+
"Create pipe instance failed ({}): {}",
1643+
status, body
1644+
)));
1645+
}
1646+
1647+
let api: ApiResponse<PipeInstanceInfo> =
1648+
resp.json().await.map_err(|e| CliError::ConfigValidation(format!("Invalid instance response: {}", e)))?;
1649+
1650+
api.item.ok_or_else(|| CliError::ConfigValidation("Empty instance response".to_string()))
1651+
}
1652+
1653+
/// Update pipe instance status.
1654+
///
1655+
/// `PUT /api/v1/pipes/instances/{instance_id}/status`
1656+
pub async fn update_pipe_status(
1657+
&self,
1658+
instance_id: &str,
1659+
status: &str,
1660+
) -> Result<PipeInstanceInfo, CliError> {
1661+
let url = format!(
1662+
"{}/api/v1/pipes/instances/{}/status",
1663+
self.base_url, instance_id
1664+
);
1665+
let body = serde_json::json!({ "status": status });
1666+
let resp = self
1667+
.http
1668+
.put(&url)
1669+
.bearer_auth(&self.token)
1670+
.json(&body)
1671+
.send()
1672+
.await
1673+
.map_err(|e| CliError::ConfigValidation(format!("Failed to update pipe status: {}", e)))?;
1674+
1675+
if !resp.status().is_success() {
1676+
let status_code = resp.status().as_u16();
1677+
let body = resp.text().await.unwrap_or_default();
1678+
return Err(CliError::ConfigValidation(format!(
1679+
"Update pipe status failed ({}): {}",
1680+
status_code, body
1681+
)));
1682+
}
1683+
1684+
let api: ApiResponse<PipeInstanceInfo> =
1685+
resp.json().await.map_err(|e| CliError::ConfigValidation(format!("Invalid status response: {}", e)))?;
1686+
1687+
api.item.ok_or_else(|| CliError::ConfigValidation("Empty status response".to_string()))
1688+
}
1689+
1690+
/// List pipe templates visible to the current user.
1691+
///
1692+
/// `GET /api/v1/pipes/templates`
1693+
pub async fn list_pipe_templates(
1694+
&self,
1695+
source_app_type: Option<&str>,
1696+
target_app_type: Option<&str>,
1697+
) -> Result<Vec<PipeTemplateInfo>, CliError> {
1698+
let mut url = format!("{}/api/v1/pipes/templates", self.base_url);
1699+
let mut params = Vec::new();
1700+
if let Some(source) = source_app_type {
1701+
params.push(format!("source_app_type={}", source));
1702+
}
1703+
if let Some(target) = target_app_type {
1704+
params.push(format!("target_app_type={}", target));
1705+
}
1706+
if !params.is_empty() {
1707+
url.push('?');
1708+
url.push_str(&params.join("&"));
1709+
}
1710+
1711+
let resp = self
1712+
.http
1713+
.get(&url)
1714+
.bearer_auth(&self.token)
1715+
.send()
1716+
.await
1717+
.map_err(|e| CliError::ConfigValidation(format!("Failed to list templates: {}", e)))?;
1718+
1719+
if !resp.status().is_success() {
1720+
let status = resp.status().as_u16();
1721+
let body = resp.text().await.unwrap_or_default();
1722+
return Err(CliError::ConfigValidation(format!(
1723+
"List templates failed ({}): {}",
1724+
status, body
1725+
)));
1726+
}
1727+
1728+
let api: ApiResponse<PipeTemplateInfo> =
1729+
resp.json().await.map_err(|e| CliError::ConfigValidation(format!("Invalid templates response: {}", e)))?;
1730+
1731+
Ok(api.list.unwrap_or_default())
1732+
}
1733+
14291734
// ── Marketplace (creator) ────────────────────────
14301735

14311736
/// List the current user's marketplace template submissions.

0 commit comments

Comments
 (0)