Skip to content

Commit 538cfee

Browse files
khaliqgant, Proactive Runtime Bot, agent-relay-code[bot]
authored
Add writeback draft content identity to mount bulk writes (#253)
* fix: add writeback draft content identity
* chore: apply pr-reviewer fixes for #253
* chore: apply pr-reviewer fixes for #253
* fix: add writeback draft content identity
* chore: remove trajectory noise from writeback identity PR

---------

Co-authored-by: Proactive Runtime Bot <agent@agent-relay.com>
Co-authored-by: agent-relay-code[bot] <agent-relay-code[bot]@users.noreply.github.com>
1 parent 66f7117 commit 538cfee

11 files changed

Lines changed: 432 additions & 68 deletions

File tree

internal/mountsync/syncer.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"unicode/utf8"
3131

3232
"github.com/agentworkforce/relayfile/internal/digest"
33+
"github.com/agentworkforce/relayfile/internal/relayfile"
3334
"github.com/fsnotify/fsnotify"
3435
"nhooyr.io/websocket"
3536
"nhooyr.io/websocket/wsjson"
@@ -63,6 +64,11 @@ func (e *SchemaValidationError) Is(target error) bool {
6364

6465
const defaultBulkFlushThreshold = 256
6566

67+
// Content-identity parameters attached to bulk writes of create-draft files.
const (
	// mountWritebackCreateDraftContentIdentityKind is the Kind value set on
	// every content identity built for a mount writeback create-draft.
	mountWritebackCreateDraftContentIdentityKind = "mount-writeback-create-draft"

	// mountWritebackCreateDraftContentIdentityTTLSeconds is the TTLSeconds
	// value set on those identities: 30 days in seconds (2592000).
	mountWritebackCreateDraftContentIdentityTTLSeconds = 30 * 24 * 60 * 60
)
71+
6672
// defaultFullPullEvery is the default cadence for the "trust but verify"
6773
// periodic full tree pull that runs from the incremental path. At 30s sync
6874
// intervals, 20 cycles is roughly every 10 minutes. This is the safety net
@@ -1577,7 +1583,7 @@ func (s *Syncer) flushPendingBulkWrites(ctx context.Context, pending []pendingBu
15771583
s.logf("writeback flush refused: cloud-error circuit breaker is open; %d file(s) remain pending", len(pending))
15781584
return nil
15791585
}
1580-
for _, chunk := range chunkPendingBulkWrites(pending, maxWritebackBatchBytes()) {
1586+
for _, chunk := range chunkPendingBulkWrites(s.workspace, pending, maxWritebackBatchBytes()) {
15811587
if err := s.flushPendingBulkWriteChunk(ctx, chunk, conflicted); err != nil {
15821588
return err
15831589
}
@@ -1586,16 +1592,7 @@ func (s *Syncer) flushPendingBulkWrites(ctx context.Context, pending []pendingBu
15861592
}
15871593

15881594
func (s *Syncer) flushPendingBulkWriteChunk(ctx context.Context, pending []pendingBulkWrite, conflicted map[string]struct{}) error {
1589-
files := make([]BulkWriteFile, 0, len(pending))
1590-
for _, pendingWrite := range pending {
1591-
files = append(files, BulkWriteFile{
1592-
Path: pendingWrite.remotePath,
1593-
ContentType: pendingWrite.snapshot.ContentType,
1594-
Content: pendingWrite.snapshot.WireContent,
1595-
Encoding: pendingWrite.snapshot.Encoding,
1596-
})
1597-
}
1598-
1595+
files := bulkWriteFilesForPending(s.workspace, pending)
15991596
response, err := s.client.WriteFilesBulk(ctx, s.workspace, files)
16001597
if err != nil {
16011598
s.recordCloudFailure(err)
@@ -1637,20 +1634,36 @@ func (s *Syncer) flushPendingBulkWriteChunk(ctx context.Context, pending []pendi
16371634
return firstErr
16381635
}
16391636

1640-
func bulkWriteFilesForPending(pending []pendingBulkWrite) []BulkWriteFile {
1637+
func bulkWriteFilesForPending(workspaceID string, pending []pendingBulkWrite) []BulkWriteFile {
16411638
files := make([]BulkWriteFile, 0, len(pending))
16421639
for _, pendingWrite := range pending {
16431640
files = append(files, BulkWriteFile{
1644-
Path: pendingWrite.remotePath,
1645-
ContentType: pendingWrite.snapshot.ContentType,
1646-
Content: pendingWrite.snapshot.WireContent,
1647-
Encoding: pendingWrite.snapshot.Encoding,
1641+
Path: pendingWrite.remotePath,
1642+
ContentType: pendingWrite.snapshot.ContentType,
1643+
Content: pendingWrite.snapshot.WireContent,
1644+
Encoding: pendingWrite.snapshot.Encoding,
1645+
ContentIdentity: mountWritebackCreateDraftContentIdentity(workspaceID, pendingWrite.remotePath, pendingWrite.snapshot.Hash),
16481646
})
16491647
}
16501648
return files
16511649
}
16521650

1653-
func chunkPendingBulkWrites(pending []pendingBulkWrite, maxBytes int64) [][]pendingBulkWrite {
1651+
func mountWritebackCreateDraftContentIdentity(workspaceID, normalizedRemotePath, contentHash string) *ContentIdentity {
1652+
if !relayfile.IsDraftFilePath(normalizedRemotePath) {
1653+
return nil
1654+
}
1655+
return newMountWritebackCreateDraftContentIdentity(workspaceID, normalizedRemotePath, contentHash)
1656+
}
1657+
1658+
func newMountWritebackCreateDraftContentIdentity(workspaceID, normalizedRemotePath, contentHash string) *ContentIdentity {
1659+
return &ContentIdentity{
1660+
Kind: mountWritebackCreateDraftContentIdentityKind,
1661+
Key: fmt.Sprintf("%s:%s:%s", workspaceID, normalizedRemotePath, contentHash),
1662+
TTLSeconds: mountWritebackCreateDraftContentIdentityTTLSeconds,
1663+
}
1664+
}
1665+
1666+
func chunkPendingBulkWrites(workspaceID string, pending []pendingBulkWrite, maxBytes int64) [][]pendingBulkWrite {
16541667
if len(pending) == 0 {
16551668
return nil
16561669
}
@@ -1661,7 +1674,7 @@ func chunkPendingBulkWrites(pending []pendingBulkWrite, maxBytes int64) [][]pend
16611674
current := make([]pendingBulkWrite, 0, len(pending))
16621675
for _, item := range pending {
16631676
candidate := append(append([]pendingBulkWrite(nil), current...), item)
1664-
if len(current) > 0 && bulkWriteRequestSize(bulkWriteFilesForPending(candidate)) > maxBytes {
1677+
if len(current) > 0 && bulkWriteRequestSize(bulkWriteFilesForPending(workspaceID, candidate)) > maxBytes {
16651678
chunks = append(chunks, append([]pendingBulkWrite(nil), current...))
16661679
current = current[:0]
16671680
}

internal/mountsync/syncer_test.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,6 +1812,183 @@ func TestBulkWrite_SingleCallForNFiles(t *testing.T) {
18121812
}
18131813
}
18141814

1815+
func TestBulkWrite_ContentIdentityGoldenVector(t *testing.T) {
1816+
const (
1817+
workspaceID = "ws_test"
1818+
remotePath = "/slack/channels/C123/messages/messages 5ab77d67.json"
1819+
content = "{\"channel\":\"C123\",\"text\":\"hello writeback idempotency\"}\n"
1820+
contentHash = "751f9591557700f69b5ceefcdec7ead8563a10f0a712c501a5028699be021511"
1821+
key = "ws_test:/slack/channels/C123/messages/messages 5ab77d67.json:751f9591557700f69b5ceefcdec7ead8563a10f0a712c501a5028699be021511"
1822+
)
1823+
1824+
snapshot := newLocalSnapshot(remotePath, []byte(content))
1825+
if snapshot.Hash != contentHash {
1826+
t.Fatalf("golden vector hash = %q, want %q", snapshot.Hash, contentHash)
1827+
}
1828+
identity := newMountWritebackCreateDraftContentIdentity(workspaceID, remotePath, snapshot.Hash)
1829+
if identity.Kind != mountWritebackCreateDraftContentIdentityKind {
1830+
t.Fatalf("content identity kind = %q, want %q", identity.Kind, mountWritebackCreateDraftContentIdentityKind)
1831+
}
1832+
if identity.Key != key {
1833+
t.Fatalf("content identity key = %q, want %q", identity.Key, key)
1834+
}
1835+
if identity.TTLSeconds != mountWritebackCreateDraftContentIdentityTTLSeconds {
1836+
t.Fatalf("content identity ttl = %d, want %d", identity.TTLSeconds, mountWritebackCreateDraftContentIdentityTTLSeconds)
1837+
}
1838+
if identity.TTLSeconds != 2592000 {
1839+
t.Fatalf("golden vector ttl = %d, want 2592000", identity.TTLSeconds)
1840+
}
1841+
if strings.TrimSpace(identity.Key) != identity.Key {
1842+
t.Fatalf("golden vector key should be trim-stable: %q", identity.Key)
1843+
}
1844+
if !strings.Contains(identity.Key, "messages 5ab77d67.json") {
1845+
t.Fatalf("golden vector key should preserve the internal path space: %q", identity.Key)
1846+
}
1847+
}
1848+
1849+
func TestBulkWrite_ContentIdentityStabilityAndIsolation(t *testing.T) {
1850+
const (
1851+
workspaceID = "ws_test"
1852+
remotePath = "/slack/channels/C123/messages/messages 5ab77d67-1111-4111-8111-123456789abc.json"
1853+
)
1854+
1855+
snapshot := newLocalSnapshot(remotePath, []byte("{\"text\":\"same\"}\n"))
1856+
files := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1857+
remotePath: remotePath,
1858+
snapshot: snapshot,
1859+
}})
1860+
reupload := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1861+
remotePath: remotePath,
1862+
snapshot: snapshot,
1863+
}})
1864+
if files[0].ContentIdentity == nil || reupload[0].ContentIdentity == nil {
1865+
t.Fatal("expected content identity on both bulk files")
1866+
}
1867+
if files[0].ContentIdentity.Key != reupload[0].ContentIdentity.Key {
1868+
t.Fatalf("same draft re-upload key changed: %q vs %q", files[0].ContentIdentity.Key, reupload[0].ContentIdentity.Key)
1869+
}
1870+
1871+
edited := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1872+
remotePath: remotePath,
1873+
snapshot: newLocalSnapshot(remotePath, []byte("{\"text\":\"edited\"}\n")),
1874+
}})
1875+
if edited[0].ContentIdentity.Key == files[0].ContentIdentity.Key {
1876+
t.Fatalf("edited draft content should change key %q", edited[0].ContentIdentity.Key)
1877+
}
1878+
1879+
otherPath := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1880+
remotePath: "/slack/channels/C123/messages/messages 6ab77d67-2222-4222-8222-123456789abc.json",
1881+
snapshot: snapshot,
1882+
}})
1883+
if otherPath[0].ContentIdentity.Key == files[0].ContentIdentity.Key {
1884+
t.Fatalf("different draft path should change key %q", otherPath[0].ContentIdentity.Key)
1885+
}
1886+
1887+
otherWorkspace := bulkWriteFilesForPending("ws_other", []pendingBulkWrite{{
1888+
remotePath: remotePath,
1889+
snapshot: snapshot,
1890+
}})
1891+
if otherWorkspace[0].ContentIdentity.Key == files[0].ContentIdentity.Key {
1892+
t.Fatalf("different workspace should change key %q", otherWorkspace[0].ContentIdentity.Key)
1893+
}
1894+
}
1895+
1896+
func TestBulkWrite_ContentIdentityOnlyForCreateDraftPaths(t *testing.T) {
1897+
const workspaceID = "ws_test"
1898+
snapshot := newLocalSnapshot("/notion/pages/pages 5ab77d67-1111-4111-8111-123456789abc.json", []byte("{}\n"))
1899+
1900+
draft := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1901+
remotePath: "/notion/pages/pages 5ab77d67-1111-4111-8111-123456789abc.json",
1902+
snapshot: snapshot,
1903+
}})
1904+
if draft[0].ContentIdentity == nil {
1905+
t.Fatal("expected content identity for space-uuid create draft path")
1906+
}
1907+
1908+
stable := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1909+
remotePath: "/notion/pages/page-1.md",
1910+
snapshot: snapshot,
1911+
}})
1912+
if stable[0].ContentIdentity != nil {
1913+
t.Fatalf("stable non-draft path should not carry content identity: %+v", stable[0].ContentIdentity)
1914+
}
1915+
1916+
nonUUID := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1917+
remotePath: "/slack/channels/C123/messages/messages not-a-uuid.json",
1918+
snapshot: snapshot,
1919+
}})
1920+
if nonUUID[0].ContentIdentity != nil {
1921+
t.Fatalf("non-uuid draft-like path should not carry content identity: %+v", nonUUID[0].ContentIdentity)
1922+
}
1923+
}
1924+
1925+
func TestBulkWrite_ContentIdentityOmittedForStablePathRevert(t *testing.T) {
1926+
const (
1927+
workspaceID = "ws_test"
1928+
remotePath = "/notion/pages/page-1.md"
1929+
)
1930+
1931+
for _, content := range []string{"# C\n", "# D\n", "# C\n"} {
1932+
files := bulkWriteFilesForPending(workspaceID, []pendingBulkWrite{{
1933+
remotePath: remotePath,
1934+
snapshot: newLocalSnapshot(remotePath, []byte(content)),
1935+
}})
1936+
if files[0].ContentIdentity != nil {
1937+
t.Fatalf("stable path content %q should not carry content identity: %+v", content, files[0].ContentIdentity)
1938+
}
1939+
}
1940+
}
1941+
1942+
func TestBulkWrite_FlushSendsContentIdentity(t *testing.T) {
1943+
const (
1944+
workspaceID = "ws_test"
1945+
content = "{\"channel\":\"C123\",\"text\":\"hello writeback idempotency\"}\n"
1946+
draftID = "5ab77d67-1111-4111-8111-123456789abc"
1947+
key = "ws_test:/slack/channels/C123/messages/messages 5ab77d67-1111-4111-8111-123456789abc.json:751f9591557700f69b5ceefcdec7ead8563a10f0a712c501a5028699be021511"
1948+
)
1949+
1950+
client := &fakeClient{files: map[string]RemoteFile{}}
1951+
localDir := t.TempDir()
1952+
messageDir := filepath.Join(localDir, "slack", "channels", "C123", "messages")
1953+
if err := os.MkdirAll(messageDir, 0o755); err != nil {
1954+
t.Fatalf("mkdir message dir failed: %v", err)
1955+
}
1956+
if err := os.WriteFile(filepath.Join(messageDir, "messages "+draftID+".json"), []byte(content), 0o644); err != nil {
1957+
t.Fatalf("write draft failed: %v", err)
1958+
}
1959+
1960+
syncer, err := NewSyncer(client, SyncerOptions{
1961+
WorkspaceID: workspaceID,
1962+
RemoteRoot: "/",
1963+
LocalRoot: localDir,
1964+
})
1965+
if err != nil {
1966+
t.Fatalf("new syncer failed: %v", err)
1967+
}
1968+
if err := syncer.SyncOnce(context.Background()); err != nil {
1969+
t.Fatalf("sync once failed: %v", err)
1970+
}
1971+
if got := len(client.bulkWriteBatches); got != 1 {
1972+
t.Fatalf("expected one bulk write batch, got %d", got)
1973+
}
1974+
if got := len(client.bulkWriteBatches[0]); got != 1 {
1975+
t.Fatalf("expected one bulk write file, got %d", got)
1976+
}
1977+
identity := client.bulkWriteBatches[0][0].ContentIdentity
1978+
if identity == nil {
1979+
t.Fatal("expected flushed bulk file to carry content identity")
1980+
}
1981+
if identity.Kind != mountWritebackCreateDraftContentIdentityKind {
1982+
t.Fatalf("flushed identity kind = %q, want %q", identity.Kind, mountWritebackCreateDraftContentIdentityKind)
1983+
}
1984+
if identity.Key != key {
1985+
t.Fatalf("flushed identity key = %q, want %q", identity.Key, key)
1986+
}
1987+
if identity.TTLSeconds != 2592000 {
1988+
t.Fatalf("flushed identity ttl = %d, want 2592000", identity.TTLSeconds)
1989+
}
1990+
}
1991+
18151992
func TestBulkWrite_MixedCreateAndUpdateBatch(t *testing.T) {
18161993
client := &fakeClient{
18171994
files: map[string]RemoteFile{

internal/mountsync/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99

1010
type BulkWriteFile = relayfile.BulkWriteFile
1111

12+
// ContentIdentity is an alias for relayfile.ContentIdentity.
type ContentIdentity = relayfile.ContentIdentity
13+
1214
type BulkWriteError = relayfile.BulkWriteError
1315

1416
type BulkWriteResult = relayfile.BulkWriteResult

internal/relayfile/draft_reconcile.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ func isDraftFileBasename(basename string) bool {
104104
return draftFileBasenamePattern.MatchString(basename)
105105
}
106106

107+
// IsDraftFilePath reports whether a path matches the relayfile-owned
108+
// draftFile() create-draft basename contract.
109+
func IsDraftFilePath(filePath string) bool {
110+
return isDraftFileBasename(basenameOf(filePath))
111+
}
112+
107113
// reconcileAckedDraftLocked applies the draftFile() rename contract after a
108114
// successful externally-executed writeback: the agent-authored draft is
109115
// renamed to the canonical id (or removed when the canonical record already

0 commit comments

Comments
 (0)