forked from SpecterOps/openhound-github
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrepository.py
More file actions
286 lines (261 loc) · 10.8 KB
/
Copy pathrepository.py
File metadata and controls
286 lines (261 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
from dataclasses import dataclass, field
from typing import ClassVar
from dlt.common.libs.pydantic import DltConfig
from openhound.core.asset import BaseAsset, EdgeDef, NodeDef
from openhound.core.models.entries_dataclass import Edge, EdgePath, EdgeProperties
from pydantic import BaseModel, Field
from openhound_github.graph import GHNode, GHNodeProperties
from openhound_github.kinds import edges as ek
from openhound_github.kinds import nodes as nk
from openhound_github.main import app
@dataclass
class GHRepositoryProperties(GHNodeProperties):
    """Properties carried by a GitHub repository node.

    Extends ``GHNodeProperties`` with repository metadata, owner details,
    GitHub Actions settings, and the ``query_*`` strings used for the
    accordion panel queries.  Every field has a default, so instances can
    be built from keyword arguments only.
    """

    # --- Collection marker -------------------------------------------------
    collected: bool = field(
        default=True,
        metadata={"description": "Collected/generated by OpenHound"},
    )

    # --- Core repository metadata ------------------------------------------
    # TODO: Check owner_node_id
    full_name: str = field(
        default="",
        metadata={"description": "The fully qualified name (e.g., `org/repo`)."},
    )
    private: bool | None = field(
        default=None,
        metadata={"description": "Whether the repository is private."},
    )
    html_url: str | None = field(
        default=None,
        metadata={"description": "URL to the repository on GitHub."},
    )
    description: str | None = field(
        default=None,
        metadata={"description": "The repository description."},
    )
    created_at: str | None = field(
        default=None,
        metadata={"description": "When the repository was created."},
    )
    updated_at: str | None = field(
        default=None,
        metadata={"description": "When the repository was last updated."},
    )
    pushed_at: str | None = field(
        default=None,
        metadata={"description": "When the repository last had a push."},
    )
    archived: bool | None = field(
        default=None,
        metadata={"description": "Whether the repository is archived."},
    )
    disabled: bool | None = field(
        default=None,
        metadata={"description": "Whether the repository is disabled."},
    )
    visibility: str | None = field(
        default=None,
        metadata={
            "description": "The visibility level: `public`, `private`, or `internal`."
        },
    )
    default_branch: str | None = field(
        default=None,
        metadata={"description": "The name of the default branch (e.g., `main`)."},
    )
    open_issues_count: int | None = field(
        default=None,
        metadata={"description": "Number of open issues."},
    )
    allow_forking: bool | None = field(
        default=None,
        metadata={"description": "Whether forking is allowed."},
    )
    web_commit_signoff_required: bool | None = field(
        default=None,
        metadata={"description": "Whether web-based commits require sign-off."},
    )
    forks: int | None = field(
        default=None,
        metadata={"description": "Number of forks."},
    )
    open_issues: int | None = field(
        default=None,
        metadata={"description": "Number of open issues (includes pull requests)."},
    )
    watchers: int | None = field(
        default=None,
        metadata={"description": "Number of watchers."},
    )

    # --- Ownership / environment -------------------------------------------
    owner_name: str = field(
        default="",
        metadata={"description": "The login of the repository owner."},
    )
    # Plain default with no description metadata, unlike its neighbours.
    owner_id: str = ""
    environment_name: str = field(
        default="",
        metadata={"description": "The name of the environment (GitHub organization)."},
    )

    # --- Actions and security settings -------------------------------------
    actions_enabled: bool | None = field(
        default=None,
        metadata={
            "description": "Whether GitHub Actions is enabled for this repository."
        },
    )
    self_hosted_runners_enabled: bool | None = field(
        default=None,
        metadata={
            "description": "Whether the repository may use self-hosted runners."
        },
    )
    secret_scanning: str | None = field(
        default=None,
        metadata={
            "description": "Status of secret scanning (e.g., `enabled`, `disabled`)."
        },
    )

    # --- Accordion panel queries -------------------------------------------
    # Each defaults to an empty string and carries no description metadata.
    query_branches: str = ""
    query_protected_branches: str = ""
    query_branch_protection_rules: str = ""
    query_roles: str = ""
    query_teams: str = ""
    query_workflows: str = ""
    query_runners: str = ""
    query_environments: str = ""
    query_secrets: str = ""
    query_variables: str = ""
    query_secret_scanning_alerts: str = ""
    query_explicit_readers: str = ""
    query_unrolled_readers: str = ""
    query_explicit_writers: str = ""
    query_unrolled_writers: str = ""
class Owner(BaseModel):
    """Owner object nested inside a repository record.

    All fields are required.  The field set presumably mirrors the owner
    payload returned by the GitHub REST API -- confirm against the
    collector that feeds this model.
    """

    # Identity
    login: str
    id: int
    node_id: str

    # Profile and API links
    avatar_url: str
    gravatar_id: str
    url: str
    html_url: str
    followers_url: str
    following_url: str
    gists_url: str
    starred_url: str
    subscriptions_url: str
    organizations_url: str
    repos_url: str
    events_url: str
    received_events_url: str

    # Account classification
    type: str
    site_admin: bool
class PageInfo(BaseModel):
    """Pagination state, read from the camelCase keys ``endCursor`` / ``hasNextPage``."""

    # Optional: falls back to None when the aliased key is absent.
    end_cursor: str | None = Field(default=None, alias="endCursor")
    # Required: no default is supplied.
    has_next_page: bool = Field(alias="hasNextPage")
class Branch(BaseModel):
    """A single branch entry, as listed in ``Ref.nodes``."""

    id: str
    name: str
    # Untyped mapping; its inner schema is not defined in this file.
    target: dict
    # Read from the ``branchProtectionRule`` key; None when not provided.
    branch_protection_rule: dict | None = Field(
        default=None,
        alias="branchProtectionRule",
    )
class Ref(BaseModel):
    """One page of branches together with its pagination state."""

    # Read from the ``pageInfo`` key.
    page_info: PageInfo = Field(alias="pageInfo")
    nodes: list[Branch]
class RepositoryQL(BaseModel):
    """Repository record carrying its branch refs.

    Presumably the shape of a GraphQL repository response (the nested
    models use camelCase aliases) -- confirm against the query that
    produces it.
    """

    id: str
    name: str
    refs: Ref

    # Class-level DLT setting; not a model field.
    dlt_config: ClassVar[DltConfig] = {"return_validated_models": True}
@app.asset(
    node=NodeDef(
        kind=nk.REPOSITORY,
        description="GitHub Repository",
        icon="box-archive",
        properties=GHRepositoryProperties,
    ),
    edges=[
        EdgeDef(
            start=nk.ORGANIZATION,
            end=nk.REPOSITORY,
            kind=ek.OWNS,
            description="Org owns repository",
            traversable=True,
        ),
    ],
)
class Repository(BaseAsset):
    """One record from the `repositories` DLT table.

    Each record produces one GH_Repository node (``as_node``) and, when the
    owner has a node id, one GH_Owns edge from the owner (``edges``).
    """

    # Class-level DLT setting; not a model field.
    dlt_config: ClassVar[DltConfig] = {"return_validated_models": True}

    # Required fields
    id: int
    node_id: str
    name: str
    full_name: str
    private: bool
    owner: Owner

    # Optional fields, all defaulting to None
    html_url: str | None = None
    description: str | None = None
    created_at: str | None = None
    updated_at: str | None = None
    pushed_at: str | None = None
    archived: bool | None = None
    disabled: bool | None = None
    visibility: str | None = None
    default_branch: str | None = None
    open_issues_count: int | None = None
    allow_forking: bool | None = None
    web_commit_signoff_required: bool | None = None
    forks: int | None = None
    open_issues: int | None = None
    watchers: int | None = None
    actions_enabled: bool | None = None
    self_hosted_runners_enabled: bool | None = None

    @property
    def owner_id(self) -> str:
        """Return the node id of the nested owner."""
        owner = self.owner
        return owner.node_id

    @property
    def owner_name(self) -> str:
        """Return the login of the nested owner."""
        owner = self.owner
        return owner.login

    @property
    def as_node(self) -> GHNode:
        """Build the GH_Repository node for this record.

        Copies the record's fields into ``GHRepositoryProperties``, takes the
        environment name and id from ``self._lookup``, and fills every
        ``query_*`` property with a Cypher query scoped to this repository's
        node id.
        """
        repo_node_id = self.node_id

        # The original queries spell the repository pattern three ways (with
        # and without a space after ``node_id:``, and with an ``r`` binding).
        # Each spelling is kept verbatim so the emitted strings are unchanged.
        repo_spaced = f"(:GH_Repository {{node_id: '{repo_node_id}'}})"
        repo_tight = f"(:GH_Repository {{node_id:'{repo_node_id}'}})"
        repo_bound = f"(r:GH_Repository {{node_id:'{repo_node_id}'}})"

        # Shared tail of the reader/writer queries: users holding the role.
        role_holders = "MATCH p1=(role)<-[:GH_HasRole]-(:GH_User) RETURN p,p1"

        def role_query(relationships: str) -> str:
            # Roles reaching this repository over the given relationship
            # kinds, followed by the users that hold those roles.
            return (
                f"MATCH p=(role:GH_Role)-[:{relationships}*1..]->{repo_bound} "
                f"{role_holders}"
            )

        panel_queries = {
            "query_branches": (
                f"MATCH p={repo_spaced}-[:GH_HasBranch]->(:GH_Branch) RETURN p"
            ),
            "query_protected_branches": (
                f"MATCH p={repo_spaced}-[:GH_HasBranch]->(:GH_Branch)"
                "<-[:GH_ProtectedBy]-(:GH_BranchProtectionRule) RETURN p"
            ),
            "query_branch_protection_rules": (
                f"MATCH p={repo_spaced}-[:GH_Contains]->"
                "(:GH_BranchProtectionRule) RETURN p"
            ),
            "query_roles": (
                f"MATCH p=(:GH_RepoRole)-[*1..]->{repo_spaced} RETURN p"
            ),
            "query_teams": (
                "MATCH p=(:GH_Team)-[:GH_MemberOf|GH_HasRole*1..]->"
                f"(:GH_RepoRole)-[]->{repo_spaced} RETURN p"
            ),
            "query_workflows": (
                f"MATCH p={repo_tight}-[:GH_HasWorkflow]->(w:GH_Workflow) RETURN p"
            ),
            "query_runners": (
                f"MATCH p={repo_tight}-[:GH_CanUseRunner]->(:GH_Runner) RETURN p"
            ),
            "query_environments": (
                f"MATCH p={repo_spaced}-[:GH_HasEnvironment]->"
                "(:GH_Environment) RETURN p"
            ),
            "query_secrets": (
                f"MATCH p={repo_tight}-[:GH_HasSecret]->(:GH_Secret) RETURN p"
            ),
            "query_variables": (
                f"MATCH p={repo_tight}-[:GH_HasVariable]->(:GH_Variable) RETURN p"
            ),
            "query_secret_scanning_alerts": (
                f"MATCH p={repo_tight}-[:GH_HasSecretScanningAlert]->"
                "(:GH_SecretScanningAlert) RETURN p"
            ),
            "query_explicit_readers": role_query(
                "GH_HasBaseRole|GH_ReadRepoContents"
            ),
            "query_unrolled_readers": role_query(
                "GH_HasRole|GH_HasBaseRole|GH_MemberOf|GH_ReadRepoContents"
            ),
            "query_explicit_writers": role_query(
                "GH_HasBaseRole|GH_WriteRepoContents|GH_WriteRepoPullRequests"
            ),
            "query_unrolled_writers": role_query(
                "GH_HasRole|GH_HasBaseRole|GH_MemberOf"
                "|GH_WriteRepoContents|GH_WriteRepoPullRequests"
            ),
        }

        # ``secret_scanning`` is left at its default: this model declares no
        # such field (the original assignment was commented out).
        node_properties = GHRepositoryProperties(
            name=self.name,
            displayname=self.full_name,
            node_id=repo_node_id,
            full_name=self.full_name,
            private=self.private,
            html_url=self.html_url,
            description=self.description,
            created_at=self.created_at,
            updated_at=self.updated_at,
            pushed_at=self.pushed_at,
            archived=self.archived,
            disabled=self.disabled,
            visibility=self.visibility,
            default_branch=self.default_branch,
            open_issues_count=self.open_issues_count,
            allow_forking=self.allow_forking,
            web_commit_signoff_required=self.web_commit_signoff_required,
            forks=self.forks,
            open_issues=self.open_issues,
            watchers=self.watchers,
            owner_name=self.owner_name or "",
            owner_id=self.owner_id or "",
            environment_name=self._lookup.org_login(),
            environmentid=self._lookup.org_id(),
            actions_enabled=self.actions_enabled,
            self_hosted_runners_enabled=self.self_hosted_runners_enabled,
            **panel_queries,
        )
        return GHNode(kinds=[nk.REPOSITORY], properties=node_properties)

    @property
    def edges(self):
        """Yield the GH_Owns edge from the owner to this repository.

        Nothing is yielded when the owner's node id is empty.
        """
        owner_node_id = self.owner_id
        if not owner_node_id:
            return
        yield Edge(
            kind=ek.OWNS,
            start=EdgePath(value=owner_node_id, match_by="id"),
            end=EdgePath(value=self.node_id, match_by="id"),
            properties=EdgeProperties(traversable=True),
        )