Skip to content

Commit e06e98d

Browse files
author
ddx-land-coordinator
committed
Merge bead ddx-b5816493 attempt 20260521T031817- into main
2 parents 29fd06b + 3d34a81 commit e06e98d

3 files changed

Lines changed: 170 additions & 0 deletions

File tree

cli/internal/agent/execute_bead_intake_test.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,3 +1556,152 @@ func TestReadinessUnavailableOutputIsActionable(t *testing.T) {
15561556
// Fail-open: execution must proceed.
15571557
assert.Equal(t, 1, result.Successes)
15581558
}
1559+
1560+
func TestIntakeBlockedCarriesRuleFingerprint(t *testing.T) {
1561+
inner, candidate, _ := newExecuteLoopTestStore(t)
1562+
store := &claimCountingStore{Store: inner}
1563+
1564+
worker := &ExecuteBeadWorker{
1565+
Store: store,
1566+
Executor: ExecuteBeadExecutorFunc(func(ctx context.Context, beadID string) (ExecuteBeadReport, error) {
1567+
return ExecuteBeadReport{
1568+
BeadID: beadID,
1569+
Status: ExecuteBeadStatusSuccess,
1570+
SessionID: "sess-rule-fp",
1571+
ResultRev: "abc123",
1572+
}, nil
1573+
}),
1574+
}
1575+
1576+
cfgOpts := config.TestLoopConfigOpts{Assignee: "worker"}
1577+
rcfg := config.NewTestConfigForLoop(cfgOpts).Resolve(config.TestLoopOverrides(cfgOpts))
1578+
1579+
result, err := worker.Run(context.Background(), rcfg, ExecuteBeadLoopRuntime{
1580+
Once: true,
1581+
TargetBeadID: candidate.ID,
1582+
PreClaimIntakeHook: func(ctx context.Context, beadID string) (PreClaimIntakeResult, error) {
1583+
return PreClaimIntakeResult{
1584+
Outcome: PreClaimIntakeOperatorRequired,
1585+
Reason: "test_block",
1586+
Detail: "test detail",
1587+
}, nil
1588+
},
1589+
})
1590+
require.NoError(t, err)
1591+
require.NotNil(t, result)
1592+
1593+
got, err := inner.Get(candidate.ID)
1594+
require.NoError(t, err)
1595+
1596+
events, err := inner.Events(got.ID)
1597+
require.NoError(t, err)
1598+
1599+
blockedEvent := -1
1600+
for i, ev := range events {
1601+
if ev.Kind == "intake.blocked" {
1602+
blockedEvent = i
1603+
break
1604+
}
1605+
}
1606+
require.NotEqual(t, -1, blockedEvent, "must have an intake.blocked event")
1607+
1608+
var body map[string]any
1609+
err = json.Unmarshal([]byte(events[blockedEvent].Body), &body)
1610+
require.NoError(t, err)
1611+
1612+
ruleFp, ok := body["rule_fingerprint"].(string)
1613+
assert.True(t, ok, "rule_fingerprint must be a string")
1614+
assert.NotEmpty(t, ruleFp, "rule_fingerprint must not be empty")
1615+
}
1616+
1617+
func TestIntakeBlockedFingerprintDedupes(t *testing.T) {
1618+
inner, candidate, _ := newExecuteLoopTestStore(t)
1619+
store := &claimCountingStore{Store: inner}
1620+
1621+
worker := &ExecuteBeadWorker{
1622+
Store: store,
1623+
Executor: ExecuteBeadExecutorFunc(func(ctx context.Context, beadID string) (ExecuteBeadReport, error) {
1624+
return ExecuteBeadReport{
1625+
BeadID: beadID,
1626+
Status: ExecuteBeadStatusSuccess,
1627+
SessionID: "sess-dedup",
1628+
ResultRev: "abc123",
1629+
}, nil
1630+
}),
1631+
}
1632+
1633+
cfgOpts := config.TestLoopConfigOpts{Assignee: "worker"}
1634+
rcfg := config.NewTestConfigForLoop(cfgOpts).Resolve(config.TestLoopOverrides(cfgOpts))
1635+
1636+
intakeHook := func(ctx context.Context, beadID string) (PreClaimIntakeResult, error) {
1637+
return PreClaimIntakeResult{
1638+
Outcome: PreClaimIntakeOperatorRequired,
1639+
Reason: "test_block",
1640+
Detail: "test detail",
1641+
}, nil
1642+
}
1643+
1644+
// First run
1645+
result1, err := worker.Run(context.Background(), rcfg, ExecuteBeadLoopRuntime{
1646+
Once: true,
1647+
TargetBeadID: candidate.ID,
1648+
PreClaimIntakeHook: intakeHook,
1649+
})
1650+
require.NoError(t, err)
1651+
require.NotNil(t, result1)
1652+
1653+
got1, err := inner.Get(candidate.ID)
1654+
require.NoError(t, err)
1655+
1656+
events1, err := inner.Events(got1.ID)
1657+
require.NoError(t, err)
1658+
1659+
blockedCount1 := 0
1660+
var fp1 string
1661+
for _, ev := range events1 {
1662+
if ev.Kind == "intake.blocked" {
1663+
blockedCount1++
1664+
var body map[string]any
1665+
json.Unmarshal([]byte(ev.Body), &body)
1666+
if ruleFp, ok := body["rule_fingerprint"].(string); ok {
1667+
fp1 = ruleFp
1668+
}
1669+
}
1670+
}
1671+
require.Equal(t, 1, blockedCount1, "first run must have exactly one intake.blocked event")
1672+
assert.NotEmpty(t, fp1, "first run must have a rule_fingerprint")
1673+
1674+
// Unclaim the bead so we can run again
1675+
err = inner.Unclaim(candidate.ID)
1676+
require.NoError(t, err)
1677+
1678+
// Second run with identical inputs
1679+
result2, err := worker.Run(context.Background(), rcfg, ExecuteBeadLoopRuntime{
1680+
Once: true,
1681+
TargetBeadID: candidate.ID,
1682+
PreClaimIntakeHook: intakeHook,
1683+
})
1684+
require.NoError(t, err)
1685+
require.NotNil(t, result2)
1686+
1687+
got2, err := inner.Get(candidate.ID)
1688+
require.NoError(t, err)
1689+
1690+
events2, err := inner.Events(got2.ID)
1691+
require.NoError(t, err)
1692+
1693+
blockedCount2 := 0
1694+
var fp2 string
1695+
for _, ev := range events2 {
1696+
if ev.Kind == "intake.blocked" {
1697+
blockedCount2++
1698+
var body map[string]any
1699+
json.Unmarshal([]byte(ev.Body), &body)
1700+
if ruleFp, ok := body["rule_fingerprint"].(string); ok {
1701+
fp2 = ruleFp
1702+
}
1703+
}
1704+
}
1705+
assert.Equal(t, 1, blockedCount2, "second run with identical inputs must still have exactly one intake.blocked event (dedup)")
1706+
assert.Equal(t, fp1, fp2, "fingerprints must match across runs")
1707+
}

