"""The endpoints for restriction objects.
SPDX-License-Identifier: AGPL-3.0-only OR BSD-3-Clause-Clear
"""
from __future__ import annotations
import os
import typing as t
import cg_request_args as rqa
from cg_maybe import Maybe, Nothing
from cg_maybe.utils import maybe_from_nullable
from .. import paginated, parsers, utils
if t.TYPE_CHECKING or os.getenv("CG_EAGERIMPORT", False):
from .. import client
from ..models.create_entry_ticket_restriction_data import (
CreateEntryTicketRestrictionData,
)
from ..models.enter_restriction_data import EnterRestrictionData
from ..models.entry_overview_entry import EntryOverviewEntry
from ..models.entry_ticket_response import EntryTicketResponse
from ..models.heartbeat_response import HeartbeatResponse
from ..models.ip_range import IPRange
from ..models.patch_entry_override_restriction_data import (
PatchEntryOverrideRestrictionData,
)
from ..models.patch_restriction_data import PatchRestrictionData
from ..models.restriction import Restriction
from ..models.user_login_response import UserLoginResponse
# Type variable for the client a service instance is bound to. Methods in this
# module narrow it through their ``self`` annotation (for example to
# ``client.AuthenticatedClient``). The bound is a string so that the ``client``
# module is only needed by type checkers.
_ClientT = t.TypeVar("_ClientT", bound="client._BaseClient")
[docs]
class RestrictionService(t.Generic[_ClientT]):
    """Service object grouping the endpoints for restriction objects.

    The service only stores the client it was constructed with; every
    endpoint method enters that client as a context manager and sends its
    request through the ``http`` attribute of the entered client.
    """

    # Only the (name-mangled) client reference is stored on an instance.
    __slots__ = ("__client",)

    def __init__(self, client: _ClientT) -> None:
        """Bind this service to ``client``.

        :param client: The client used to perform all requests of this
            service.
        """
        self.__client = client
[docs]
def create_entry_ticket(
    self: RestrictionService[client.AuthenticatedClient],
    json_body: CreateEntryTicketRestrictionData,
    *,
    restriction_id: str,
) -> EntryTicketResponse:
    """Request a short-lived entry ticket for a restriction.

    The caller is authenticated through the bearer token, the password is
    validated (when one is set), and the entry limit is pre-checked so that
    no dead ticket is issued. An unused ticket is written to Redis and the
    ticket string is returned.

    The request is rejected when the calling token is itself an exam token
    for this course, since activating another ticket would kill the active
    session.

    :param json_body: The body of the request. See
        :class:`.CreateEntryTicketRestrictionData` for information about
        the possible fields. You can provide this data as a
        :class:`.CreateEntryTicketRestrictionData` or as a dictionary.
    :param restriction_id: The id of the restriction.

    :returns: A tagged response (EntryTicketResponse) carrying the ticket.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}/entry_tickets/"

    with self.__client as session:
        response = session.http.put(
            url=endpoint,
            json=utils.to_dict(json_body),
            params=None,
        )
    utils.log_warnings(response)

    # Handle the failure path first; anything but a 200 becomes an error.
    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.entry_ticket_response import EntryTicketResponse

    ticket_parser = parsers.JsonResponseParser(
        parsers.ParserFor.make(EntryTicketResponse)
    )
    return ticket_parser.try_parse(response)
[docs]
def put_allowed_ip(
    self: RestrictionService[client.AuthenticatedClient],
    *,
    restriction_id: str,
    ip_range: str,
) -> IPRange:
    """Add one IP range to the allowed IPs of a restriction.

    :param restriction_id: The id of the restriction.
    :param ip_range: The IP range to add in CIDR notation.

    :returns: The added IP range.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    # NOTE(review): ``ip_range`` is placed in the path verbatim. A CIDR value
    # contains a "/", so confirm that the server route accepts it unescaped.
    endpoint = f"/api/v1/restrictions/{restriction_id}/allowed_ips/{ip_range}"

    with self.__client as session:
        response = session.http.put(url=endpoint, params=None)
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.ip_range import IPRange

    range_parser = parsers.JsonResponseParser(parsers.ParserFor.make(IPRange))
    return range_parser.try_parse(response)
