Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions codecov_auth/tests/unit/test_authentication.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from http.cookies import SimpleCookie
from unittest.mock import call, patch
from unittest.mock import AsyncMock, call, patch

import pytest
from django.conf import settings
Expand Down Expand Up @@ -198,7 +198,9 @@ def test_impersonation(self):
assert res.json()["data"]["me"] == {"user": {"username": "impersonateme"}}

@patch("core.commands.repository.repository.RepositoryCommands.fetch_repository")
def test_impersonation_with_okta(self, mock_call_to_fetch_repository):
def test_impersonation_with_okta(
self, mock_call_to_fetch_repository, new_callable=AsyncMock
):
repo = RepositoryFactory(author=self.owner_to_impersonate, private=True)
query_repositories = """{ owner(username: "%s") { repository(name: "%s") { ... on Repository { name } } } }"""
query = query_repositories % (repo.author.username, repo.name)
Expand Down Expand Up @@ -228,12 +230,16 @@ def test_impersonation_with_okta(self, mock_call_to_fetch_repository):
repo.name,
[],
exclude_okta_enforced_repos=True,
needs_coverage=False,
needs_commits=False,
),
call(
self.owner_to_impersonate,
repo.name,
[],
exclude_okta_enforced_repos=False,
needs_coverage=False,
needs_commits=False,
),
]
)
Expand Down
19 changes: 7 additions & 12 deletions core/commands/repository/interactors/fetch_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,20 @@ def execute(
name: str,
okta_authenticated_accounts: list[int],
exclude_okta_enforced_repos: bool = True,
needs_coverage: bool = True,
needs_commits: bool = True,
) -> Repository | None:
queryset = Repository.objects.viewable_repos(self.current_owner)
if exclude_okta_enforced_repos:
queryset = queryset.exclude_accounts_enforced_okta(
okta_authenticated_accounts
)

# TODO(swatinem): We should find a way to avoid these combinators:
# The `with_recent_coverage` combinator is quite expensive.
# We only need that in case we want to query these props via graphql:
# `coverageAnalytics.{percentCovered,commitSha,hits,misses,lines}`
# Similarly, `with_oldest_commit_at` is only needed for `oldestCommitAt`.
if needs_coverage:
queryset = queryset.with_recent_coverage()
if needs_commits:
queryset = queryset.with_oldest_commit_at()

repo = (
queryset.filter(author=owner, name=name)
.with_recent_coverage()
.with_oldest_commit_at()
.select_related("author")
.first()
)
repo = queryset.filter(author=owner, name=name).select_related("author").first()

return repo
15 changes: 2 additions & 13 deletions core/commands/repository/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,8 @@


class RepositoryCommands(BaseCommand):
def fetch_repository(
self,
owner: Owner,
name: str,
okta_authenticated_accounts: list[int],
exclude_okta_enforced_repos: bool = True,
) -> Repository | None:
return self.get_interactor(FetchRepositoryInteractor).execute(
owner,
name,
okta_authenticated_accounts,
exclude_okta_enforced_repos=exclude_okta_enforced_repos,
)
def fetch_repository(self, *args, **kwargs) -> Repository | None:
return self.get_interactor(FetchRepositoryInteractor).execute(*args, **kwargs)

