Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 316 additions & 0 deletions tests/federation_room_get_missing_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/gorilla/mux"
"github.com/matrix-org/complement"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/fclient"
Expand All @@ -18,10 +19,12 @@ import (

"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/ct"
"github.com/matrix-org/complement/federation"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/match"
"github.com/matrix-org/complement/must"
"github.com/matrix-org/complement/runtime"
)

// TODO:
Expand Down Expand Up @@ -598,3 +601,316 @@ func TestOutboundFederationEventSizeGetMissingEvents(t *testing.T) {
// Alice should receive the sent event, even though the "bad" event has a too large state key
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(room.RoomID, sentEvent.EventID()))
}

// Test that if you respond to /state_ids, and fail some /event requests, we end up
// with correctly persisted auth information for the event. This creates an _auth graph_ like so:
//
// A <- B <- C <- D <- E m.room.member,bob
//
// Complement needs the HS to hit /state_ids and /event for missing events so it does some work to manipulate this:
// - it sends 100 unrelated state events. This ensures that any statistical analysis done on the number of missing events
// in /state_ids means we will bias to using /event and not /state. The test needs /event.
// - it sends an unrelated event to the HS with unknown prev_events.
// - it returns an unrelated event for /get_missing_events.
// - then /state_ids should be hit.
//
// When /state_ids is hit, we will include A,B,C,D,E in the response. This will be the first time the HS sees these events.
// Because we've gamed the number of state events in the room, HSes _should_ hit /event for each event ID.
// Now the actual test can begin:
// - We fail the /event request for B.
// - We ensure that we do not see C,D,E in the final room state.
//
// This is a regression test where a HS could have code which does the following:
// - Sort events topologically (A,B,C,D,E)
// - for each event, check you have the auth events and then auth it.
// - If you don't have the auth events, drop it, else persist it (incl. whether it was rejected).
//
// This has a subtle bug IF "check you have the auth events" uses an in-memory event map AND dropping the event doesn't remove
// the entry from that event map. If this happens: A is processed, B is missing, C is dropped due to missing B,
// crucially D and E ARE PERSISTED because C exists in-memory.
// This breaks the auth chain for the room, which matters when doing state resolution.
func TestCorruptedAuthChain(t *testing.T) {
// Dendrite doesn't make exactly the same requests as it seems to fallback to /event_auth.
// As this is intended for a synapse bugfix, we'll skip dendrite for now.
runtime.SkipIf(t, runtime.Dendrite)
deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)

srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleMakeSendJoinRequests(),
federation.HandleTransactionRequests(nil, nil),
federation.HandleInviteRequests(nil),
)
// We expect to be pushed events that we don't care about responding to (not relevant to the test)
srv.UnexpectedRequestsAreErrors = false
cancel := srv.Listen()
defer cancel()

alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "alice"})
// ensure the server under test remains in the room when alice rejoins
sentinel := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "sentinel"})
roomID := alice.MustCreateRoom(t, map[string]interface{}{
"preset": "public_chat",
"room_version": "10",
})
sentinel.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
// Pad out the room state
for i := 0; i < 100; i++ {
if i%2 == 0 {
alice.MustLeaveRoom(t, roomID)
} else {
alice.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
}
}
bob := srv.UserID("bob")
defaultImpl := federation.ServerRoomImplDefault{}
var existingAuthChain []gomatrixserverlib.PDU
srvRoom := srv.MustJoinRoom(t, deployment, spec.ServerName("hs1"), roomID, bob, federation.WithRoomOpts(federation.WithImpl(&federation.ServerRoomImplCustom{
ServerRoomImplDefault: defaultImpl,
PopulateFromSendJoinResponseFn: func(def federation.ServerRoomImpl, room *federation.ServerRoom, joinEvent gomatrixserverlib.PDU, resp fclient.RespSendJoin) {
defaultImpl.PopulateFromSendJoinResponse(room, joinEvent, resp)
existingAuthChain = resp.AuthEvents.TrustedEvents(joinEvent.Version(), false)
},
})))
// we should have at least 100 events in the auth chain
if len(existingAuthChain) < 100 {
ct.Fatalf(t, "not enough events in the auth chain, got %d want >100", len(existingAuthChain))
}
Comment thread
kegsay marked this conversation as resolved.
createEvent := srvRoom.CurrentState(spec.MRoomCreate, "")
plEvent := srvRoom.CurrentState(spec.MRoomPowerLevels, "")
jrEvent := srvRoom.CurrentState(spec.MRoomJoinRules, "")
bobOriginalJoinEvent := srvRoom.CurrentState(spec.MRoomMember, bob)

