Source code for nc_py_api._session

"""Session represents one connection to Nextcloud. All related stuff for these live here."""

import builtins
import pathlib
import re
import typing
from abc import ABC, abstractmethod
from base64 import b64encode
from dataclasses import dataclass
from enum import IntEnum
from json import loads
from os import environ

from httpx import AsyncClient, Client, Headers, Limits, ReadTimeout, Request, Response
from httpx import __version__ as httpx_version
from starlette.requests import HTTPConnection

from . import options
from ._exceptions import (
    NextcloudException,
    NextcloudExceptionNotFound,
    NextcloudExceptionNotModified,
    check_error,
)
from ._misc import get_username_secret_from_headers


class OCSRespond(IntEnum):
    """Special Nextcloud respond statuses for OCS calls."""

    RESPOND_SERVER_ERROR = 996
    RESPOND_UNAUTHORISED = 997
    RESPOND_NOT_FOUND = 998
    RESPOND_UNKNOWN_ERROR = 999


[docs] class ServerVersion(typing.TypedDict): """Nextcloud version information.""" major: int """Major version""" minor: int """Minor version""" micro: int """Micro version""" string: str """Full version in string format""" extended_support: bool """Indicates if the subscription has extended support"""
@dataclass class RuntimeOptions: xdebug_session: str timeout: int | None timeout_dav: int | None _nc_cert: str | bool upload_chunk_v2: bool def __init__(self, **kwargs): self.xdebug_session = kwargs.get("xdebug_session", options.XDEBUG_SESSION) self.timeout = kwargs.get("npa_timeout", options.NPA_TIMEOUT) self.timeout_dav = kwargs.get("npa_timeout_dav", options.NPA_TIMEOUT_DAV) self._nc_cert = kwargs.get("npa_nc_cert", options.NPA_NC_CERT) self.upload_chunk_v2 = kwargs.get("chunked_upload_v2", options.CHUNKED_UPLOAD_V2) @property def nc_cert(self) -> str | bool: return self._nc_cert @dataclass class BasicConfig: endpoint: str dav_endpoint: str dav_url_suffix: str options: RuntimeOptions def __init__(self, **kwargs): full_nc_url = self._get_config_value("nextcloud_url", **kwargs) self.endpoint = full_nc_url.removesuffix("/index.php").removesuffix("/") self.dav_url_suffix = self._get_config_value("dav_url_suffix", raise_not_found=False, **kwargs) if not self.dav_url_suffix: self.dav_url_suffix = "remote.php/dav" self.dav_url_suffix = "/" + self.dav_url_suffix.strip("/") self.dav_endpoint = self.endpoint + self.dav_url_suffix self.options = RuntimeOptions(**kwargs) @staticmethod def _get_config_value(value_name: str, raise_not_found=True, **kwargs): if value_name in kwargs: return kwargs[value_name] value_name_upper = value_name.upper() if value_name_upper in environ: return environ[value_name_upper] if raise_not_found: raise ValueError(f"`{value_name}` is not found.") return None @dataclass class Config(BasicConfig): auth: tuple[str, str] = ("", "") def __init__(self, **kwargs): super().__init__(**kwargs) nc_auth_user = self._get_config_value("nc_auth_user", raise_not_found=False, **kwargs) nc_auth_pass = self._get_config_value("nc_auth_pass", raise_not_found=False, **kwargs) if nc_auth_user and nc_auth_pass: self.auth = (nc_auth_user, nc_auth_pass)
[docs] @dataclass class AppConfig(BasicConfig): """Application configuration.""" aa_version: str """AppAPI version""" app_name: str """Application ID""" app_version: str """Application version""" app_secret: str """Application authentication secret""" def __init__(self, **kwargs): super().__init__(**kwargs) self.aa_version = self._get_config_value("aa_version", raise_not_found=False, **kwargs) if not self.aa_version: self.aa_version = "2.2.0" self.app_name = self._get_config_value("app_id", **kwargs) self.app_version = self._get_config_value("app_version", **kwargs) self.app_secret = self._get_config_value("app_secret", **kwargs)
class NcSessionBase(ABC): adapter: AsyncClient | Client adapter_dav: AsyncClient | Client cfg: BasicConfig custom_headers: dict response_headers: Headers _user: str _capabilities: dict @abstractmethod def __init__(self, **kwargs): self._capabilities = {} self._user = kwargs.get("user", "") self.custom_headers = kwargs.get("headers", {}) self.limits = Limits(max_keepalive_connections=20, max_connections=20, keepalive_expiry=60.0) self.init_adapter() self.init_adapter_dav() self.response_headers = Headers() self._ocs_regexp = re.compile(r"/ocs/v[12]\.php/|/apps/groupfolders/") def init_adapter(self, restart=False) -> None: if getattr(self, "adapter", None) is None or restart: self.adapter = self._create_adapter() self.adapter.headers.update({"OCS-APIRequest": "true"}) if self.custom_headers: self.adapter.headers.update(self.custom_headers) if options.XDEBUG_SESSION: self.adapter.cookies.set("XDEBUG_SESSION", options.XDEBUG_SESSION) self._capabilities = {} def init_adapter_dav(self, restart=False) -> None: if getattr(self, "adapter_dav", None) is None or restart: self.adapter_dav = self._create_adapter(dav=True) if self.custom_headers: self.adapter_dav.headers.update(self.custom_headers) if options.XDEBUG_SESSION: self.adapter_dav.cookies.set("XDEBUG_SESSION", options.XDEBUG_SESSION) @abstractmethod def _create_adapter(self, dav: bool = False) -> AsyncClient | Client: pass # pragma: no cover @property def ae_url(self) -> str: """Return base url for the AppAPI endpoints.""" return "/ocs/v1.php/apps/app_api/api/v1" @property def ae_url_v2(self) -> str: """Return base url for the AppAPI endpoints(version 2).""" return "/ocs/v1.php/apps/app_api/api/v2"
[docs] class NcSessionBasic(NcSessionBase, ABC): adapter: Client adapter_dav: Client def ocs( self, method: str, path: str, *, content: bytes | str | typing.Iterable[bytes] | typing.AsyncIterable[bytes] | None = None, json: dict | list | None = None, params: dict | None = None, files: dict | None = None, **kwargs, ): self.init_adapter() info = f"request: {method} {path}" nested_req = kwargs.pop("nested_req", False) try: response = self.adapter.request( method, path, content=content, json=json, params=params, files=files, **kwargs ) except ReadTimeout: raise NextcloudException(408, info=info) from None check_error(response, info) if response.status_code == 204: # NO_CONTENT return [] response_data = loads(response.text) ocs_meta = response_data["ocs"]["meta"] if ocs_meta["status"] != "ok": if ( not nested_req and ocs_meta["statuscode"] == 403 and str(ocs_meta["message"]).lower().find("password confirmation is required") != -1 ): self.adapter.close() self.init_adapter(restart=True) return self.ocs(method, path, **kwargs, content=content, json=json, params=params, nested_req=True) if ocs_meta["statuscode"] in (404, OCSRespond.RESPOND_NOT_FOUND): raise NextcloudExceptionNotFound(reason=ocs_meta["message"], info=info) if ocs_meta["statuscode"] == 304: raise NextcloudExceptionNotModified(reason=ocs_meta["message"], info=info) raise NextcloudException(status_code=ocs_meta["statuscode"], reason=ocs_meta["message"], info=info) return response_data["ocs"]["data"] def update_server_info(self) -> None: self._capabilities = self.ocs("GET", "/ocs/v1.php/cloud/capabilities") @property def capabilities(self) -> dict: if not self._capabilities: self.update_server_info() return self._capabilities["capabilities"] @property def nc_version(self) -> ServerVersion: if not self._capabilities: self.update_server_info() v = self._capabilities["version"] return ServerVersion( major=v["major"], minor=v["minor"], micro=v["micro"], string=v["string"], extended_support=v["extendedSupport"], ) @property def user(self) -> str: """Current user ID. Can be different from the login name.""" if isinstance(self, NcSession) and not self._user: # do not trigger for NextcloudApp self._user = self.ocs("GET", "/ocs/v1.php/cloud/user")["id"] return self._user def set_user(self, user_id: str) -> None: self._user = user_id def download2stream(self, url_path: str, fp, dav: bool = False, **kwargs): if isinstance(fp, str | pathlib.Path): with builtins.open(fp, "wb") as f: self.download2fp(url_path, f, dav, **kwargs) elif hasattr(fp, "write"): self.download2fp(url_path, fp, dav, **kwargs) else: raise TypeError("`fp` must be a path to file or an object with `write` method.") def _get_adapter_kwargs(self, dav: bool) -> dict[str, typing.Any]: if dav: return { "base_url": self.cfg.dav_endpoint, "timeout": self.cfg.options.timeout_dav, "event_hooks": {"request": [], "response": [self._response_event]}, } return { "base_url": self.cfg.endpoint, "timeout": self.cfg.options.timeout, "event_hooks": {"request": [self._request_event_ocs], "response": [self._response_event]}, } def _request_event_ocs(self, request: Request) -> None: str_url = str(request.url) if re.search(self._ocs_regexp, str_url) is not None: # this is OCS call request.url = request.url.copy_merge_params({"format": "json"}) request.headers["Accept"] = "application/json" def _response_event(self, response: Response) -> None: str_url = str(response.request.url) # we do not want ResponseHeaders for those two endpoints, as call to them can occur during DAV calls. for i in ("/ocs/v1.php/cloud/capabilities?format=json", "/ocs/v1.php/cloud/user?format=json"): if str_url.endswith(i): return self.response_headers = response.headers def download2fp(self, url_path: str, fp, dav: bool, params=None, **kwargs): adapter = self.adapter_dav if dav else self.adapter with adapter.stream("GET", url_path, params=params, headers=kwargs.get("headers")) as response: check_error(response) for data_chunk in response.iter_raw(chunk_size=kwargs.get("chunk_size", 5 * 1024 * 1024)): fp.write(data_chunk)
class AsyncNcSessionBasic(NcSessionBase, ABC): adapter: AsyncClient adapter_dav: AsyncClient async def ocs( self, method: str, path: str, *, content: bytes | str | typing.Iterable[bytes] | typing.AsyncIterable[bytes] | None = None, json: dict | list | None = None, params: dict | None = None, files: dict | None = None, **kwargs, ): self.init_adapter() info = f"request: {method} {path}" nested_req = kwargs.pop("nested_req", False) try: response = await self.adapter.request( method, path, content=content, json=json, params=params, files=files, **kwargs ) except ReadTimeout: raise NextcloudException(408, info=info) from None check_error(response, info) if response.status_code == 204: # NO_CONTENT return [] response_data = loads(response.text) ocs_meta = response_data["ocs"]["meta"] if ocs_meta["status"] != "ok": if ( not nested_req and ocs_meta["statuscode"] == 403 and str(ocs_meta["message"]).lower().find("password confirmation is required") != -1 ): await self.adapter.aclose() self.init_adapter(restart=True) return await self.ocs( method, path, **kwargs, content=content, json=json, params=params, nested_req=True ) if ocs_meta["statuscode"] in (404, OCSRespond.RESPOND_NOT_FOUND): raise NextcloudExceptionNotFound(reason=ocs_meta["message"], info=info) if ocs_meta["statuscode"] == 304: raise NextcloudExceptionNotModified(reason=ocs_meta["message"], info=info) raise NextcloudException(status_code=ocs_meta["statuscode"], reason=ocs_meta["message"], info=info) return response_data["ocs"]["data"] async def update_server_info(self) -> None: self._capabilities = await self.ocs("GET", "/ocs/v1.php/cloud/capabilities") @property async def capabilities(self) -> dict: if not self._capabilities: await self.update_server_info() return self._capabilities["capabilities"] @property async def nc_version(self) -> ServerVersion: if not self._capabilities: await self.update_server_info() v = self._capabilities["version"] return ServerVersion( major=v["major"], minor=v["minor"], micro=v["micro"], string=v["string"], extended_support=v["extendedSupport"], ) @property async def user(self) -> str: """Current user ID. Can be different from the login name.""" if isinstance(self, AsyncNcSession) and not self._user: # do not trigger for NextcloudApp self._user = (await self.ocs("GET", "/ocs/v1.php/cloud/user"))["id"] return self._user def set_user(self, user: str) -> None: self._user = user async def download2stream(self, url_path: str, fp, dav: bool = False, **kwargs): if isinstance(fp, str | pathlib.Path): with builtins.open(fp, "wb") as f: await self.download2fp(url_path, f, dav, **kwargs) elif hasattr(fp, "write"): await self.download2fp(url_path, fp, dav, **kwargs) else: raise TypeError("`fp` must be a path to file or an object with `write` method.") def _get_adapter_kwargs(self, dav: bool) -> dict[str, typing.Any]: if dav: return { "base_url": self.cfg.dav_endpoint, "timeout": self.cfg.options.timeout_dav, "event_hooks": {"request": [], "response": [self._response_event]}, } return { "base_url": self.cfg.endpoint, "timeout": self.cfg.options.timeout, "event_hooks": {"request": [self._request_event_ocs], "response": [self._response_event]}, } async def _request_event_ocs(self, request: Request) -> None: str_url = str(request.url) if re.search(self._ocs_regexp, str_url) is not None: # this is OCS call request.url = request.url.copy_merge_params({"format": "json"}) request.headers["Accept"] = "application/json" async def _response_event(self, response: Response) -> None: str_url = str(response.request.url) # we do not want ResponseHeaders for those two endpoints, as call to them can occur during DAV calls. for i in ("/ocs/v1.php/cloud/capabilities?format=json", "/ocs/v1.php/cloud/user?format=json"): if str_url.endswith(i): return self.response_headers = response.headers async def download2fp(self, url_path: str, fp, dav: bool, params=None, **kwargs): adapter = self.adapter_dav if dav else self.adapter async with adapter.stream("GET", url_path, params=params, headers=kwargs.get("headers")) as response: check_error(response) async for data_chunk in response.aiter_raw(chunk_size=kwargs.get("chunk_size", 5 * 1024 * 1024)): fp.write(data_chunk)
[docs] class NcSession(NcSessionBasic): cfg: Config def __init__(self, **kwargs): self.cfg = Config(**kwargs) super().__init__() def _create_adapter(self, dav: bool = False) -> AsyncClient | Client: return Client( follow_redirects=True, limits=self.limits, verify=self.cfg.options.nc_cert, **self._get_adapter_kwargs(dav), auth=self.cfg.auth, )
class AsyncNcSession(AsyncNcSessionBasic): cfg: Config def __init__(self, **kwargs): self.cfg = Config(**kwargs) super().__init__() def _create_adapter(self, dav: bool = False) -> AsyncClient | Client: return AsyncClient( follow_redirects=True, limits=self.limits, verify=self.cfg.options.nc_cert, **self._get_adapter_kwargs(dav), auth=self.cfg.auth, ) class NcSessionAppBasic(ABC): cfg: AppConfig _user: str adapter: AsyncClient | Client adapter_dav: AsyncClient | Client def __init__(self, **kwargs): self.cfg = AppConfig(**kwargs) super().__init__(**kwargs) def sign_check(self, request: HTTPConnection) -> str: headers = { "AA-VERSION": request.headers.get("AA-VERSION", ""), "EX-APP-ID": request.headers.get("EX-APP-ID", ""), "EX-APP-VERSION": request.headers.get("EX-APP-VERSION", ""), "AUTHORIZATION-APP-API": request.headers.get("AUTHORIZATION-APP-API", ""), } empty_headers = [k for k, v in headers.items() if not v] if empty_headers: raise ValueError(f"Missing required headers:{empty_headers}") if headers["EX-APP-ID"] != self.cfg.app_name: raise ValueError(f"Invalid EX-APP-ID:{headers['EX-APP-ID']} != {self.cfg.app_name}") username, app_secret = get_username_secret_from_headers(headers) if app_secret != self.cfg.app_secret: raise ValueError(f"Invalid App secret:{app_secret} != {self.cfg.app_secret}") return username
[docs] class NcSessionApp(NcSessionAppBasic, NcSessionBasic): cfg: AppConfig def _create_adapter(self, dav: bool = False) -> AsyncClient | Client: r = self._get_adapter_kwargs(dav) r["event_hooks"]["request"].append(self._add_auth) return Client( follow_redirects=True, limits=self.limits, verify=self.cfg.options.nc_cert, **r, headers={ "AA-VERSION": self.cfg.aa_version, "EX-APP-ID": self.cfg.app_name, "EX-APP-VERSION": self.cfg.app_version, "user-agent": f"ExApp/{self.cfg.app_name}/{self.cfg.app_version} (httpx/{httpx_version})", }, ) def _add_auth(self, request: Request): request.headers.update( {"AUTHORIZATION-APP-API": b64encode(f"{self._user}:{self.cfg.app_secret}".encode("UTF=8"))} )
class AsyncNcSessionApp(NcSessionAppBasic, AsyncNcSessionBasic): cfg: AppConfig def _create_adapter(self, dav: bool = False) -> AsyncClient | Client: r = self._get_adapter_kwargs(dav) r["event_hooks"]["request"].append(self._add_auth) return AsyncClient( follow_redirects=True, limits=self.limits, verify=self.cfg.options.nc_cert, **r, headers={ "AA-VERSION": self.cfg.aa_version, "EX-APP-ID": self.cfg.app_name, "EX-APP-VERSION": self.cfg.app_version, "User-Agent": f"ExApp/{self.cfg.app_name}/{self.cfg.app_version} (httpx/{httpx_version})", }, ) async def _add_auth(self, request: Request): request.headers.update( {"AUTHORIZATION-APP-API": b64encode(f"{self._user}:{self.cfg.app_secret}".encode("UTF=8"))} )