diff --git a/accesscontrol/casbin_rbac.go b/accesscontrol/casbin_rbac.go index d3e812e06..f18d32ba8 100644 --- a/accesscontrol/casbin_rbac.go +++ b/accesscontrol/casbin_rbac.go @@ -21,6 +21,7 @@ import ( "log/slog" "os" "strings" + "sync" "github.com/casbin/casbin/v3" gormadapter "github.com/casbin/gorm-adapter/v3" @@ -34,6 +35,10 @@ import ( var _ shared.AccessControl = &casbinRBAC{} var casbinEnforcer *casbin.ContextEnforcer +// protect against concurrent access on shared rbac structures like maps +// in practical terms this means that whenever we call a function of the casbin context enforcer, we wrap the call inside a mutex lock and unlock +var concurrencyMutex sync.RWMutex + type casbinRBAC struct { domain string // scopes this to a specific domain - or organization enforcer *casbin.ContextEnforcer @@ -55,7 +60,9 @@ func (c *casbinRBAC) GetExternalEntityProviderID() *string { } func (c *casbinRBAC) GetOwnerOfOrganization() (string, error) { + concurrencyMutex.Lock() listOfUsers := c.enforcer.GetUsersForRoleInDomain("role::owner", "domain::"+c.domain) + concurrencyMutex.Unlock() if len(listOfUsers) == 0 { return "", fmt.Errorf("no owner found for organization") } @@ -66,7 +73,9 @@ func (c *casbinRBAC) GetOwnerOfOrganization() (string, error) { } func (c *casbinRBAC) GetAllMembersOfOrganization() ([]string, error) { + concurrencyMutex.Lock() users, err := c.enforcer.GetAllUsersByDomain("domain::" + c.domain) + concurrencyMutex.Unlock() if err != nil { return nil, err } @@ -78,7 +87,9 @@ func (c *casbinRBAC) GetAllMembersOfOrganization() ([]string, error) { } func (c *casbinRBAC) GetAllMembersOfProject(projectID string) ([]string, error) { + concurrencyMutex.Lock() users, err := c.enforcer.GetImplicitUsersForRole("project::"+projectID+"|role::member", "domain::"+c.domain) + concurrencyMutex.Unlock() if err != nil { return nil, err } @@ -90,7 +101,9 @@ func (c *casbinRBAC) GetAllMembersOfProject(projectID string) ([]string, error) } func (c *casbinRBAC) GetAllMembersOfAsset(assetID string) ([]string, error) { + concurrencyMutex.Lock() users, err := c.enforcer.GetImplicitUsersForRole("asset::"+assetID+"|role::member", "domain::"+c.domain) + concurrencyMutex.Unlock() if err != nil { return nil, err } @@ -102,13 +115,17 @@ func (c *casbinRBAC) GetAllMembersOfAsset(assetID string) ([]string, error) { } func (c *casbinRBAC) HasAccess(ctx context.Context, user string) (bool, error) { + concurrencyMutex.Lock() roles := c.enforcer.GetRolesForUserInDomain("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() return len(roles) > 0, nil } func (c *casbinRBAC) GetAllProjectsForUser(user string) ([]string, error) { projectIDs := []string{} + concurrencyMutex.Lock() roles, _ := c.enforcer.GetImplicitRolesForUser("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() for _, role := range roles { if !strings.HasPrefix(role, "project::") || !strings.Contains(role, "role::") { continue @@ -120,7 +137,9 @@ func (c *casbinRBAC) GetAllProjectsForUser(user string) ([]string, error) { func (c *casbinRBAC) GetAllAssetsForUser(user string) ([]string, error) { assetIDs := []string{} + concurrencyMutex.Lock() roles, _ := c.enforcer.GetImplicitRolesForUser("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() for _, role := range roles { if !strings.HasPrefix(role, "asset::") || !strings.Contains(role, "role::") { continue @@ -131,7 +150,9 @@ func (c *casbinRBAC) GetAllAssetsForUser(user string) ([]string, error) { } func (c *casbinRBAC) GetAllRoles(user string) []string { + concurrencyMutex.Lock() roles, err := c.enforcer.GetImplicitRolesForUser("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() if err != nil { slog.Error("GetAllRoles failed", "err", err) return []string{} @@ -204,32 +225,44 @@ func (c *casbinRBAC) getAssetRoleName(role shared.Role, asset string) string { } func (c *casbinRBAC) GrantRole(ctx context.Context, user string, role shared.Role) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, "user::"+user, "role::"+string(role), "domain::"+c.domain) return err } func (c *casbinRBAC) RevokeRole(ctx context.Context, user string, role shared.Role) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.DeleteRoleForUserInDomainCtx(ctx, "user::"+user, "role::"+string(role), "domain::"+c.domain) return err } func (c *casbinRBAC) GrantRoleInProject(ctx context.Context, user string, role shared.Role, project string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, "user::"+user, "project::"+project+"|role::"+string(role), "domain::"+c.domain) return err } func (c *casbinRBAC) GrantRoleInAsset(ctx context.Context, user string, role shared.Role, asset string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, "user::"+user, "asset::"+asset+"|role::"+string(role), "domain::"+c.domain) return err } func (c *casbinRBAC) RevokeRoleInProject(ctx context.Context, user string, role shared.Role, project string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.DeleteRoleForUserInDomainCtx(ctx, "user::"+user, "project::"+project+"|role::"+string(role), "domain::"+c.domain) return err } -func (c *casbinRBAC) RevokeRoleInAsset(ctx context.Context, user string, role shared.Role, project string) error { - _, err := c.enforcer.DeleteRoleForUserInDomainCtx(ctx, "user::"+user, "asset::"+project+"|role::"+string(role), "domain::"+c.domain) +func (c *casbinRBAC) RevokeRoleInAsset(ctx context.Context, user string, role shared.Role, asset string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() + _, err := c.enforcer.DeleteRoleForUserInDomainCtx(ctx, "user::"+user, "asset::"+asset+"|role::"+string(role), "domain::"+c.domain) return err } @@ -252,31 +285,43 @@ func (c *casbinRBAC) RevokeAllRolesInAssetForUser(ctx context.Context, user stri } func (c *casbinRBAC) InheritRole(ctx context.Context, roleWhichGetsPermissions, roleWhichProvidesPermissions shared.Role) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, "role::"+string(roleWhichGetsPermissions), "role::"+string(roleWhichProvidesPermissions), "domain::"+c.domain) return err } func (c *casbinRBAC) InheritProjectRole(ctx context.Context, roleWhichGetsPermissions, roleWhichProvidesPermissions shared.Role, project string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, c.getProjectRoleName(roleWhichGetsPermissions, project), c.getProjectRoleName(roleWhichProvidesPermissions, project), "domain::"+c.domain) return err } func (c *casbinRBAC) InheritAssetRole(ctx context.Context, roleWhichGetsPermissions, roleWhichProvidesPermissions shared.Role, asset string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, c.getAssetRoleName(roleWhichGetsPermissions, asset), c.getAssetRoleName(roleWhichProvidesPermissions, asset), "domain::"+c.domain) return err } func (c *casbinRBAC) InheritProjectRolesAcrossProjects(ctx context.Context, roleWhichGetsPermissions, roleWhichProvidesPermissions shared.ProjectRole) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, c.getProjectRoleName(roleWhichGetsPermissions.Role, roleWhichGetsPermissions.Project), c.getProjectRoleName(roleWhichProvidesPermissions.Role, roleWhichProvidesPermissions.Project), "domain::"+c.domain) return err } func (c *casbinRBAC) LinkDomainAndProjectRole(ctx context.Context, domainRoleWhichGetsPermission, projectRoleWhichProvidesPermissions shared.Role, project string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, "role::"+string(domainRoleWhichGetsPermission), c.getProjectRoleName(projectRoleWhichProvidesPermissions, project), "domain::"+c.domain) return err } func (c *casbinRBAC) LinkProjectAndAssetRole(ctx context.Context, projectRoleWhichGetsPermission, assetRoleWhichProvidesPermissions shared.Role, project string, asset string) error { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddRoleForUserInDomainCtx(ctx, c.getProjectRoleName(projectRoleWhichGetsPermission, project), c.getAssetRoleName(assetRoleWhichProvidesPermissions, asset), "domain::"+c.domain) return err } @@ -286,6 +331,8 @@ func (c *casbinRBAC) AllowRole(ctx context.Context, role shared.Role, object sha for i, ac := range action { policies[i] = []string{"role::" + string(role), "domain::" + c.domain, "obj::" + string(object), "act::" + string(ac)} } + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddPoliciesCtx(ctx, policies) return err } @@ -295,6 +342,8 @@ func (c *casbinRBAC) AllowRoleInProject(ctx context.Context, project string, rol for i, ac := range action { policies[i] = []string{"project::" + project + "|role::" + string(role), "domain::" + c.domain, "project::" + project + "|obj::" + string(object), "act::" + string(ac)} } + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddPoliciesCtx(ctx, policies) return err } @@ -304,12 +353,16 @@ func (c *casbinRBAC) AllowRoleInAsset(ctx context.Context, asset string, role sh for i, ac := range action { policies[i] = []string{"asset::" + asset + "|role::" + string(role), "domain::" + c.domain, "asset::" + asset + "|obj::" + string(object), "act::" + string(ac)} } + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() _, err := c.enforcer.AddPoliciesCtx(ctx, policies) return err } func (c *casbinRBAC) IsAllowed(ctx context.Context, user string, object shared.Object, action shared.Action) (bool, error) { + concurrencyMutex.Lock() permissions, err := c.enforcer.GetImplicitPermissionsForUser("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() if err != nil { return false, err } @@ -322,7 +375,9 @@ func (c *casbinRBAC) IsAllowed(ctx context.Context, user string, object shared.O } func (c *casbinRBAC) IsAllowedInProject(ctx context.Context, project *models.Project, user string, object shared.Object, action shared.Action) (bool, error) { + concurrencyMutex.Lock() permissions, err := c.enforcer.GetImplicitPermissionsForUser("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() if err != nil { return false, err } @@ -336,7 +391,9 @@ func (c *casbinRBAC) IsAllowedInProject(ctx context.Context, project *models.Pro } func (c *casbinRBAC) IsAllowedInAsset(ctx context.Context, asset *models.Asset, user string, object shared.Object, action shared.Action) (bool, error) { + concurrencyMutex.Lock() permissions, err := c.enforcer.GetImplicitPermissionsForUser("user::"+user, "domain::"+c.domain) + concurrencyMutex.Unlock() if err != nil { return false, err } @@ -350,7 +407,9 @@ func (c *casbinRBAC) IsAllowedInAsset(ctx context.Context, asset *models.Asset, } func (c casbinRBACProvider) DomainsOfUser(user string) ([]string, error) { + concurrencyMutex.Lock() domains, err := c.enforcer.GetDomainsForUser("user::" + user) + concurrencyMutex.Unlock() if err != nil { return nil, err } @@ -396,6 +455,8 @@ func buildEnforcer(db *gorm.DB, broker shared.PubSubBroker) (*casbin.ContextEnfo return nil, fmt.Errorf("could not set watcher: %w", err) } err = watcher.SetUpdateCallback(func(string) { + concurrencyMutex.Lock() + defer concurrencyMutex.Unlock() err := e.LoadPolicy() if err != nil { slog.Error("error while loading policy after update", "err", err) diff --git a/accesscontrol/casbin_rbac_test.go b/accesscontrol/casbin_rbac_test.go new file mode 100644 index 000000000..cd97f729c --- /dev/null +++ b/accesscontrol/casbin_rbac_test.go @@ -0,0 +1,170 @@ +package accesscontrol + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/casbin/casbin/v3" + casbinModel "github.com/casbin/casbin/v3/model" + "github.com/casbin/casbin/v3/persist" + "github.com/l3montree-dev/devguard/shared" +) + +// noopAdapter is a minimal adapter for tests. Policy rules are kept +// in-memory by casbin's model layer; this adapter only satisfies the interface. +type noopAdapter struct{} + +func (noopAdapter) LoadPolicy(_ casbinModel.Model) error { return nil } +func (noopAdapter) SavePolicy(_ casbinModel.Model) error { return nil } +func (noopAdapter) AddPolicy(_, _ string, _ []string) error { return nil } +func (noopAdapter) RemovePolicy(_, _ string, _ []string) error { return nil } +func (noopAdapter) RemoveFilteredPolicy(_, _ string, _ int, _ ...string) error { return nil } +func (noopAdapter) LoadPolicyCtx(_ context.Context, _ casbinModel.Model) error { return nil } +func (noopAdapter) SavePolicyCtx(_ context.Context, _ casbinModel.Model) error { return nil } +func (noopAdapter) AddPolicyCtx(_ context.Context, _, _ string, _ []string) error { return nil } +func (noopAdapter) RemovePolicyCtx(_ context.Context, _, _ string, _ []string) error { return nil } +func (noopAdapter) RemoveFilteredPolicyCtx(_ context.Context, _, _ string, _ int, _ ...string) error { + return nil +} + +var _ persist.ContextAdapter = noopAdapter{} + +// testModelText mirrors config/rbac_model.conf so tests are self-contained. +const testModelText = ` +[request_definition] +r = sub, dom, obj, act + +[policy_definition] +p = sub, dom, obj, act + +[role_definition] +g = _, _, _ + +[policy_effect] +e = some(where (p.eft == allow)) + +[matchers] +m = g(r.sub, p.sub, r.dom) && r.dom == p.dom && r.obj == p.obj && r.act == p.act +` + +func newTestEnforcer(t *testing.T) *casbin.ContextEnforcer { + t.Helper() + m, err := casbinModel.NewModelFromString(testModelText) + if err != nil { + t.Fatalf("create casbin model: %v", err) + } + iface, err := casbin.NewContextEnforcer(m, noopAdapter{}) + if err != nil { + t.Fatalf("create casbin enforcer: %v", err) + } + return iface.(*casbin.ContextEnforcer) +} + +func newTestCasbinRBAC(t *testing.T, domain string) *casbinRBAC { + t.Helper() + return &casbinRBAC{domain: domain, enforcer: newTestEnforcer(t)} +} + +// TestCasbinRBAC_ConcurrentWrites is a regression test for the panic: +// "fatal error: concurrent map read and map write" inside casbin's Model.RemovePolicy. +// Before the casbinMu fix, concurrent goroutines (e.g. different users triggering sync +// via ErrGroup) wrote to casbin's internal policy maps without any synchronisation. +func TestCasbinRBAC_ConcurrentWrites(t *testing.T) { + rbac := newTestCasbinRBAC(t, "org-1") + + const goroutines = 30 + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + i := i + go func() { + defer wg.Done() + user := fmt.Sprintf("user-%d", i) + project := fmt.Sprintf("project-%d", i%5) + _ = rbac.GrantRoleInProject(context.Background(), user, shared.RoleMember, project) + _ = rbac.RevokeRoleInProject(context.Background(), user, shared.RoleMember, project) + }() + } + wg.Wait() +} + +func TestCasbinRBAC_ConcurrentReads(t *testing.T) { + rbac := newTestCasbinRBAC(t, "org-1") + + // Seed some data first. + for i := 0; i < 5; i++ { + _ = rbac.GrantRoleInProject(context.Background(), fmt.Sprintf("user-%d", i), shared.RoleMember, "project-0") + } + + const goroutines = 30 + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + i := i + go func() { + defer wg.Done() + user := fmt.Sprintf("user-%d", i%5) + _ = rbac.GetAllRoles(user) + _, _ = rbac.GetAllProjectsForUser(user) + }() + } + wg.Wait() +} + +func TestCasbinRBAC_ConcurrentReadsAndWrites(t *testing.T) { + rbac := newTestCasbinRBAC(t, "org-1") + + const goroutines = 40 + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + i := i + go func() { + defer wg.Done() + user := fmt.Sprintf("user-%d", i%10) + project := fmt.Sprintf("project-%d", i%3) + if i%2 == 0 { + _ = rbac.GrantRoleInProject(context.Background(), user, shared.RoleMember, project) + } else { + _ = rbac.GetAllRoles(user) + _, _ = rbac.GetAllProjectsForUser(user) + } + }() + } + wg.Wait() +} + +// TestCasbinRBAC_TwoUsersConcurrentOrgSync mirrors the exact scenario from the panic: +// two users trigger RefreshExternalEntityProviderProjects for the same org simultaneously. +// singleflight deduplicates per org+user, so both goroutines run concurrently and both +// call casbin write operations on the shared enforcer. +func TestCasbinRBAC_TwoUsersConcurrentOrgSync(t *testing.T) { + // Both users share the same enforcer (same org, different singleflight keys). + sharedEnforcer := newTestEnforcer(t) + + user1rbac := &casbinRBAC{domain: "org-1", enforcer: sharedEnforcer} + user2rbac := &casbinRBAC{domain: "org-1", enforcer: sharedEnforcer} + + projects := []string{"proj-a", "proj-b", "proj-c", "proj-d", "proj-e"} + + var wg sync.WaitGroup + for _, rbac := range []*casbinRBAC{user1rbac, user2rbac} { + rbac := rbac + wg.Add(1) + go func() { + defer wg.Done() + // Simulate what syncProjectsAndAssets does: grant + read roles per project. + for _, project := range projects { + _ = rbac.GrantRoleInProject(context.Background(), "user", shared.RoleMember, project) + _ = rbac.GetAllRoles("user") + _ = rbac.RevokeAllRolesInProjectForUser(context.Background(), "user", project) + } + }() + } + wg.Wait() +}