// Create A,B,C,D,E which will be profile changes for Bob (where each event is dependent on the next)
eventA := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "A",
},
})
eventB := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "B",
},
PrevEvents: []string{eventA.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventA.EventID()},
})
eventC := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "C",
},
PrevEvents: []string{eventB.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventB.EventID()},
})
eventD := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "D",
},
PrevEvents: []string{eventC.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventC.EventID()},
})
eventE := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: spec.MRoomMember,
Sender: bob,
StateKey: &bob,
Content: map[string]interface{}{
"membership": "join",
"displayname": "E",
},
PrevEvents: []string{eventD.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventD.EventID()},
})
// We include this in auth_events for subsequent events below.
srvRoom.AddEvent(eventE)

// Create 3 unrelated events (one for /send, one for /gme, one for /state_ids snapshot)
stateIDsEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "for /state_ids",
},
PrevEvents: []string{eventE.EventID()},
})
srvRoom.AddEvent(stateIDsEvent)
gmeEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "for /get_missing_events",
},
PrevEvents: []string{stateIDsEvent.EventID()},
})
srvRoom.AddEvent(gmeEvent)
sendTxnEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "for /send",
},
PrevEvents: []string{gmeEvent.EventID()},
})
srvRoom.AddEvent(sendTxnEvent)

// the possible events to return in /event. This omits B.
allEventsToShare := []gomatrixserverlib.PDU{
stateIDsEvent, gmeEvent, sendTxnEvent, eventA, eventC, eventD, eventE,
}
t.Logf("event A: %s", eventA.EventID())
t.Logf("event B: %s", eventB.EventID())
t.Logf("event C: %s", eventC.EventID())
t.Logf("event D: %s", eventD.EventID())
t.Logf("event E: %s", eventE.EventID())
t.Logf("event for /state_ids: %s", stateIDsEvent.EventID())
t.Logf("event for /get_missing_events: %s", gmeEvent.EventID())
t.Logf("event for /send: %s", sendTxnEvent.EventID())

