diff --git a/tests/federation_room_get_missing_events_test.go b/tests/federation_room_get_missing_events_test.go index 7c2c0bca..9899f88b 100644 --- a/tests/federation_room_get_missing_events_test.go +++ b/tests/federation_room_get_missing_events_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/gorilla/mux" "github.com/matrix-org/complement" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/fclient" @@ -18,10 +19,12 @@ import ( "github.com/matrix-org/complement/b" "github.com/matrix-org/complement/client" + "github.com/matrix-org/complement/ct" "github.com/matrix-org/complement/federation" "github.com/matrix-org/complement/helpers" "github.com/matrix-org/complement/match" "github.com/matrix-org/complement/must" + "github.com/matrix-org/complement/runtime" ) // TODO: @@ -598,3 +601,316 @@ func TestOutboundFederationEventSizeGetMissingEvents(t *testing.T) { // Alice should receive the sent event, even though the "bad" event has a too large state key alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(room.RoomID, sentEvent.EventID())) } + +// Test that if you respond to /state_ids, and fail some /event requests, we end up +// with correctly persisted auth information for the event. This creates an _auth graph_ like so: +// +// A <- B <- C <- D <- E m.room.member,bob +// +// Complement needs the HS to hit /state_ids and /event for missing events so it does some work to manipulate this: +// - it sends 100 unrelated state events. This ensures that any statistical analysis done on the number of missing events +// in /state_ids means we will bias to using /event and not /state. The test needs /event. +// - it sends an unrelated event to the HS with unknown prev_events. +// - it returns an unrelated event for /get_missing_events. +// - then /state_ids should be hit. +// +// When /state_ids is hit, we will include A,B,C,D,E in the response. This will be the first time the HS sees these events. 
+// Because we've gamed the number of state events in the room, HSes _should_ hit /event for each event ID.
+// Now the actual test can begin:
+//   - We fail the /event request for B.
+//   - We ensure that we do not see C,D,E in the final room state.
+//
+// This is a regression test where a HS could have code which does the following:
+//   - Sort events topologically (A,B,C,D,E)
+//   - for each event, check you have the auth events and then auth it.
+//   - If you don't have the auth events, drop it, else persist it (incl. whether it was rejected).
+//
+// This has a subtle bug IF "check you have the auth events" uses an in-memory event map AND dropping the event doesn't remove
+// the entry from that event map. If this happens: A is processed, B is missing, C is dropped due to missing B,
+// crucially D and E ARE PERSISTED because C exists in-memory.
+// This breaks the auth chain for the room, which matters when doing state resolution.
+func TestCorruptedAuthChain(t *testing.T) {
+	// Dendrite doesn't make exactly the same requests as it seems to fall back to /event_auth.
+	// As this is intended for a synapse bugfix, we'll skip dendrite for now.
+	runtime.SkipIf(t, runtime.Dendrite)
+	deployment := complement.Deploy(t, 1)
+	defer deployment.Destroy(t)
+
+	srv := federation.NewServer(t, deployment,
+		federation.HandleKeyRequests(),
+		federation.HandleMakeSendJoinRequests(),
+		federation.HandleTransactionRequests(nil, nil),
+		federation.HandleInviteRequests(nil),
+	)
+	// We expect to be pushed events that we don't care about responding to (not relevant to the test)
+	srv.UnexpectedRequestsAreErrors = false
+	cancel := srv.Listen()
+	defer cancel()
+
+	alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "alice"})
+	// ensure the server under test remains in the room when alice rejoins
+	sentinel := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "sentinel"})
+	roomID := alice.MustCreateRoom(t, map[string]interface{}{
+		"preset":       "public_chat",
+		"room_version": "10",
+	})
+	sentinel.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
+	// Pad out the room state: alice alternates leave/join 100 times. The last
+	// iteration (i == 99) is a join, so alice ends up back in the room.
+	for i := 0; i < 100; i++ {
+		if i%2 == 0 {
+			alice.MustLeaveRoom(t, roomID)
+		} else {
+			alice.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
+		}
+	}
+	bob := srv.UserID("bob")
+	defaultImpl := federation.ServerRoomImplDefault{}
+	// Captured from the /send_join response so it can be replayed in /state_ids below.
+	var existingAuthChain []gomatrixserverlib.PDU
+	srvRoom := srv.MustJoinRoom(t, deployment, spec.ServerName("hs1"), roomID, bob, federation.WithRoomOpts(federation.WithImpl(&federation.ServerRoomImplCustom{
+		ServerRoomImplDefault: defaultImpl,
+		PopulateFromSendJoinResponseFn: func(def federation.ServerRoomImpl, room *federation.ServerRoom, joinEvent gomatrixserverlib.PDU, resp fclient.RespSendJoin) {
+			defaultImpl.PopulateFromSendJoinResponse(room, joinEvent, resp)
+			existingAuthChain = resp.AuthEvents.TrustedEvents(joinEvent.Version(), false)
+		},
+	})))
+	// we should have at least 100 events in the auth chain
+	if len(existingAuthChain) < 100 {
+		ct.Fatalf(t, "not enough events in the auth chain, got %d want >=100", len(existingAuthChain))
+	}
+	createEvent := srvRoom.CurrentState(spec.MRoomCreate, "")
+	plEvent := srvRoom.CurrentState(spec.MRoomPowerLevels, "")
+	jrEvent := srvRoom.CurrentState(spec.MRoomJoinRules, "")
+	bobOriginalJoinEvent := srvRoom.CurrentState(spec.MRoomMember, bob)
+
+	// newProfileEvent builds a membership=join event for bob with the given displayname.
+	// When parent is non-nil the new event names it as its only prev_event and uses it as the
+	// member event in auth_events; when parent is nil both fields are left unset so that
+	// MustCreateEvent fills them in itself.
+	newProfileEvent := func(displayName string, parent gomatrixserverlib.PDU) gomatrixserverlib.PDU {
+		ev := federation.Event{
+			Type:     spec.MRoomMember,
+			Sender:   bob,
+			StateKey: &bob,
+			Content: map[string]interface{}{
+				"membership":  "join",
+				"displayname": displayName,
+			},
+		}
+		if parent != nil {
+			ev.PrevEvents = []string{parent.EventID()}
+			ev.AuthEvents = []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), parent.EventID()}
+		}
+		return srv.MustCreateEvent(t, srvRoom, ev)
+	}
+
+	// Create A,B,C,D,E which will be profile changes for Bob (where each event depends on the previous one)
+	eventA := newProfileEvent("A", nil)
+	eventB := newProfileEvent("B", eventA)
+	eventC := newProfileEvent("C", eventB)
+	eventD := newProfileEvent("D", eventC)
+	eventE := newProfileEvent("E", eventD)
+	// We include this in auth_events for subsequent events below.
+	srvRoom.AddEvent(eventE)
+
+	// Create 3 unrelated events (one for /send, one for /gme, one for /state_ids snapshot)
+	stateIDsEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
+		Type:   "m.room.message",
+		Sender: bob,
+		Content: map[string]interface{}{
+			"msgtype": "m.text",
+			"body":    "for /state_ids",
+		},
+		PrevEvents: []string{eventE.EventID()},
+	})
+	srvRoom.AddEvent(stateIDsEvent)
+	gmeEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
+		Type:   "m.room.message",
+		Sender: bob,
+		Content: map[string]interface{}{
+			"msgtype": "m.text",
+			"body":    "for /get_missing_events",
+		},
+		PrevEvents: []string{stateIDsEvent.EventID()},
+	})
+	srvRoom.AddEvent(gmeEvent)
+	sendTxnEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
+		Type:   "m.room.message",
+		Sender: bob,
+		Content: map[string]interface{}{
+			"msgtype": "m.text",
+			"body":    "for /send",
+		},
+		PrevEvents: []string{gmeEvent.EventID()},
+	})
+	srvRoom.AddEvent(sendTxnEvent)
+
+	// the possible events to return in /event. This omits B.
+	allEventsToShare := []gomatrixserverlib.PDU{
+		stateIDsEvent, gmeEvent, sendTxnEvent, eventA, eventC, eventD, eventE,
+	}
+	t.Logf("event A: %s", eventA.EventID())
+	t.Logf("event B: %s", eventB.EventID())
+	t.Logf("event C: %s", eventC.EventID())
+	t.Logf("event D: %s", eventD.EventID())
+	t.Logf("event E: %s", eventE.EventID())
+	t.Logf("event for /state_ids: %s", stateIDsEvent.EventID())
+	t.Logf("event for /get_missing_events: %s", gmeEvent.EventID())
+	t.Logf("event for /send: %s", sendTxnEvent.EventID())
+
+	// add handlers for them
+	gmeWaiter := helpers.NewWaiter()
+	// We will send 'sendTxnEvent' via /send. The homeserver will see the event has unknown prev_events and hit /get_missing_events
+	srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) {
+		defer gmeWaiter.Finish()
+		reqRoomID := mux.Vars(req)["roomID"]
+		body := must.ParseJSON(t, req.Body)
+		t.Logf("/get_missing_events req for room %s => %s", reqRoomID, body.Raw)
+		// Guard the index below: an empty latest_events would otherwise panic inside the handler.
+		latestEvents := body.Get("latest_events").Array()
+		if len(latestEvents) == 0 {
+			t.Errorf("/get_missing_events request has no latest_events: %s", body.Raw)
+			w.WriteHeader(400)
+			return
+		}
+		must.Equal(t, latestEvents[0].String(), sendTxnEvent.EventID(), "unexpected event provided to /get_missing_events")
+		res := struct {
+			Events []gomatrixserverlib.PDU `json:"events"`
+		}{
+			Events: []gomatrixserverlib.PDU{gmeEvent},
+		}
+		t.Logf("/get_missing_events req for room %s responding with %s in room %s", reqRoomID, res.Events[0].EventID(), res.Events[0].RoomID())
+		// Marshal before committing to a 200 so a failure is not reported as success.
+		responseBytes, err := json.Marshal(&res)
+		must.NotError(t, "failed to marshal response", err)
+		w.WriteHeader(200)
+		w.Write(responseBytes)
+	})
+	stateIDWaiter := helpers.NewWaiter()
+	// The homeserver won't be able to link up the events returned via /get_missing_events to what it previously knew, so it will
+	// ask for a state snapshot via /state_ids.
+	srv.Mux().HandleFunc("/_matrix/federation/v1/state_ids/{roomID}", func(w http.ResponseWriter, req *http.Request) {
+		defer stateIDWaiter.Finish()
+		t.Logf("/state_ids req for room %s => %s", mux.Vars(req)["roomID"], req.URL.Query().Encode())
+		reqEventID := req.URL.Query().Get("event_id")
+		must.Equal(t, reqEventID, stateIDsEvent.EventID(), "unexpected event provided to /state_ids")
+
+		var authChainIDs []string
+		for _, ev := range existingAuthChain {
+			authChainIDs = append(authChainIDs, ev.EventID())
+		}
+		// include A,B,C,D
+		authChainIDs = append(authChainIDs, eventA.EventID(), eventB.EventID(), eventC.EventID(), eventD.EventID())
+		// the current state is the same as before but with E as the member event for bob
+		var pduIDs []string
+		for _, ev := range srvRoom.AllCurrentState() {
+			if ev.Type() == spec.MRoomMember && ev.StateKeyEquals(bob) {
+				continue
+			}
+			pduIDs = append(pduIDs, ev.EventID())
+		}
+		pduIDs = append(pduIDs, eventE.EventID())
+		res := struct {
+			AuthChainIDs []string `json:"auth_chain_ids"`
+			PDUIDs       []string `json:"pdu_ids"`
+		}{
+			AuthChainIDs: authChainIDs,
+			PDUIDs:       pduIDs,
+		}
+		// Marshal before committing to a 200 so a failure is not reported as success.
+		responseBytes, err := json.Marshal(&res)
+		must.NotError(t, "failed to marshal response", err)
+		w.WriteHeader(200)
+		w.Write(responseBytes)
+	})
+	eventBWaiter := helpers.NewWaiter()
+	// /state_ids will return some unknown events which the homeserver will try to fetch via /event
+	srv.Mux().HandleFunc("/_matrix/federation/v1/event/{eventID}", func(w http.ResponseWriter, req *http.Request) {
+		eventID := mux.Vars(req)["eventID"]
+		// we should see a request for event B
+		if eventID == eventB.EventID() {
+			eventBWaiter.Finish()
+		}
+
+		// find the event; B is deliberately absent from allEventsToShare so it 404s below.
+		var event gomatrixserverlib.PDU
+		for _, ev := range allEventsToShare {
+			if ev.EventID() == eventID {
+				event = ev
+				break
+			}
+		}
+		if event == nil {
+			t.Logf("/event returning 404 for event %v", eventID)
+			w.WriteHeader(404)
+			fmt.Fprintf(w, `complement: failed to find event: %s`, eventID)
+			return
+		}
+
+		txn := gomatrixserverlib.Transaction{
+			Origin:         spec.ServerName(srv.ServerName()),
+			OriginServerTS: spec.AsTimestamp(time.Now()),
+			PDUs: []json.RawMessage{
+				event.JSON(),
+			},
+		}
+		resp, err := json.Marshal(txn)
+		if err != nil {
+			w.WriteHeader(500)
+			fmt.Fprintf(w, `complement: failed to marshal JSON response: %s`, err)
+			return
+		}
+		w.WriteHeader(200)
+		w.Write(resp)
+	})
+
+	srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sendTxnEvent.JSON()}, nil)
+
+	// wait for the server to make the requests
+	gmeWaiter.Wait(t, 5*time.Second)
+	stateIDWaiter.Wait(t, 5*time.Second)
+	eventBWaiter.Wait(t, 5*time.Second)
+
+	// At this point all we know is that the server requested event B when doing /state_ids.
+	// We don't know if sendTxnEvent has been fully processed / the room state has been updated.
+	// If the server is functioning correctly, sendTxnEvent will never be delivered to the client
+	// as the server will be unable to fetch room state for it. So send another event as a sentinel.
+	// Wait until we see sentinelEvent in the sync timeline before asserting that the room state is correct.
+	sentinelEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
+		Type:   "m.room.message",
+		Sender: bob,
+		Content: map[string]interface{}{
+			"msgtype": "m.text",
+			"body":    "finished",
+		},
+		PrevEvents: []string{bobOriginalJoinEvent.EventID()},
+		AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), bobOriginalJoinEvent.EventID()},
+	})
+	srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sentinelEvent.JSON()}, nil)
+	alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, sentinelEvent.EventID()))
+
+	// we should not see event E as the current state for bob.
+	content := alice.MustGetStateEventContent(t, roomID, spec.MRoomMember, bob)
+	t.Logf("bob's membership content: %v", content.Raw)
+	// assert bob's member event was his initial join, not any of the others. Technically you can argue A should be valid.
+	must.Equal(t, content.Get("displayname").Str, "", "Events C/D/E were processed when they should not have been as the server doesn't know B.")
+}