|
| 1 | +// Copyright (c) 2026 Lark Technologies Pte. Ltd. |
| 2 | +// SPDX-License-Identifier: MIT |
| 3 | + |
| 4 | +package convertlib |
| 5 | + |
| 6 | +import ( |
| 7 | + "context" |
| 8 | + "sync" |
| 9 | + |
| 10 | + "github.com/larksuite/cli/shortcuts/common" |
| 11 | +) |
| 12 | + |
| 13 | +// resourceDownloadConcurrency caps in-flight resource downloads. Each download |
| 14 | +// is a GET plus a local disk write; capping at 3 keeps the |
| 15 | +// messages/{id}/resources/{key} endpoint well under any gateway-layer rate |
| 16 | +// ceiling while still cutting wall-clock versus a serial loop. |
| 17 | +const resourceDownloadConcurrency = 3 |
| 18 | + |
| 19 | +// ResourceDownloader downloads one resource and returns its local path and |
| 20 | +// size in bytes. messageID is the resource's owning message id (the download |
| 21 | +// API path parameter), key is the file_key/image_key, and fileType is the |
| 22 | +// download API resource type ("image" or "file"). A non-nil error means the |
| 23 | +// single resource failed; the engine isolates that failure (fail-silent). |
| 24 | +type ResourceDownloader func(ctx context.Context, messageID, key, fileType string) (string, int64, error) |
| 25 | + |
| 26 | +// EnrichResourceDownloads walks every message node (including nested |
| 27 | +// thread_replies) for "resources" blocks attached during formatting, downloads |
| 28 | +// each distinct (message_id, key) once with bounded concurrency, and fills |
| 29 | +// local_path/size_bytes back into every ref sharing that key. A single |
| 30 | +// resource failing is isolated: its ref is flagged "error": true and a warning |
| 31 | +// is written to stderr, while the main message and the other resources are |
| 32 | +// unaffected (S2.STA-DES-P0-002 weak-dependency isolation). |
| 33 | +func EnrichResourceDownloads(runtime *common.RuntimeContext, messages []map[string]interface{}, dl ResourceDownloader) { |
| 34 | + if len(messages) == 0 || dl == nil { |
| 35 | + return |
| 36 | + } |
| 37 | + |
| 38 | + type refKey struct { |
| 39 | + messageID string |
| 40 | + key string |
| 41 | + } |
| 42 | + groups := make(map[refKey][]map[string]interface{}) |
| 43 | + types := make(map[refKey]string) |
| 44 | + var order []refKey |
| 45 | + |
| 46 | + collectResourceRefs(messages, func(ref map[string]interface{}) { |
| 47 | + messageID, _ := ref["message_id"].(string) |
| 48 | + key, _ := ref["key"].(string) |
| 49 | + if messageID == "" || key == "" { |
| 50 | + return |
| 51 | + } |
| 52 | + rk := refKey{messageID: messageID, key: key} |
| 53 | + if _, seen := groups[rk]; !seen { |
| 54 | + order = append(order, rk) |
| 55 | + if t, _ := ref["type"].(string); t != "" { |
| 56 | + types[rk] = t |
| 57 | + } |
| 58 | + } |
| 59 | + groups[rk] = append(groups[rk], ref) |
| 60 | + }) |
| 61 | + if len(order) == 0 { |
| 62 | + return |
| 63 | + } |
| 64 | + |
| 65 | + ctx := runtime.Ctx() |
| 66 | + var stderrMu sync.Mutex |
| 67 | + |
| 68 | + download := func(rk refKey) { |
| 69 | + if err := ctx.Err(); err != nil { |
| 70 | + return |
| 71 | + } |
| 72 | + localPath, size, err := dl(ctx, rk.messageID, rk.key, types[rk]) |
| 73 | + if err != nil { |
| 74 | + warnSyncf(&stderrMu, runtime.IO().ErrOut, |
| 75 | + "warning: resource_download_failed: %s/%s: %v\n", rk.messageID, rk.key, err) |
| 76 | + for _, ref := range groups[rk] { |
| 77 | + ref["error"] = true |
| 78 | + } |
| 79 | + return |
| 80 | + } |
| 81 | + for _, ref := range groups[rk] { |
| 82 | + ref["local_path"] = localPath |
| 83 | + ref["size_bytes"] = size |
| 84 | + } |
| 85 | + } |
| 86 | + |
| 87 | + // Single-resource fast path: no goroutine overhead, deterministic stderr. |
| 88 | + if len(order) == 1 { |
| 89 | + download(order[0]) |
| 90 | + return |
| 91 | + } |
| 92 | + |
| 93 | + // Bounded-concurrency fan-out. Each goroutine writes only to its own |
| 94 | + // (message_id, key) group's ref maps — distinct keys map to distinct ref |
| 95 | + // maps, so there is no shared mutable state besides the stderr mutex. |
| 96 | + sem := make(chan struct{}, resourceDownloadConcurrency) |
| 97 | + var wg sync.WaitGroup |
| 98 | + for _, rk := range order { |
| 99 | + wg.Add(1) |
| 100 | + sem <- struct{}{} |
| 101 | + go func() { |
| 102 | + defer wg.Done() |
| 103 | + defer func() { <-sem }() |
| 104 | + download(rk) |
| 105 | + }() |
| 106 | + } |
| 107 | + wg.Wait() |
| 108 | +} |
| 109 | + |
| 110 | +// collectResourceRefs walks messages (and nested thread_replies) and invokes fn |
| 111 | +// for every resource ref map found in each node's "resources" block. Handles |
| 112 | +// both the typed []map[string]interface{} (in-memory, set by formatMessageItem) |
| 113 | +// and []interface{} (post JSON round-trip) shapes, mirroring collectMessageNodes. |
| 114 | +func collectResourceRefs(messages []map[string]interface{}, fn func(ref map[string]interface{})) { |
| 115 | + for _, msg := range messages { |
| 116 | + switch res := msg["resources"].(type) { |
| 117 | + case []map[string]interface{}: |
| 118 | + for _, ref := range res { |
| 119 | + fn(ref) |
| 120 | + } |
| 121 | + case []interface{}: |
| 122 | + for _, raw := range res { |
| 123 | + if ref, ok := raw.(map[string]interface{}); ok { |
| 124 | + fn(ref) |
| 125 | + } |
| 126 | + } |
| 127 | + } |
| 128 | + switch nested := msg["thread_replies"].(type) { |
| 129 | + case []map[string]interface{}: |
| 130 | + collectResourceRefs(nested, fn) |
| 131 | + case []interface{}: |
| 132 | + typed := make([]map[string]interface{}, 0, len(nested)) |
| 133 | + for _, raw := range nested { |
| 134 | + if m, ok := raw.(map[string]interface{}); ok { |
| 135 | + typed = append(typed, m) |
| 136 | + } |
| 137 | + } |
| 138 | + collectResourceRefs(typed, fn) |
| 139 | + } |
| 140 | + } |
| 141 | +} |
0 commit comments