// add handlers for them
gmeWaiter := helpers.NewWaiter()
// We will send 'sendTxnEvent' via /send. The homeserver will see the event has unknown prev_events and hit /get_missing_events
srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) {
Comment thread
kegsay marked this conversation as resolved.
defer gmeWaiter.Finish()
body := must.ParseJSON(t, req.Body)
t.Logf("/get_missing_events req for room %s => %s", mux.Vars(req)["roomID"], body.Raw)
must.Equal(t, body.Get("latest_events").Array()[0].String(), sendTxnEvent.EventID(), "unexpected event provided to /get_missing_events")
w.WriteHeader(200)
res := struct {
Events []gomatrixserverlib.PDU `json:"events"`
}{
Events: []gomatrixserverlib.PDU{gmeEvent},
}
t.Logf("/get_missing_events req for room %s responding with %s in room %s", mux.Vars(req)["roomID"], res.Events[0].EventID(), res.Events[0].RoomID())
var responseBytes []byte
responseBytes, err := json.Marshal(&res)
must.NotError(t, "failed to marshal response", err)
w.Write(responseBytes)
})
stateIDWaiter := helpers.NewWaiter()
// The homeserver won't be able to link up the events returned via /get_missing_events to what it previously knew, so it will
// ask for a state snapshot via /state_ids.
srv.Mux().HandleFunc("/_matrix/federation/v1/state_ids/{roomID}", func(w http.ResponseWriter, req *http.Request) {
defer stateIDWaiter.Finish()
t.Logf("/state_ids req for room %s => %s", mux.Vars(req)["roomID"], req.URL.Query().Encode())
reqEventID := req.URL.Query().Get("event_id")
must.Equal(t, reqEventID, stateIDsEvent.EventID(), "unexpected event provided to /state_ids")
w.WriteHeader(200)

var authChainIDs []string
for _, ev := range existingAuthChain {
authChainIDs = append(authChainIDs, ev.EventID())
}
// include A,B,C,D
authChainIDs = append(authChainIDs, eventA.EventID(), eventB.EventID(), eventC.EventID(), eventD.EventID())
// the current state is the same as before but with E as the member event for bob
var pduIDs []string
for _, ev := range srvRoom.AllCurrentState() {
if ev.Type() == spec.MRoomMember && ev.StateKeyEquals(bob) {
continue
}
pduIDs = append(pduIDs, ev.EventID())
}
pduIDs = append(pduIDs, eventE.EventID())
res := struct {
AuthChainIDs []string `json:"auth_chain_ids"`
PDUIDs []string `json:"pdu_ids"`
}{
AuthChainIDs: authChainIDs,
PDUIDs: pduIDs,
}
var responseBytes []byte
responseBytes, err := json.Marshal(&res)
must.NotError(t, "failed to marshal response", err)
w.Write(responseBytes)
})
eventBWaiter := helpers.NewWaiter()
// /state_ids will return some unknown events which the homeserver will try to fetch via /event
srv.Mux().Handle("/_matrix/federation/v1/event/{eventID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
eventID := vars["eventID"]
var event gomatrixserverlib.PDU
// find the event
for _, ev := range allEventsToShare {
if ev.EventID() == eventID {
event = ev
break
}
}
// we should see a request for event B
if eventID == eventB.EventID() {
eventBWaiter.Finish()
}

if event == nil {
t.Logf("/event returning 404 for event %v", eventID)
w.WriteHeader(404)
w.Write([]byte(fmt.Sprintf(`complement: failed to find event: %s`, eventID)))
return
}

txn := gomatrixserverlib.Transaction{
Origin: spec.ServerName(srv.ServerName()),
OriginServerTS: spec.AsTimestamp(time.Now()),
PDUs: []json.RawMessage{
event.JSON(),
},
}
resp, err := json.Marshal(txn)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf(`complement: failed to marshal JSON response: %s`, err)))
return
}
w.WriteHeader(200)
w.Write(resp)
}))

srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sendTxnEvent.JSON()}, nil)

// wait for the server to make the requests
gmeWaiter.Wait(t, 5*time.Second)
stateIDWaiter.Wait(t, 5*time.Second)
eventBWaiter.Wait(t, 5*time.Second)

// At this point all we know is that the server requested event B when doing /state_ids.
// We don't know if sendTxnEvent has been fully processed / the room state has been updated.
// If the server is functioning correctly, sendTxnEvent will never be delivered to the client
// as the server will be unable to fetch room state for it. So send another event as a sentinel.
// Wait until we see sendTxnEvent in the sync timeline before asserting that the room state is correct.
sentinelEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
Type: "m.room.message",
Sender: bob,
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "finished",
},
PrevEvents: []string{bobOriginalJoinEvent.EventID()},
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), bobOriginalJoinEvent.EventID()},
})
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sentinelEvent.JSON()}, nil)
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, sentinelEvent.EventID()))

// we should not see event E as the current state for bob.
content := alice.MustGetStateEventContent(t, roomID, spec.MRoomMember, bob)
t.Logf("bob's membership content: %v", content.Raw)
// assert bob's member event was his initial join, not any of the others. Technically you can argue A should be valid.
must.Equal(t, content.Get("displayname").Str, "", "Events C/D/E were processed when they should not have been as the server doesn't know B.")
}
Loading