def regenerate_repository_upload_token(
self,
Expand Down
4 changes: 1 addition & 3 deletions core/commands/repository/tests/test_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ def setUp(self):
@patch("core.commands.repository.repository.FetchRepositoryInteractor.execute")
def test_fetch_repository_to_interactor(self, interactor_mock):
self.command.fetch_repository(self.org, self.repo.name, [])
interactor_mock.assert_called_once_with(
self.org, self.repo.name, [], exclude_okta_enforced_repos=True
)
interactor_mock.assert_called_once_with(self.org, self.repo.name, [])

@patch("core.commands.repository.repository.FetchRepositoryInteractor.execute")
def test_fetch_repository_to_interactor_with_enforcing_okta(self, interactor_mock):
Expand Down
59 changes: 59 additions & 0 deletions graphql_api/helpers/requested_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# This was adapted from <https://github.com/mirumee/ariadne/discussions/1116#discussioncomment-6508603>
from collections.abc import Generator, Iterable

from graphql import GraphQLResolveInfo
from graphql.language import (
FieldNode,
FragmentSpreadNode,
InlineFragmentNode,
SelectionNode,
)


def selected_fields(info: GraphQLResolveInfo) -> set[str]:
"""
Given a GraphQL "sub-query", this recursively collects all the queried fields.

For example, if the original GraphQL query looks like `owner { repository { name } }`,
this would resolve to `repository` and `repository.name`.

This function works by traversing the parts of the GraphQL Query AST which
are exposed to each "resolver".
"""
names: set[str] = set()
for node in info.field_nodes:
if node.selection_set is None:
continue

Check warning on line 26 in graphql_api/helpers/requested_fields.py

View check run for this annotation

Codecov Notifications / codecov/patch

graphql_api/helpers/requested_fields.py#L26

Added line #L26 was not covered by tests
names.update(_fields_from_selections(info, node.selection_set.selections))
return names


def _fields_from_selections(
info: GraphQLResolveInfo, selections: Iterable[SelectionNode]
) -> Generator[str, None, None]:
for selection in selections:
match selection:
case FieldNode():
name = selection.name.value
yield name

if selection.selection_set is not None:
yield from (
f"{name}.{field}"
for field in _fields_from_selections(
info, selection.selection_set.selections
)
)

case InlineFragmentNode():
yield from _fields_from_selections(
info, selection.selection_set.selections
)
case FragmentSpreadNode():
fragment = info.fragments[selection.name.value]
yield from _fields_from_selections(
info, fragment.selection_set.selections
)

case _:
raise NotImplementedError(f"field type {type(selection)} not supported")

Check warning on line 59 in graphql_api/helpers/requested_fields.py

View check run for this annotation

Codecov Notifications / codecov/patch

graphql_api/helpers/requested_fields.py#L58-L59

Added lines #L58 - L59 were not covered by tests
208 changes: 208 additions & 0 deletions graphql_api/tests/test_requested_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
from graphql import GraphQLResolveInfo
from graphql.language import (
FragmentDefinitionNode,
OperationDefinitionNode,
parse,
)

from graphql_api.helpers.requested_fields import selected_fields


def parse_into_resolveinfo(source: str) -> GraphQLResolveInfo:
document = parse(source)

operation: OperationDefinitionNode | None = None
fragments: dict[str, FragmentDefinitionNode] = {}

for definition in document.definitions:
if isinstance(definition, OperationDefinitionNode):
operation = definition
elif isinstance(definition, FragmentDefinitionNode):
fragments[definition.name.value] = definition

assert operation
root_fields = [operation]

return GraphQLResolveInfo(
"__root__",
root_fields, # list[FieldNode]
None,
None,
None,
None,
fragments, # dict[str, FragmentDefinitionNode]
None,
None,
None,
None,
None,
)


QUERY_CoverageForFile = """
query CoverageForFile(
$owner: String!
$repo: String!
$ref: String!
$path: String!
$flags: [String]
$components: [String]
) {
owner(username: $owner) {
repository(name: $repo) {
__typename
... on Repository {
commit(id: $ref) {
...CoverageForFile
}
branch(name: $ref) {
name
head {
...CoverageForFile
}
}
}
... on NotFoundError {
message
}
... on OwnerNotActivatedError {
message
}
}
}
}

fragment CoverageForFile on Commit {
commitid
coverageAnalytics {
flagNames
components {
id
name
}
coverageFile(path: $path, flags: $flags, components: $components) {
hashedPath
content
coverage {
line
coverage
}
totals {
percentCovered # Absolute coverage of the commit
}
}
}
}
"""

QUERY_GetRepoConfigurationStatus = """
query GetRepoConfigurationStatus($owner: String!, $repo: String!) {
owner(username: $owner) {
plan {
isTeamPlan
}
repository(name:$repo) {
__typename
... on Repository {
coverageEnabled
bundleAnalysisEnabled
testAnalyticsEnabled
yaml
languages
coverageAnalytics {
flagsCount
componentsCount
}
}
... on NotFoundError {
message
}
... on OwnerNotActivatedError {
message
}
}
}
}
"""

QUERY_ReposForOwner = """
query ReposForOwner(
$filters: RepositorySetFilters!
$owner: String!
$ordering: RepositoryOrdering!
$direction: OrderingDirection!
$after: String
$first: Int
) {
owner(username: $owner) {
repositories(
filters: $filters
ordering: $ordering
orderingDirection: $direction
first: $first
after: $after
) {
edges {
node {
name
active
activated
private
coverageAnalytics {
percentCovered
lines
}
updatedAt
latestCommitAt
author {
username
}
coverageEnabled
bundleAnalysisEnabled
repositoryConfig {
indicationRange {
upperRange
lowerRange
}
}
}
}
pageInfo {
hasNextPage
endCursor
}
}
}
}
"""


def test_requested_fields():
info = parse_into_resolveinfo(QUERY_CoverageForFile)
fields = selected_fields(info)

assert "owner.repository.branch.name" in fields
assert "owner.repository.branch.head.commitid" in fields
assert (
"owner.repository.branch.head.coverageAnalytics.coverageFile.totals.percentCovered"
in fields
)

assert "owner.repository.oldestCommitAt" not in fields
assert "owner.repository.coverageAnalytics.percentCovered" not in fields
assert "owner.repository.coverageAnalytics.commitSha" not in fields

info = parse_into_resolveinfo(QUERY_GetRepoConfigurationStatus)
fields = selected_fields(info)

assert "owner.repository.coverageAnalytics" in fields
assert "owner.repository.oldestCommitAt" not in fields
assert "owner.repository.coverageAnalytics.percentCovered" not in fields

info = parse_into_resolveinfo(QUERY_ReposForOwner)
fields = selected_fields(info)

assert "owner.repositories.edges.node.latestCommitAt" in fields
assert "owner.repositories.edges.node.oldestCommitAt" not in fields
assert "owner.repositories.edges.node.coverageAnalytics" in fields
assert "owner.repositories.edges.node.coverageAnalytics.percentCovered" in fields
Loading
Loading