Skip to content

Commit 4b83da2

Browse files
committed
Regression test for corrupted auth chains
1 parent bdeba28 commit 4b83da2

1 file changed

Lines changed: 286 additions & 0 deletions

File tree

tests/federation_room_get_missing_events_test.go

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/gorilla/mux"
1213
"github.com/matrix-org/complement"
1314
"github.com/matrix-org/gomatrixserverlib"
1415
"github.com/matrix-org/gomatrixserverlib/fclient"
@@ -18,6 +19,7 @@ import (
1819

1920
"github.com/matrix-org/complement/b"
2021
"github.com/matrix-org/complement/client"
22+
"github.com/matrix-org/complement/ct"
2123
"github.com/matrix-org/complement/federation"
2224
"github.com/matrix-org/complement/helpers"
2325
"github.com/matrix-org/complement/match"
@@ -598,3 +600,287 @@ func TestOutboundFederationEventSizeGetMissingEvents(t *testing.T) {
598600
// Alice should receive the sent event, even though the "bad" event has a too large state key
599601
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(room.RoomID, sentEvent.EventID()))
600602
}
603+
604+
// Test that if you respond to /state_ids, and fail some /event requests, we end up
605+
// with correctly persisted auth information for the event. This creates an _auth graph_ like so:
606+
//
607+
// A <- B <- C <- D <- E m.room.member,bob
608+
//
609+
// Complement needs the HS to hit /state_ids and /event for missing events so it does some work to manipulate this:
610+
// - it sends 100 unrelated state events. This ensures that any statistical analysis done on the number of missing events
611+
// in /state_ids means we will bias to using /event and not /state. The test needs /event.
612+
// - it sends an unrelated event to the HS with unknown prev_events.
613+
// - it returns an unrelated event for /get_missing_events.
614+
// - then /state_ids should be hit.
615+
//
616+
// When /state_ids is hit, we will include A,B,C,D,E in the response. This will be the first time the HS sees these events.
617+
// Because we've gamed the number of state events in the room, HSes _should_ hit /event for each event ID.
618+
// Now the actual test can begin:
619+
// - We fail the /event request for B.
620+
// - We ensure that we do not see C,D,E in the final room state.
621+
//
622+
// This is a regression test where a HS could have code which does the following:
623+
// - Sort events topologically (A,B,C,D,E)
624+
// - for each event, check you have the auth events and then auth it.
625+
// - If you don't have the auth events, drop it, else persist it (incl. whether it was rejected).
626+
//
627+
// This has a subtle bug IF "check you have the auth events" uses an in-memory event map AND dropping the event doesn't remove
628+
// the entry from that event map. If this happens: A is processed, B is missing, C is dropped due to missing B,
629+
// crucially D and E ARE PERSISTED because C exists in-memory.
630+
// This breaks the auth chain for the room, which matters when doing state resolution.
631+
func TestCorruptedAuthChain(t *testing.T) {
632+
deployment := complement.Deploy(t, 1)
633+
defer deployment.Destroy(t)
634+
635+
srv := federation.NewServer(t, deployment,
636+
federation.HandleKeyRequests(),
637+
federation.HandleMakeSendJoinRequests(),
638+
federation.HandleTransactionRequests(nil, nil),
639+
federation.HandleInviteRequests(nil),
640+
)
641+
srv.UnexpectedRequestsAreErrors = false // we expect to be pushed events
642+
cancel := srv.Listen()
643+
defer cancel()
644+
645+
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "alice"})
646+
sentinel := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "sentinel"})
647+
roomID := alice.MustCreateRoom(t, map[string]interface{}{
648+
"preset": "public_chat",
649+
"room_version": "10",
650+
})
651+
sentinel.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
652+
// pack out the room state
653+
for i := 0; i < 100; i++ {
654+
if i%2 == 0 {
655+
alice.MustLeaveRoom(t, roomID)
656+
} else {
657+
alice.MustJoinRoom(t, roomID, []spec.ServerName{"hs1"})
658+
}
659+
}
660+
bob := srv.UserID("bob")
661+
defaultImpl := federation.ServerRoomImplDefault{}
662+
var existingAuthChain []gomatrixserverlib.PDU
663+
srvRoom := srv.MustJoinRoom(t, deployment, spec.ServerName("hs1"), roomID, bob, federation.WithRoomOpts(federation.WithImpl(&federation.ServerRoomImplCustom{
664+
ServerRoomImplDefault: defaultImpl,
665+
PopulateFromSendJoinResponseFn: func(def federation.ServerRoomImpl, room *federation.ServerRoom, joinEvent gomatrixserverlib.PDU, resp fclient.RespSendJoin) {
666+
defaultImpl.PopulateFromSendJoinResponse(room, joinEvent, resp)
667+
existingAuthChain = resp.AuthEvents.TrustedEvents(joinEvent.Version(), false)
668+
},
669+
})))
670+
// we should have at least 100 events in the auth chain
671+
if len(existingAuthChain) < 100 {
672+
ct.Fatalf(t, "not enough events in the auth chain, got %d want >100", len(existingAuthChain))
673+
}
674+
createEvent := srvRoom.CurrentState(spec.MRoomCreate, "")
675+
plEvent := srvRoom.CurrentState(spec.MRoomPowerLevels, "")
676+
jrEvent := srvRoom.CurrentState(spec.MRoomJoinRules, "")
677+
678+
// Create A,B,C,D,E which will be profile changes for Bob
679+
eventA := srv.MustCreateEvent(t, srvRoom, federation.Event{
680+
Type: spec.MRoomMember,
681+
Sender: bob,
682+
StateKey: &bob,
683+
Content: map[string]interface{}{
684+
"membership": "join",
685+
"displayname": "A",
686+
},
687+
})
688+
eventB := srv.MustCreateEvent(t, srvRoom, federation.Event{
689+
Type: spec.MRoomMember,
690+
Sender: bob,
691+
StateKey: &bob,
692+
Content: map[string]interface{}{
693+
"membership": "join",
694+
"displayname": "B",
695+
},
696+
PrevEvents: []string{eventA.EventID()},
697+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventA.EventID()},
698+
})
699+
eventC := srv.MustCreateEvent(t, srvRoom, federation.Event{
700+
Type: spec.MRoomMember,
701+
Sender: bob,
702+
StateKey: &bob,
703+
Content: map[string]interface{}{
704+
"membership": "join",
705+
"displayname": "C",
706+
},
707+
PrevEvents: []string{eventB.EventID()},
708+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventB.EventID()},
709+
})
710+
eventD := srv.MustCreateEvent(t, srvRoom, federation.Event{
711+
Type: spec.MRoomMember,
712+
Sender: bob,
713+
StateKey: &bob,
714+
Content: map[string]interface{}{
715+
"membership": "join",
716+
"displayname": "D",
717+
},
718+
PrevEvents: []string{eventC.EventID()},
719+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventC.EventID()},
720+
})
721+
eventE := srv.MustCreateEvent(t, srvRoom, federation.Event{
722+
Type: spec.MRoomMember,
723+
Sender: bob,
724+
StateKey: &bob,
725+
Content: map[string]interface{}{
726+
"membership": "join",
727+
"displayname": "E",
728+
},
729+
PrevEvents: []string{eventD.EventID()},
730+
AuthEvents: []string{createEvent.EventID(), plEvent.EventID(), jrEvent.EventID(), eventD.EventID()},
731+
})
732+
srvRoom.AddEvent(eventE) // so we include this in auth_events for subsequent events below.
733+
734+
// Create 3 unrelated events (one for /send, one for /gme, one for /state_ids snapshot)
735+
stateIDsEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
736+
Type: "m.room.message",
737+
Sender: bob,
738+
Content: map[string]interface{}{
739+
"msgtype": "m.text",
740+
"body": "for /state_ids",
741+
},
742+
PrevEvents: []string{eventE.EventID()},
743+
})
744+
srvRoom.AddEvent(stateIDsEvent)
745+
gmeEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
746+
Type: "m.room.message",
747+
Sender: bob,
748+
Content: map[string]interface{}{
749+
"msgtype": "m.text",
750+
"body": "for /get_missing_events",
751+
},
752+
PrevEvents: []string{stateIDsEvent.EventID()},
753+
})
754+
srvRoom.AddEvent(gmeEvent)
755+
sendTxnEvent := srv.MustCreateEvent(t, srvRoom, federation.Event{
756+
Type: "m.room.message",
757+
Sender: bob,
758+
Content: map[string]interface{}{
759+
"msgtype": "m.text",
760+
"body": "for /send",
761+
},
762+
PrevEvents: []string{gmeEvent.EventID()},
763+
})
764+
srvRoom.AddEvent(sendTxnEvent)
765+
766+
// the possible events to return in /event. This omits B.
767+
allEvents := []gomatrixserverlib.PDU{
768+
stateIDsEvent, gmeEvent, sendTxnEvent, eventA, eventC, eventD, eventE,
769+
}
770+
t.Logf("event A: %s", eventA.EventID())
771+
t.Logf("event B: %s", eventB.EventID())
772+
t.Logf("event C: %s", eventC.EventID())
773+
t.Logf("event D: %s", eventD.EventID())
774+
t.Logf("event E: %s", eventE.EventID())
775+
776+
// add handlers for them
777+
gmeWaiter := helpers.NewWaiter()
778+
srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) {
779+
defer gmeWaiter.Finish()
780+
body := must.ParseJSON(t, req.Body)
781+
t.Logf("/get_missing_events req for room %s => %s", mux.Vars(req)["roomID"], body.Raw)
782+
must.Equal(t, body.Get("latest_events").Array()[0].String(), sendTxnEvent.EventID(), "unexpected event provided to /get_missing_events")
783+
w.WriteHeader(200)
784+
res := struct {
785+
Events []gomatrixserverlib.PDU `json:"events"`
786+
}{
787+
Events: []gomatrixserverlib.PDU{gmeEvent},
788+
}
789+
t.Logf("/get_missing_events req for room %s responding with %s in room %s", mux.Vars(req)["roomID"], res.Events[0].EventID(), res.Events[0].RoomID())
790+
var responseBytes []byte
791+
responseBytes, err := json.Marshal(&res)
792+
must.NotError(t, "failed to marshal response", err)
793+
w.Write(responseBytes)
794+
})
795+
stateIDWaiter := helpers.NewWaiter()
796+
srv.Mux().HandleFunc("/_matrix/federation/v1/state_ids/{roomID}", func(w http.ResponseWriter, req *http.Request) {
797+
defer stateIDWaiter.Finish()
798+
t.Logf("/state_ids req for room %s => %s", mux.Vars(req)["roomID"], req.URL.Query().Encode())
799+
reqEventID := req.URL.Query().Get("event_id")
800+
must.Equal(t, reqEventID, stateIDsEvent.EventID(), "unexpected event provided to /state_ids")
801+
w.WriteHeader(200)
802+
803+
var authChainIDs []string
804+
for _, ev := range existingAuthChain {
805+
authChainIDs = append(authChainIDs, ev.EventID())
806+
}
807+
// include A,B,C,D
808+
authChainIDs = append(authChainIDs, eventA.EventID(), eventB.EventID(), eventC.EventID(), eventD.EventID())
809+
// the current state is the same as before but with E as the member event for bob
810+
var pduIDs []string
811+
for _, ev := range srvRoom.AllCurrentState() {
812+
if ev.Type() == spec.MRoomMember && ev.StateKeyEquals(bob) {
813+
continue
814+
}
815+
pduIDs = append(pduIDs, ev.EventID())
816+
}
817+
pduIDs = append(pduIDs, eventE.EventID())
818+
res := struct {
819+
AuthChainIDs []string `json:"auth_chain_ids"`
820+
PDUIDs []string `json:"pdu_ids"`
821+
}{
822+
AuthChainIDs: authChainIDs,
823+
PDUIDs: pduIDs,
824+
}
825+
var responseBytes []byte
826+
responseBytes, err := json.Marshal(&res)
827+
must.NotError(t, "failed to marshal response", err)
828+
w.Write(responseBytes)
829+
})
830+
eventWaiter := helpers.NewWaiter()
831+
srv.Mux().Handle("/_matrix/federation/v1/event/{eventID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
832+
vars := mux.Vars(req)
833+
eventID := vars["eventID"]
834+
var event gomatrixserverlib.PDU
835+
// find the event
836+
for _, ev := range allEvents {
837+
if ev.EventID() == eventID {
838+
event = ev
839+
break
840+
}
841+
}
842+
// we should see a request for event B
843+
if eventID == eventB.EventID() {
844+
eventWaiter.Finish()
845+
}
846+
847+
if event == nil {
848+
t.Logf("/event returning 404 for event %v", eventID)
849+
w.WriteHeader(404)
850+
w.Write([]byte(fmt.Sprintf(`complement: failed to find event: %s`, eventID)))
851+
return
852+
}
853+
854+
txn := gomatrixserverlib.Transaction{
855+
Origin: spec.ServerName(srv.ServerName()),
856+
OriginServerTS: spec.AsTimestamp(time.Now()),
857+
PDUs: []json.RawMessage{
858+
event.JSON(),
859+
},
860+
}
861+
resp, err := json.Marshal(txn)
862+
if err != nil {
863+
w.WriteHeader(500)
864+
w.Write([]byte(fmt.Sprintf(`complement: failed to marshal JSON response: %s`, err)))
865+
return
866+
}
867+
w.WriteHeader(200)
868+
w.Write(resp)
869+
}))
870+
871+
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{sendTxnEvent.JSON()}, nil)
872+
873+
// wait for the server to make the requests
874+
gmeWaiter.Wait(t, 5*time.Second)
875+
stateIDWaiter.Wait(t, 5*time.Second)
876+
eventWaiter.Wait(t, 5*time.Second)
877+
878+
// let things settle
879+
time.Sleep(time.Second)
880+
881+
// we should not see event E as the current state for bob.
882+
content := alice.MustGetStateEventContent(t, roomID, spec.MRoomMember, bob)
883+
t.Logf("bob's membership content: %v", content.Raw)
884+
// assert bob's member event was his initial join, not any of the others. Technically you can argue A should be valid.
885+
must.Equal(t, content.Get("displayname").Str, "", "Events C/D/E were processed when they should not have been as the server doesn't know B.")
886+
}

0 commit comments

Comments
 (0)