[docs]
def delete_allowed_ip(
    self: RestrictionService[client.AuthenticatedClient],
    *,
    restriction_id: str,
    ip_range: str,
) -> None:
    """Remove one IP range from the allowed IPs of a restriction.

    :param restriction_id: The ID of the restriction.
    :param ip_range: The IP range to remove.

    :returns: An empty response.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 204.
    """
    # NOTE(review): ``ip_range`` is placed in the path verbatim; if it is a
    # CIDR value it contains a "/" -- confirm the server route accepts that.
    endpoint = f"/api/v1/restrictions/{restriction_id}/allowed_ips/{ip_range}"

    with self.__client as session:
        response = session.http.delete(url=endpoint, params=None)
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 204):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    # A 204 carries no body, so the parser just yields the constant ``None``.
    empty_parser = parsers.ConstantlyParser(None)
    return empty_parser.try_parse(response)
[docs]
def get_all_entries(
    self: RestrictionService[client.AuthenticatedClient],
    *,
    restriction_id: str,
    q: str = "",
    page_size: int = 50,
) -> paginated.Response[EntryOverviewEntry]:
    """List the enrolled students with their entry counts and overrides.

    :param restriction_id: The ID of the restriction.
    :param q: Only retrieve students whose name or username matches this
        value.
    :param page_size: The size of a single page, maximum is 100.

    :returns: A paginated list of entry overview entries per student.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}/entries/"
    query: t.Dict[str, str | int | bool] = {
        "q": q,
        "page-size": page_size,
    }

    if t.TYPE_CHECKING:
        import httpx

    def fetch_page(next_token: str | None) -> httpx.Response:
        # ``query`` is shared between calls, so a token left over from an
        # earlier page has to be dropped when no token is given.
        if next_token is not None:
            query["next-token"] = next_token
        else:
            query.pop("next-token", "")

        with self.__client as session:
            page = session.http.get(url=endpoint, params=query)
        utils.log_warnings(page)
        return page

    def decode_page(
        resp: httpx.Response,
    ) -> t.Sequence[EntryOverviewEntry]:
        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyErrorParser

            raise utils.get_error(
                resp,
                (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
            )

        from ..models.entry_overview_entry import EntryOverviewEntry

        entries_parser = parsers.JsonResponseParser(
            rqa.List(parsers.ParserFor.make(EntryOverviewEntry))
        )
        return entries_parser.try_parse(resp)

    return paginated.Response(fetch_page, decode_page)
[docs]
def enter(
    self,
    json_body: EnterRestrictionData,
    *,
    restriction_id: str,
) -> UserLoginResponse:
    """Enter a restricted course, recording an entry event.

    There are two paths, selected by which field the body carries:

    1. ``{password}``: legacy entry. Authenticates with the caller's
       bearer token and reuses it as the session token.
    2. ``{entry_ticket}``: activates a previously created entry ticket.
       The ticket is the credential, the ``Authorization`` header is
       ignored, and a new session token is created while every previously
       issued token for the user is invalidated.

    :param json_body: The body of the request. See
        :class:`.EnterRestrictionData` for information about the possible
        fields. You can provide this data as a
        :class:`.EnterRestrictionData` or as a dictionary.
    :param restriction_id: The ID of the restriction.

    :returns: A login response containing the (updated) access token.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}/entries/"

    with self.__client as session:
        response = session.http.put(
            url=endpoint,
            json=utils.to_dict(json_body),
            params=None,
        )
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.user_login_response import UserLoginResponse

    login_parser = parsers.JsonResponseParser(
        parsers.ParserFor.make(UserLoginResponse)
    )
    return login_parser.try_parse(response)
