Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
# get database
db = client[MONGO_DATABASE]
membersdb = db.members
certificatesdb = db.certificates

try:
# check if the members index exists
Expand All @@ -50,3 +51,12 @@
print(membersdb.index_information())
except Exception:
pass

try:
if "unique_certificate_number" not in certificatesdb.index_information():
certificatesdb.create_index(
[("certificate_number", 1)], unique=True, name="unique_certificate_number"
)
print("The certificate_number index was created.")
except Exception:
pass
70 changes: 69 additions & 1 deletion models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from typing import Any, List
import pytz
import strawberry
from datetime import datetime
from enum import Enum

from bson import ObjectId
from pydantic import (
Expand Down Expand Up @@ -38,6 +42,69 @@ def validate(cls, v):
def __get_pydantic_json_schema__(cls, field_schema):
field_schema.update(type="string")

def get_ist_time():
"""
Function to get the current time in IST
"""
return datetime.now(pytz.timezone("Asia/Kolkata")).isoformat()


@strawberry.enum
class CertificateStates(Enum):
pending_cc = "pending_cc"
pending_slo = "pending_slo"
approved = "approved"
rejected = "rejected"


@strawberry.type
class Certificate_Status:
requested_at: str = Field(default_factory=get_ist_time)

cc_approved_at: str | None = None
cc_approver: str | None = None

slo_approved_at: str | None = None
slo_approver: str | None = None

def __init__(
self,
requested_at: str = get_ist_time(),
cc_approved_at: str | None = None,
cc_approver: str | None = None,
slo_approved_at: str | None = None,
slo_approver: str | None = None,
) -> None:
self.requested_at = requested_at

self.cc_approved_at = cc_approved_at
self.cc_approver = cc_approver

self.slo_approved_at = slo_approved_at
self.slo_approver = slo_approver


class Certificate(BaseModel):
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
user_id: str
certificate_number: str
status: Certificate_Status = Certificate_Status()
state: CertificateStates = CertificateStates.pending_cc
certificate_data: str
key: str
request_reason: str | None = None

model_config = ConfigDict(
arbitrary_types_allowed=True,
str_strip_whitespace=True,
str_max_length=3000,
validate_assignment=True,
validate_default=True,
validate_return=True,
extra="forbid",
json_encoders={ObjectId: str},
populate_by_name=True,
)

class Roles(BaseModel):
"""
Expand Down Expand Up @@ -125,7 +192,8 @@ class Member(BaseModel):
)

poc: bool = Field(default_factory=(lambda: 0 == 1), description="Club POC")

certificates: List[Certificate] = Field(default_factory=list)

@field_validator("uid", mode="before")
@classmethod
def transform_uid(cls, v):
Expand Down
210 changes: 204 additions & 6 deletions mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,30 @@
Mutation resolvers
"""

from datetime import datetime
from datetime import datetime, timedelta
from os import getenv

import json
import pytz
import strawberry
from fastapi.encoders import jsonable_encoder

from db import membersdb
from models import Member
from db import membersdb, certificatesdb
from models import Member, Certificate, CertificateStates

# import all models and types
from otypes import FullMemberInput, Info, MemberType, SimpleMemberInput
from utils import getUser, non_deleted_members, unique_roles_id
from otypes import (
FullMemberInput,
Info,
MemberType,
SimpleMemberInput,
CertificateInput,
CertificateType
)
from queries import memberRolesHelper
from utils import getUser, non_deleted_members, unique_roles_id, generate_key, getClubs

inter_communication_secret_global = getenv("INTER_COMMUNICATION_SECRET")
key_length = int(getenv("KEY_LENGTH", 8))
ist = pytz.timezone("Asia/Kolkata")


Expand Down Expand Up @@ -518,6 +527,192 @@ def updateMembersCid(
return upd_ref.modified_count


@strawberry.mutation
def requestCertificate(
certificate_input: CertificateInput, info: Info
) -> CertificateType:
user = info.context.user
if user is None:
raise Exception("Not Authenticated")

last_certificate = certificatesdb.find_one(
{"user_id": user["uid"]},
sort=[("requested_at", -1)]
)

if last_certificate:
last_applied_date = last_certificate["status"]["requested_at"]
last_applied_date = datetime.fromisoformat(last_applied_date)
if last_applied_date:
time_diff = datetime.now(pytz.timezone("Asia/Kolkata")) - last_applied_date
if time_diff < timedelta(days=15):
raise Exception("You can only request a certificate once every 15 days.")

# Generate certificate number
year = datetime.now().year
next_year = year + 1
year_code = f"{str(year)[2:]}{str(next_year)[2:]}"

count = (
certificatesdb.count_documents(
{"certificate_number": {"$regex": f"^SLC/{year_code}/"}}
)
+ 1
)
certificate_number = f"SLC/{year_code}/{count:04d}"

user_memberships = memberRolesHelper(user["uid"], info)
if not user_memberships:
raise Exception("No Memberships Found")

clubs = getClubs(info.context.cookies)
club_name_map = {club["cid"]: club["name"] for club in clubs}
memberships = []
for m in user_memberships:
roles = m.roles
cid = m.cid
for role in roles:
memberships.append(
{
"startYear": role.start_year,
"endYear": role.end_year,
"name": role.name,
"cid": cid,
"clubName": club_name_map.get(cid, "Unknown Club"),
}
)

certificate_data = {
"user_id": user["uid"],
"memberships": memberships,
}

new_certificate = Certificate(
user_id=user["uid"],
certificate_number=certificate_number,
certificate_data=json.dumps(certificate_data),
request_reason=certificate_input.request_reason,
key=generate_key(key_length),
)

created_id = certificatesdb.insert_one(
jsonable_encoder(new_certificate)
).inserted_id

created_certificate = Certificate.parse_obj(
certificatesdb.find_one({"_id": created_id})
)

return CertificateType.from_pydantic(created_certificate)


@strawberry.mutation
def approveCertificate(certificate_number: str, info: Info) -> CertificateType:
noaccess_error = Exception(
"Can not access certificate. Either it does not exist or user does not have perms." # noqa: E501
)
user = info.context.user

if user is None or user["role"] not in ["cc", "slo"]:
raise Exception("Not Authenticated or Unauthorized")

certificate = certificatesdb.find_one(
{"certificate_number": certificate_number}
)

if not certificate:
raise noaccess_error

current_status = certificate["state"]
updation = {}

current_time = datetime.now(pytz.timezone("Asia/Kolkata")).isoformat() # Convert to ISO format string

if (
current_status == CertificateStates.pending_cc.value
and user["role"] == "cc"
):
new_status = CertificateStates.pending_slo.value
updation = {
"$set": {
"state": new_status,
"status.cc_approved_at": current_time,
"status.cc_approver": user["uid"],
}
}
elif (
current_status == CertificateStates.pending_slo.value
and user["role"] == "slo"
):
new_status = CertificateStates.approved.value
updation = {
"$set": {
"state": new_status,
"status.slo_approved_at": current_time,
"status.slo_approver": user["uid"],
}
}
else:
raise noaccess_error

certificatesdb.update_one(
{"certificate_number": certificate_number}, updation
)

updated_certificate = Certificate.parse_obj(
certificatesdb.find_one({"certificate_number": certificate_number})
)
return CertificateType.from_pydantic(updated_certificate)


@strawberry.mutation
def rejectCertificate(certificate_number: str, info: Info) -> CertificateType:
user = info.context.user
if user is None or user["role"] not in ["cc", "slo"]:
raise Exception("Not Authenticated or Unauthorized")

certificate = certificatesdb.find_one(
{"certificate_number": certificate_number}
)

if not certificate:
raise Exception("No such certificate found")

current_status = certificate["state"]
updation = {}

if (
current_status == CertificateStates.pending_cc.value
and user["role"] == "cc"
):
new_status = CertificateStates.rejected.value
updation = {
"$set": {
"state": new_status,
}
}
elif (
current_status == CertificateStates.pending_slo.value
and user["role"] == "slo"
):
new_status = CertificateStates.rejected.value
updation = {
"$set": {
"state": new_status,
}
}
else:
raise Exception("Can not reject certificate")

certificatesdb.update_one(
{"certificate_number": certificate_number}, updation
)

updated_certificate = Certificate.parse_obj(
certificatesdb.find_one({"certificate_number": certificate_number})
)
return CertificateType.from_pydantic(updated_certificate)

# register all mutations
mutations = [
createMember,
Expand All @@ -526,4 +721,7 @@ def updateMembersCid(
approveMember,
rejectMember,
updateMembersCid,
requestCertificate,
approveCertificate,
rejectCertificate,
]
24 changes: 23 additions & 1 deletion otypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from strawberry.types import Info as _Info
from strawberry.types.info import RootValueType

from models import Member, PyObjectId, Roles
from models import Member, PyObjectId, Roles, Certificate


# custom context class
Expand Down Expand Up @@ -99,6 +99,9 @@ class FullMemberInput:

poc: Optional[bool] = strawberry.UNSET

@strawberry.input
class CertificateInput:
request_reason: Optional[str] = None

@strawberry.input
class SimpleMemberInput:
Expand Down Expand Up @@ -135,3 +138,22 @@ class MemberCSVResponse:
csvFile: str
successMessage: str
errorMessage: str

@strawberry.experimental.pydantic.type(model=Certificate, all_fields=True)
class CertificateType:
pass

@strawberry.type
class MembershipType:
startYear: int
endYear: Optional[int]
name: str
cid: str


@strawberry.input
class MembershipInput:
startYear: int
endYear: Optional[int]
name: str
cid: str
Loading