cli/internal/agent/execute_bead_loop.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4061,6 +4061,7 @@ func parkBeadPostIntakeRejection(store ExecuteBeadLoopStore, candidate *bead.Bea
40614061
},
40624062
)
40634063
body["fingerprint"] = findingFingerprint
4064+
body["rule_fingerprint"] = findingFingerprint
40644065
body["prompt_fingerprint"] = promptFingerprint
40654066

40664067
if err := store.ParkToProposedWithIntakeEvent(candidate.ID, actor, string(outcome), reason, detail, body, at, func(b *bead.Bead) {

cli/internal/bead/park.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,32 @@ func (s *Store) ParkToProposed(id string, reason ParkReason, mutate func(*Bead))
6565
// appends an intake.blocked event. It atomically performs the transition and
6666
// appends the event. The mutate callback runs after the status transition and
6767
// before the event is appended; pass nil if no additional mutations are needed.
68+
// If the body contains a rule_fingerprint, dedup checks existing intake.blocked
69+
// events and skips the append if a matching fingerprint already exists.
6870
func (s *Store) ParkToProposedWithIntakeEvent(id, actor, outcome, reason, detail string, body map[string]any, at time.Time, mutate func(*Bead)) error {
6971
// Transition to proposed with intake rejection
7072
if err := s.ParkToProposed(id, ParkIntakeRejection, mutate); err != nil {
7173
return err
7274
}
7375

76+
// Check for rule_fingerprint dedup
77+
ruleFp, ok := body["rule_fingerprint"].(string)
78+
if ok && ruleFp != "" {
79+
events, err := s.Events(id)
80+
if err == nil {
81+
for _, ev := range events {
82+
if ev.Kind == "intake.blocked" {
83+
var existing map[string]any
84+
if err := json.Unmarshal([]byte(ev.Body), &existing); err == nil {
85+
if existing["rule_fingerprint"] == ruleFp {
86+
return nil
87+
}
88+
}
89+
}
90+
}
91+
}
92+
}
93+
7494
// Append the intake.blocked event
7595
bodyJSON, _ := json.Marshal(body)
7696
return s.AppendEvent(id, BeadEvent{

0 commit comments

Comments
 (0)