[docs]
def get_all_allowed_ips(
    self: RestrictionService[client.AuthenticatedClient],
    *,
    restriction_id: str,
    page_size: int = 50,
) -> paginated.Response[IPRange]:
    """Retrieve every allowed IP range of a restriction.

    :param restriction_id: The ID of the restriction.
    :param page_size: The size of a single page, maximum is 100.

    :returns: A paginated list of IP ranges.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}/allowed_ips/"
    query: t.Dict[str, str | int | bool] = {
        "page-size": page_size,
    }

    if t.TYPE_CHECKING:
        import httpx

    def fetch_page(next_token: str | None) -> httpx.Response:
        # ``query`` is shared between calls, so a token left over from an
        # earlier page has to be dropped when no token is given.
        if next_token is not None:
            query["next-token"] = next_token
        else:
            query.pop("next-token", "")

        with self.__client as session:
            page = session.http.get(url=endpoint, params=query)
        utils.log_warnings(page)
        return page

    def decode_page(resp: httpx.Response) -> t.Sequence[IPRange]:
        if not utils.response_code_matches(resp.status_code, 200):
            from ..models.any_error import AnyErrorParser

            raise utils.get_error(
                resp,
                (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
            )

        from ..models.ip_range import IPRange

        ranges_parser = parsers.JsonResponseParser(
            rqa.List(parsers.ParserFor.make(IPRange))
        )
        return ranges_parser.try_parse(resp)

    return paginated.Response(fetch_page, decode_page)
[docs]
def get(
    self: RestrictionService[client.AuthenticatedClient],
    *,
    restriction_id: str,
) -> Restriction:
    """Fetch the full settings of a restriction (admin only).

    :param restriction_id: The id of the restriction.

    :returns: The restriction object including the password.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}"

    with self.__client as session:
        response = session.http.get(url=endpoint, params=None)
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.restriction import Restriction

    restriction_parser = parsers.JsonResponseParser(
        parsers.ParserFor.make(Restriction)
    )
    return restriction_parser.try_parse(response)
[docs]
def patch(
    self: RestrictionService[client.AuthenticatedClient],
    json_body: PatchRestrictionData,
    *,
    restriction_id: str,
) -> Restriction:
    """Update the settings of a restriction.

    The request body mirrors the restriction JSON shape. Every field is
    optional; fields that are omitted are left unchanged.

    :param json_body: The body of the request. See
        :class:`.PatchRestrictionData` for information about the possible
        fields. You can provide this data as a
        :class:`.PatchRestrictionData` or as a dictionary.
    :param restriction_id: The id of the restriction.

    :returns: The updated restriction.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}"

    with self.__client as session:
        response = session.http.patch(
            url=endpoint,
            json=utils.to_dict(json_body),
            params=None,
        )
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.restriction import Restriction

    restriction_parser = parsers.JsonResponseParser(
        parsers.ParserFor.make(Restriction)
    )
    return restriction_parser.try_parse(response)
[docs]
def patch_entry_override(
    self: RestrictionService[client.AuthenticatedClient],
    json_body: PatchEntryOverrideRestrictionData,
    *,
    restriction_id: str,
    user_id: int,
) -> EntryOverviewEntry:
    """Set or remove the entry override of a single student.

    Sending ``null`` removes the override row entirely (the student falls
    back to the global ``session_lockdown`` limit). Sending a positive
    integer creates or updates the override for this student.

    :param json_body: The body of the request. See
        :class:`.PatchEntryOverrideRestrictionData` for information about
        the possible fields. You can provide this data as a
        :class:`.PatchEntryOverrideRestrictionData` or as a dictionary.
    :param restriction_id: The ID of the restriction.
    :param user_id: The ID of the student.

    :returns: The updated entry overview for this student.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    endpoint = f"/api/v1/restrictions/{restriction_id}/entries/{user_id}"

    with self.__client as session:
        response = session.http.patch(
            url=endpoint,
            json=utils.to_dict(json_body),
            params=None,
        )
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.entry_overview_entry import EntryOverviewEntry

    entry_parser = parsers.JsonResponseParser(
        parsers.ParserFor.make(EntryOverviewEntry)
    )
    return entry_parser.try_parse(response)
[docs]
def heartbeat(
    self: RestrictionService[client.AuthenticatedClient],
) -> HeartbeatResponse:
    """Send a heartbeat so an isolated exam session stays alive.

    When heartbeat enforcement is active, the session's ``expires_at`` is
    set to a short window (5 minutes). This endpoint extends it forward,
    capped by ``max_expires_at``.

    If the session is already expired, the auth flow rejects the request
    before it reaches this endpoint (401).

    :returns: The session's ``expires_at`` after the refresh.
    :raises: The error built by ``utils.get_error`` when the response
        status is not 200.
    """
    endpoint = "/api/v1/restrictions/heartbeat"

    with self.__client as session:
        response = session.http.post(url=endpoint, params=None)
    utils.log_warnings(response)

    if not utils.response_code_matches(response.status_code, 200):
        from ..models.any_error import AnyErrorParser

        raise utils.get_error(
            response,
            (((400, 409, 401, 403, 404, 429, 500), AnyErrorParser),),
        )

    from ..models.heartbeat_response import HeartbeatResponse

    heartbeat_parser = parsers.JsonResponseParser(
        parsers.ParserFor.make(HeartbeatResponse)
    )
    return heartbeat_parser.try_parse(response)