Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 286 additions & 0 deletions tests/federation_room_get_missing_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/gorilla/mux"
"github.com/matrix-org/complement"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/fclient"
Expand All @@ -18,6 +19,7 @@ import (

"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/ct"
"github.com/matrix-org/complement/federation"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/match"
Expand Down Expand Up @@ -598,3 +600,287 @@ func TestOutboundFederationEventSizeGetMissingEvents(t *testing.T) {
// Alice should receive the sent event, even though the "bad" event has a too large state key
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(room.RoomID, sentEvent.EventID()))
}

// Test that if you respond to /state_ids, and fail some /event requests, we end up
// with correctly persisted auth information for the event. This creates an _auth graph_ like so:
//
// A <- B <- C <- D <- E m.room.member,bob
//
// Complement needs the HS to hit /state_ids and /event for missing events so it does some work to manipulate this:
// - it sends 100 unrelated state events. This ensures that any statistical analysis done on the number of missing events
// in /state_ids means we will bias to using /event and not /state. The test needs /event.
// - it sends an unrelated event to the HS with unknown prev_events.
// - it returns an unrelated event for /get_missing_events.
// - then /state_ids should be hit.
//
// When /state_ids is hit, we will include A,B,C,D,E in the response. This will be the first time the HS sees these events.
// Because we've gamed the number of state events in the room, HSes _should_ hit /event for each event ID.
// Now the actual test can begin:
// - We fail the /event request for B.
// - We ensure that we do not see C,D,E in the final room state.
//
// This is a regression test where a HS could have code which does the following:
// - Sort events topologically (A,B,C,D,E)
// - for each event, check you have the auth events and then auth it.
// - If you don't have the auth events, drop it, else persist it (incl. whether it was rejected).
//
// This has a subtle bug IF "check you have the auth events" uses an in-memory event map AND dropping the event doesn't remove
// the entry from that event map. If this happens: A is processed, B is missing, C is dropped due to missing B,
// crucially D and E ARE PERSISTED because C exists in-memory.
// This breaks the auth chain for the room, which matters when doing state resolution.
func TestCorruptedAuthChain(t *testing.T) {
deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)

srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleMakeSendJoinRequests(),
federation.HandleTransactionRequests(nil, nil),
federation.HandleInviteRequests(nil),
)
srv.UnexpectedRequestsAreErrors = false // we expect to be pushed events
cancel := srv.Listen()
defer cancel()

alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "alice"})
sentinel := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "sentinel"})
roomID := alice.MustCreateRoom(t, map[string]interface{}{
"preset": "public_chat",
"room_version": "10",
})
sentinel.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
// pack out the room state
for i := 0; i < 100; i++ {
if i%2 == 0 {
alice.MustLeaveRoom(t, roomID)
} else {
alice.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
}
}
bob := srv.UserID("bob")
defaultImpl := federation.ServerRoomImplDefault{}
var existingAuthChain []gomatrixserverlib.PDU
srvRoom := srv.MustJoinRoom(t, deployment, spec.ServerName("hs1"), roomID, bob, federation.WithRoomOpts(federation.WithImpl(&federation.ServerRoomImplCustom{
ServerRoomImplDefault: defaultImpl,
PopulateFromSendJoinResponseFn: func(def federation.ServerRoomImpl, room *federation.ServerRoom, joinEvent gomatrixserverlib.PDU, resp fclient.RespSendJoin) {
defaultImpl.PopulateFromSendJoinResponse(room, joinEvent, resp)
existingAuthChain = resp.AuthEvents.TrustedEvents(joinEvent.Version(), false)
},
})))
// we should have at least 100 events in the auth chain
if len(existingAuthChain) < 100 {
ct.Fatalf(t, "not enough events in the auth chain, got %d want >100", len(existingAuthChain))
}
createEvent := srvRoom.CurrentState(spec.MRoomCreate, "")
plEvent := srvRoom.CurrentState(spec.MRoomPowerLevels, "")
jrEvent := srvRoom.CurrentState(spec.MRoomJoinRules, "")

// Create A,B,C,D,E which will be profile changes for Bob
eventA := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "A",
},
})
eventB := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "B",
},
PrevEvents: []string{eventA.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventA.EventID()},
})
eventC := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "C",
},
PrevEvents: []string{eventB.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventB.EventID()},
})
eventD := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "D",
},
PrevEvents: []string{eventC.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventC.EventID()},
})
eventE := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "E",
},
PrevEvents: []string{eventD.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventD.EventID()},
})
srvRoom.AddEvent(eventE) // so we include this in auth_events for subsequent events below.

// Create 3 unrelated events (one for /send, one for /gme, one for /state_ids snapshot)
stateIDsEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "for /state_ids",
},
PrevEvents: []string{eventE.EventID()},
})
srvRoom.AddEvent(stateIDsEvent)
gmeEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "for /get_missing_events",
},
PrevEvents: []string{stateIDsEvent.EventID()},
})
srvRoom.AddEvent(gmeEvent)
sendTxnEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "for /send",
},
PrevEvents: []string{gmeEvent.EventID()},
})
srvRoom.AddEvent(sendTxnEvent)

