Skip to content

Commit 2f79fe0

Browse files
add support for batch updates (#172)
1 parent e718607 commit 2f79fe0

2 files changed

Lines changed: 254 additions & 0 deletions

File tree

moderation.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package stream
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
)
78

@@ -100,6 +101,88 @@ func (c *ModerationClient) updateStatus(ctx context.Context, r updateStatusReque
100101
return err
101102
}
102103

104+
type UpdateStatusBatchRequest struct {
105+
ModeratorID string `json:"moderator_id"`
106+
Updates []UpdateStatusBatchItem `json:"updates"`
107+
}
108+
109+
type UpdateStatusBatchItem struct {
110+
EntityType string `json:"entity_type"`
111+
EntityID string `json:"entity_id"`
112+
Status string `json:"status"`
113+
RecommendedAction string `json:"recommended_action"`
114+
LatestModeratorAction string `json:"latest_moderator_action"`
115+
}
116+
117+
type UpdateStatusBatchResponse struct {
118+
BaseResponse
119+
Results []UpdateStatusBatchResult `json:"results"`
120+
}
121+
122+
type UpdateStatusBatchResult struct {
123+
EntityID string `json:"entity_id"`
124+
EntityType string `json:"entity_type"`
125+
Success bool `json:"success"`
126+
Error string `json:"error,omitempty"`
127+
}
128+
129+
func (c *ModerationClient) UpdateStatusBatch(ctx context.Context, req UpdateStatusBatchRequest) (*UpdateStatusBatchResponse, error) {
130+
// Validate the request
131+
if err := validateUpdateStatusBatchRequest(req); err != nil {
132+
return nil, err
133+
}
134+
135+
endpoint := c.client.makeEndpoint("moderation/status/batch/")
136+
137+
resp, err := c.client.post(ctx, endpoint, req, c.client.authenticator.moderationAuth)
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
var result UpdateStatusBatchResponse
143+
if err := json.Unmarshal(resp, &result); err != nil {
144+
return nil, err
145+
}
146+
147+
return &result, nil
148+
}
149+
150+
func validateUpdateStatusBatchRequest(req UpdateStatusBatchRequest) error {
151+
// Check moderator ID
152+
if req.ModeratorID == "" {
153+
return fmt.Errorf("moderator_id is required")
154+
}
155+
156+
// Check maximum limit
157+
if len(req.Updates) > 100 {
158+
return fmt.Errorf("maximum of 100 updates allowed, got %d", len(req.Updates))
159+
}
160+
161+
// Check for empty updates
162+
if len(req.Updates) == 0 {
163+
return fmt.Errorf("at least one update is required")
164+
}
165+
166+
// Check for duplicate entity type/ID combinations
167+
seen := make(map[string]bool)
168+
for i, update := range req.Updates {
169+
if update.EntityType == "" {
170+
return fmt.Errorf("entity_type is required for update at index %d", i)
171+
}
172+
if update.EntityID == "" {
173+
return fmt.Errorf("entity_id is required for update at index %d", i)
174+
}
175+
176+
key := update.EntityType + ":" + update.EntityID
177+
if seen[key] {
178+
return fmt.Errorf("duplicate entity found: entity_type=%s, entity_id=%s", update.EntityType, update.EntityID)
179+
}
180+
seen[key] = true
181+
}
182+
183+
return nil
184+
}
185+
103186
func (c *ModerationClient) InvalidateUserCache(ctx context.Context, userID string) error {
104187
if userID == "" {
105188
return fmt.Errorf("empty userID")

moderation_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package stream_test
22

33
import (
44
"context"
5+
"fmt"
56
"net/http"
67
"testing"
78

89
"github.com/stretchr/testify/require"
10+
11+
stream "github.com/GetStream/stream-go2/v8"
912
)
1013

1114
func TestFlagActivity(t *testing.T) {
@@ -78,6 +81,174 @@ func TestUpdateReactionModerationStatus(t *testing.T) {
7881
)
7982
}
8083

84+
func TestUpdateStatusBatch(t *testing.T) {
85+
ctx := context.Background()
86+
client, requester := newClient(t)
87+
88+
batchReq := stream.UpdateStatusBatchRequest{
89+
ModeratorID: "moderator_123",
90+
Updates: []stream.UpdateStatusBatchItem{
91+
{
92+
EntityType: "stream:feeds:v2:activity",
93+
EntityID: "activity_1",
94+
Status: "complete",
95+
RecommendedAction: "watch",
96+
LatestModeratorAction: "mark_safe",
97+
},
98+
{
99+
EntityType: "stream:feeds:v2:reaction",
100+
EntityID: "reaction_1",
101+
Status: "complete",
102+
RecommendedAction: "flag",
103+
LatestModeratorAction: "mark_harmful",
104+
},
105+
},
106+
}
107+
108+
// Mock the response
109+
requester.resp = `{
110+
"results": [
111+
{
112+
"entity_id": "activity_1",
113+
"entity_type": "stream:feeds:v2:activity",
114+
"success": true
115+
},
116+
{
117+
"entity_id": "reaction_1",
118+
"entity_type": "stream:feeds:v2:reaction",
119+
"success": true
120+
}
121+
]
122+
}`
123+
124+
resp, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
125+
require.NoError(t, err)
126+
require.NotNil(t, resp)
127+
require.Len(t, resp.Results, 2)
128+
require.Equal(t, "activity_1", resp.Results[0].EntityID)
129+
require.Equal(t, "stream:feeds:v2:activity", resp.Results[0].EntityType)
130+
require.True(t, resp.Results[0].Success)
131+
require.Equal(t, "reaction_1", resp.Results[1].EntityID)
132+
require.Equal(t, "stream:feeds:v2:reaction", resp.Results[1].EntityType)
133+
require.True(t, resp.Results[1].Success)
134+
135+
testRequest(
136+
t,
137+
requester.req,
138+
http.MethodPost,
139+
"https://api.stream-io-api.com/api/v1.0/moderation/status/batch/?api_key=key",
140+
`{"moderator_id":"moderator_123","updates":[{"entity_type":"stream:feeds:v2:activity","entity_id":"activity_1","status":"complete","recommended_action":"watch","latest_moderator_action":"mark_safe"},{"entity_type":"stream:feeds:v2:reaction","entity_id":"reaction_1","status":"complete","recommended_action":"flag","latest_moderator_action":"mark_harmful"}]}`,
141+
)
142+
}
143+
144+
func TestUpdateStatusBatchValidation(t *testing.T) {
145+
ctx := context.Background()
146+
client, _ := newClient(t)
147+
148+
// Test missing moderator ID
149+
t.Run("missing moderator id", func(t *testing.T) {
150+
batchReq := stream.UpdateStatusBatchRequest{
151+
ModeratorID: "", // Missing
152+
Updates: []stream.UpdateStatusBatchItem{
153+
{
154+
EntityType: "stream:feeds:v2:activity",
155+
EntityID: "activity_1",
156+
Status: "complete",
157+
},
158+
},
159+
}
160+
_, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
161+
require.Error(t, err)
162+
require.Contains(t, err.Error(), "moderator_id is required")
163+
})
164+
165+
// Test empty updates
166+
t.Run("empty updates", func(t *testing.T) {
167+
batchReq := stream.UpdateStatusBatchRequest{
168+
ModeratorID: "moderator_123",
169+
Updates: []stream.UpdateStatusBatchItem{},
170+
}
171+
_, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
172+
require.Error(t, err)
173+
require.Contains(t, err.Error(), "at least one update is required")
174+
})
175+
176+
// Test too many updates (over 100)
177+
t.Run("too many updates", func(t *testing.T) {
178+
updates := make([]stream.UpdateStatusBatchItem, 101)
179+
for i := 0; i < 101; i++ {
180+
updates[i] = stream.UpdateStatusBatchItem{
181+
EntityType: "stream:feeds:v2:activity",
182+
EntityID: fmt.Sprintf("activity_%d", i),
183+
Status: "complete",
184+
}
185+
}
186+
batchReq := stream.UpdateStatusBatchRequest{
187+
ModeratorID: "moderator_123",
188+
Updates: updates,
189+
}
190+
_, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
191+
require.Error(t, err)
192+
require.Contains(t, err.Error(), "maximum of 100 updates allowed, got 101")
193+
})
194+
195+
// Test duplicate entity type/ID combination
196+
t.Run("duplicate entities", func(t *testing.T) {
197+
batchReq := stream.UpdateStatusBatchRequest{
198+
ModeratorID: "moderator_123",
199+
Updates: []stream.UpdateStatusBatchItem{
200+
{
201+
EntityType: "stream:feeds:v2:activity",
202+
EntityID: "activity_1",
203+
Status: "complete",
204+
},
205+
{
206+
EntityType: "stream:feeds:v2:activity",
207+
EntityID: "activity_1", // Same as above
208+
Status: "pending",
209+
},
210+
},
211+
}
212+
_, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
213+
require.Error(t, err)
214+
require.Contains(t, err.Error(), "duplicate entity found: entity_type=stream:feeds:v2:activity, entity_id=activity_1")
215+
})
216+
217+
// Test missing entity type
218+
t.Run("missing entity type", func(t *testing.T) {
219+
batchReq := stream.UpdateStatusBatchRequest{
220+
ModeratorID: "moderator_123",
221+
Updates: []stream.UpdateStatusBatchItem{
222+
{
223+
EntityType: "", // Missing
224+
EntityID: "activity_1",
225+
Status: "complete",
226+
},
227+
},
228+
}
229+
_, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
230+
require.Error(t, err)
231+
require.Contains(t, err.Error(), "entity_type is required for update at index 0")
232+
})
233+
234+
// Test missing entity ID
235+
t.Run("missing entity id", func(t *testing.T) {
236+
batchReq := stream.UpdateStatusBatchRequest{
237+
ModeratorID: "moderator_123",
238+
Updates: []stream.UpdateStatusBatchItem{
239+
{
240+
EntityType: "stream:feeds:v2:activity",
241+
EntityID: "", // Missing
242+
Status: "complete",
243+
},
244+
},
245+
}
246+
_, err := client.Moderation().UpdateStatusBatch(ctx, batchReq)
247+
require.Error(t, err)
248+
require.Contains(t, err.Error(), "entity_id is required for update at index 0")
249+
})
250+
}
251+
81252
func TestInvalidateUserCache(t *testing.T) {
82253
ctx := context.Background()
83254
client, requester := newClient(t)

0 commit comments

Comments
 (0)