Skip to content

Commit d0b30a3

Browse files
committed
feed kinda works, albiet without dedupe
1 parent f5e2c98 commit d0b30a3

6 files changed

Lines changed: 264 additions & 53 deletions

File tree

api/dbv1/full_playlists.go

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"bridgerton.audius.co/trashid"
77
"github.com/jackc/pgx/v5/pgtype"
8-
"golang.org/x/sync/errgroup"
98
)
109

1110
type FullPlaylist struct {
@@ -17,8 +16,15 @@ type FullPlaylist struct {
1716
User FullUser `json:"user"`
1817
Tracks []FullTrack `json:"tracks"`
1918

20-
FolloweeReposts []*FolloweeRepost `json:"followee_reposts"`
21-
FolloweeFavorites []*FolloweeFavorite `json:"followee_favorites"`
19+
FolloweeReposts []*FolloweeRepost `json:"followee_reposts"`
20+
FolloweeFavorites []*FolloweeFavorite `json:"followee_favorites"`
21+
PlaylistContents []FullPlaylistContentsItem `json:"playlist_contents"`
22+
}
23+
24+
type FullPlaylistContentsItem struct {
25+
Time int64 `json:"timestamp"`
26+
TrackId string `json:"track_id"`
27+
MetadataTime int64 `json:"metadata_timestamp"`
2228
}
2329

2430
func (q *Queries) FullPlaylistsKeyed(ctx context.Context, arg GetPlaylistsParams) (map[int32]FullPlaylist, error) {
@@ -38,38 +44,19 @@ func (q *Queries) FullPlaylistsKeyed(ctx context.Context, arg GetPlaylistsParams
3844
}
3945

4046
// fetch users + tracks in parallel
41-
g, ctx := errgroup.WithContext(ctx)
42-
userMap := map[int32]FullUser{}
43-
trackMap := map[int32]FullTrack{}
44-
45-
// fetch users
46-
g.Go(func() error {
47-
var err error
48-
userMap, err = q.FullUsersKeyed(ctx, GetUsersParams{
49-
MyID: arg.MyID,
50-
Ids: userIds,
51-
})
52-
return err
53-
})
54-
55-
// fetch tracks
56-
g.Go(func() error {
57-
var err error
58-
trackMap, err = q.FullTracksKeyed(ctx, GetTracksParams{
59-
MyID: arg.MyID,
60-
Ids: trackIds,
61-
})
62-
return err
47+
loaded, err := q.Parallel(ctx, ParallelParams{
48+
UserIds: userIds,
49+
TrackIds: trackIds,
50+
MyID: arg.MyID,
6351
})
64-
65-
if err := g.Wait(); err != nil {
52+
if err != nil {
6653
return nil, err
6754
}
6855

6956
playlistMap := map[int32]FullPlaylist{}
7057
for _, playlist := range rawPlaylists {
7158
id, _ := trashid.EncodeHashId(int(playlist.PlaylistID))
72-
user, ok := userMap[playlist.PlaylistOwnerID]
59+
user, ok := loaded.UserMap[playlist.PlaylistOwnerID]
7360

7461
// GetUser will omit deactivated users
7562
// so skip tracks if user doesn't come back.
@@ -80,11 +67,22 @@ func (q *Queries) FullPlaylistsKeyed(ctx context.Context, arg GetPlaylistsParams
8067

8168
var tracks = make([]FullTrack, 0, len(playlist.PlaylistContents.TrackIDs))
8269
for _, t := range playlist.PlaylistContents.TrackIDs {
83-
if track, ok := trackMap[int32(t.Track)]; ok {
70+
if track, ok := loaded.TrackMap[int32(t.Track)]; ok {
8471
tracks = append(tracks, track)
8572
}
8673
}
8774

75+
// slightly change playlist_contents
76+
fullPlaylistContents := []FullPlaylistContentsItem{}
77+
for _, item := range playlist.PlaylistContents.TrackIDs {
78+
trackId, _ := trashid.EncodeHashId(int(item.Track))
79+
fullPlaylistContents = append(fullPlaylistContents, FullPlaylistContentsItem{
80+
Time: item.Time,
81+
MetadataTime: item.MetadataTime,
82+
TrackId: trackId,
83+
})
84+
}
85+
8886
playlistMap[playlist.PlaylistID] = FullPlaylist{
8987
GetPlaylistsRow: playlist,
9088
ID: id,
@@ -94,6 +92,7 @@ func (q *Queries) FullPlaylistsKeyed(ctx context.Context, arg GetPlaylistsParams
9492
Tracks: tracks,
9593
FolloweeFavorites: fullFolloweeFavorites(playlist.FolloweeFavorites),
9694
FolloweeReposts: fullFolloweeReposts(playlist.FolloweeReposts),
95+
PlaylistContents: fullPlaylistContents,
9796
}
9897
}
9998

api/dbv1/parallel.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package dbv1
2+
3+
import (
4+
"context"
5+
6+
"golang.org/x/sync/errgroup"
7+
)
8+
9+
type ParallelParams struct {
10+
UserIds []int32
11+
TrackIds []int32
12+
PlaylistIds []int32
13+
MyID interface{}
14+
}
15+
16+
type ParallelResult struct {
17+
UserMap map[int32]FullUser
18+
TrackMap map[int32]FullTrack
19+
PlaylistMap map[int32]FullPlaylist
20+
}
21+
22+
func (q *Queries) Parallel(ctx context.Context, arg ParallelParams) (*ParallelResult, error) {
23+
g, ctx := errgroup.WithContext(ctx)
24+
25+
var userMap map[int32]FullUser
26+
var trackMap map[int32]FullTrack
27+
var playlistMap map[int32]FullPlaylist
28+
29+
if len(arg.UserIds) > 0 {
30+
g.Go(func() error {
31+
var err error
32+
userMap, err = q.FullUsersKeyed(ctx, GetUsersParams{
33+
Ids: arg.UserIds,
34+
MyID: arg.MyID,
35+
})
36+
return err
37+
})
38+
}
39+
40+
if len(arg.TrackIds) > 0 {
41+
g.Go(func() error {
42+
var err error
43+
trackMap, err = q.FullTracksKeyed(ctx, GetTracksParams{
44+
Ids: arg.TrackIds,
45+
MyID: arg.MyID,
46+
})
47+
return err
48+
})
49+
}
50+
51+
if len(arg.PlaylistIds) > 0 {
52+
g.Go(func() error {
53+
var err error
54+
playlistMap, err = q.FullPlaylistsKeyed(ctx, GetPlaylistsParams{
55+
Ids: arg.PlaylistIds,
56+
MyID: arg.MyID,
57+
})
58+
return err
59+
})
60+
}
61+
62+
err := g.Wait()
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
result := &ParallelResult{
68+
userMap,
69+
trackMap,
70+
playlistMap,
71+
}
72+
73+
return result, nil
74+
}

api/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func NewApiServer(config Config) *ApiServer {
140140
g.Get("/users/:userId/reposts", app.v1UsersReposts)
141141
g.Get("/users/:userId/supporting", app.v1UsersSupporting)
142142
g.Get("/users/:userId/tracks", app.v1UserTracks)
143+
g.Get("/users/:userId/feed", app.v1UsersFeed)
143144

144145
// Tracks
145146
g.Get("/tracks", app.v1Tracks)

api/server_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func Test200(t *testing.T) {
9191
"/v1/full/users/7eP5n/reposts",
9292
"/v1/full/users/7eP5n/supporting",
9393
"/v1/full/users/7eP5n/tracks",
94+
"/v1/full/users/7eP5n/feed",
9495

9596
"/v1/full/users/handle/rayjacobson",
9697
"/v1/full/users/handle/rayjacobson/tracks",

api/v1_users_feed.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package api
2+
3+
import (
4+
"time"
5+
6+
"bridgerton.audius.co/api/dbv1"
7+
"github.com/gofiber/fiber/v2"
8+
"github.com/jackc/pgx/v5"
9+
)
10+
11+
// todo: some dedupe stuff
12+
func (app *ApiServer) v1UsersFeed(c *fiber.Ctx) error {
13+
14+
sql := `
15+
WITH
16+
follow_set AS (
17+
SELECT followee_user_id AS user_id
18+
FROM follows
19+
WHERE
20+
follower_user_id = @userId
21+
AND is_delete = false
22+
),
23+
history as (
24+
25+
(
26+
SELECT
27+
user_id as actor_id,
28+
'repost' as verb,
29+
repost_type as obj_type,
30+
repost_item_id as obj_id,
31+
reposts.created_at
32+
FROM reposts
33+
JOIN follow_set using (user_id)
34+
LEFT JOIN tracks
35+
ON repost_type = 'track'
36+
AND repost_item_id = track_id
37+
AND tracks.is_delete = false
38+
AND tracks.is_unlisted = false
39+
AND tracks.is_available = true
40+
LEFT JOIN playlists
41+
ON repost_type != 'track'
42+
AND repost_item_id = playlist_id
43+
AND playlists.is_delete = false
44+
AND playlists.is_private = false
45+
WHERE
46+
reposts.created_at < @before
47+
AND reposts.created_at >= @before - INTERVAL '30 DAYS'
48+
AND reposts.is_delete = false
49+
AND (tracks.track_id IS NOT NULL OR playlists.playlist_id IS NOT NULL)
50+
)
51+
52+
UNION ALL
53+
54+
(
55+
SELECT
56+
user_id as actor_id,
57+
'post' as verb,
58+
'track' as obj_type,
59+
track_id as obj_id,
60+
created_at
61+
from tracks
62+
join follow_set on owner_id = user_id
63+
where created_at < @before
64+
and created_at >= @before::timestamp - INTERVAL '30 DAYS'
65+
and is_unlisted = false
66+
and is_delete = false
67+
and stem_of is null
68+
)
69+
70+
UNION ALL
71+
72+
(
73+
SELECT
74+
user_id as actor_id,
75+
'post' as verb,
76+
'playlist' as obj_type,
77+
playlist_id as obj_id,
78+
created_at
79+
from playlists
80+
join follow_set on playlist_owner_id = user_id
81+
where created_at < @before
82+
and created_at >= @before::timestamp - INTERVAL '30 DAYS'
83+
and is_delete = false
84+
AND is_private = false
85+
)
86+
87+
)
88+
SELECT * FROM history
89+
ORDER BY created_at asc
90+
LIMIT @limit
91+
OFFSET @offset
92+
`
93+
94+
rows, err := app.pool.Query(c.Context(), sql, pgx.NamedArgs{
95+
"userId": c.Locals("userId"),
96+
"before": time.Now(),
97+
"limit": c.Query("limit", "50"),
98+
"offset": c.Query("offset", "0"),
99+
})
100+
if err != nil {
101+
return err
102+
}
103+
104+
type FeedItem struct {
105+
ActorID int
106+
Verb string
107+
ObjType string `json:"type"`
108+
ObjID int32
109+
CreatedAt time.Time
110+
111+
Item any `db:"-" json:"item"`
112+
}
113+
114+
stubs, err := pgx.CollectRows(rows, pgx.RowToStructByName[FeedItem])
115+
if err != nil {
116+
return err
117+
}
118+
119+
// todo: remove duplicates
120+
// something like:
121+
// https://github.com/stereosteve/Elemental/blob/master/server/db/query-feed.ts#L77-L85
122+
123+
trackIds := []int32{}
124+
playlistIds := []int32{}
125+
for _, stub := range stubs {
126+
if stub.ObjType == "track" {
127+
trackIds = append(trackIds, stub.ObjID)
128+
} else {
129+
playlistIds = append(playlistIds, stub.ObjID)
130+
}
131+
}
132+
133+
loaded, err := app.queries.Parallel(c.Context(), dbv1.ParallelParams{
134+
TrackIds: trackIds,
135+
PlaylistIds: playlistIds,
136+
MyID: c.Locals("myId"),
137+
})
138+
if err != nil {
139+
return err
140+
}
141+
142+
for idx, stub := range stubs {
143+
if stub.ObjType == "track" {
144+
stub.Item = loaded.TrackMap[stub.ObjID]
145+
} else {
146+
stub.Item = loaded.PlaylistMap[stub.ObjID]
147+
}
148+
stubs[idx] = stub
149+
}
150+
151+
return c.JSON(fiber.Map{
152+
"data": stubs,
153+
})
154+
}

0 commit comments

Comments
 (0)