// the possible events to return in /event. This omits B.
allEvents := []gomatrixserverlib.PDU{
stateIDsEvent, gmeEvent, sendTxnEvent, eventA, eventC, eventD, eventE,
}
t.Logf("event A: %s", eventA.EventID())
t.Logf("event B: %s", eventB.EventID())
t.Logf("event C: %s", eventC.EventID())
t.Logf("event D: %s", eventD.EventID())
t.Logf("event E: %s", eventE.EventID())

// add handlers for them
gmeWaiter := helpers.NewWaiter()
srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) {
defer gmeWaiter.Finish()
body := must.ParseJSON(t, req.Body)
t.Logf("/get_missing_events req for room %s => %s", mux.Vars(req)["roomID"], body.Raw)
must.Equal(t, body.Get("latest_events").Array()[0].String(), sendTxnEvent.EventID(), "unexpected event provided to /get_missing_events")
w.WriteHeader(200)
res := struct {
Events []gomatrixserverlib.PDU `json:"events"`
}{
Events: []gomatrixserverlib.PDU{gmeEvent},
}
t.Logf("/get_missing_events req for room %s responding with %s in room %s", mux.Vars(req)["roomID"], res.Events[0].EventID(), res.Events[0].RoomID())
var responseBytes []byte
responseBytes, err := json.Marshal(&res)
must.NotError(t, "failed to marshal response", err)
w.Write(responseBytes)
})
stateIDWaiter := helpers.NewWaiter()
srv.Mux().HandleFunc("/_matrix/federation/v1/state_ids/{roomID}", func(w http.ResponseWriter, req *http.Request) {
defer stateIDWaiter.Finish()
t.Logf("/state_ids req for room %s => %s", mux.Vars(req)["roomID"], req.URL.Query().Encode())
reqEventID := req.URL.Query().Get("event_id")
must.Equal(t, reqEventID, stateIDsEvent.EventID(), "unexpected event provided to /state_ids")
w.WriteHeader(200)

var authChainIDs []string
for _, ev := range existingAuthChain {
authChainIDs = append(authChainIDs, ev.EventID())
}
// include A,B,C,D
authChainIDs = append(authChainIDs, eventA.EventID(), eventB.EventID(), eventC.EventID(), eventD.EventID())
// the current state is the same as before but with E as the member event for bob
var pduIDs []string
for _, ev := range srvRoom.AllCurrentState() {
if ev.Type() == spec.MRoomMember && ev.StateKeyEquals(bob) {
continue
}
pduIDs = append(pduIDs, ev.EventID())
}
pduIDs = append(pduIDs, eventE.EventID())
res := struct {
AuthChainIDs []string `json:"auth_chain_ids"`
PDUIDs []string `json:"pdu_ids"`
}{
AuthChainIDs: authChainIDs,
PDUIDs: pduIDs,
}
var responseBytes []byte
responseBytes, err := json.Marshal(&res)
must.NotError(t, "failed to marshal response", err)
w.Write(responseBytes)
})
eventWaiter := helpers.NewWaiter()
srv.Mux().Handle("/_matrix/federation/v1/event/{eventID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
eventID := vars["eventID"]
var event gomatrixserverlib.PDU
// find the event
for _, ev := range allEvents {
if ev.EventID() == eventID {
event = ev
break
}
}
// we should see a request for event B
if eventID == eventB.EventID() {
eventWaiter.Finish()
}

if event == nil {
t.Logf("/event returning 404 for event %v", eventID)
w.WriteHeader(404)
w.Write([]byte(fmt.Sprintf(`complement: failed to find event: %s`, eventID)))
return
}

txn := gomatrixserverlib.Transaction{
Origin: spec.ServerName(srv.ServerName()),
OriginServerTS: spec.AsTimestamp(time.Now()),
PDUs: []json.RawMessage{
event.JSON(),
},
}
resp, err := json.Marshal(txn)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf(`complement: failed to marshal JSON response: %s`, err)))
return
}
w.WriteHeader(200)
w.Write(resp)
}))

srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sendTxnEvent.JSON()}, nil)

// wait for the server to make the requests
gmeWaiter.Wait(t, 5*time.Second)
stateIDWaiter.Wait(t, 5*time.Second)
eventWaiter.Wait(t, 5*time.Second)

// let things settle
time.Sleep(time.Second)

// we should not see event E as the current state for bob.
content := alice.MustGetStateEventContent(t, roomID, spec.MRoomMember, bob)
t.Logf("bob's membership content: %v", content.Raw)
// assert bob's member event was his initial join, not any of the others. Technically you can argue A should be valid.
must.Equal(t, content.Get("displayname").Str, "", "Events C/D/E were processed when they should not have been as the server doesn't know B.")
}
Loading