| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- import binascii
- from base64 import b64decode
- from typing import Optional
- from fastapi.exceptions import HTTPException
- from fastapi.openapi.models import HTTPBase as HTTPBaseModel
- from fastapi.openapi.models import HTTPBearer as HTTPBearerModel
- from fastapi.security.base import SecurityBase
- from fastapi.security.utils import get_authorization_scheme_param
- from pydantic import BaseModel
- from starlette.requests import Request
- from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN
- from typing_extensions import Annotated, Doc
- class HTTPBasicCredentials(BaseModel):
- """
- The HTTP Basic credentials given as the result of using `HTTPBasic` in a
- dependency.
- Read more about it in the
- [FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/).
- """
- username: Annotated[str, Doc("The HTTP Basic username.")]
- password: Annotated[str, Doc("The HTTP Basic password.")]
- class HTTPAuthorizationCredentials(BaseModel):
- """
- The HTTP authorization credentials in the result of using `HTTPBearer` or
- `HTTPDigest` in a dependency.
- The HTTP authorization header value is split by the first space.
- The first part is the `scheme`, the second part is the `credentials`.
- For example, in an HTTP Bearer token scheme, the client will send a header
- like:
- ```
- Authorization: Bearer deadbeef12346
- ```
- In this case:
- * `scheme` will have the value `"Bearer"`
- * `credentials` will have the value `"deadbeef12346"`
- """
- scheme: Annotated[
- str,
- Doc(
- """
- The HTTP authorization scheme extracted from the header value.
- """
- ),
- ]
- credentials: Annotated[
- str,
- Doc(
- """
- The HTTP authorization credentials extracted from the header value.
- """
- ),
- ]
- class HTTPBase(SecurityBase):
- def __init__(
- self,
- *,
- scheme: str,
- scheme_name: Optional[str] = None,
- description: Optional[str] = None,
- auto_error: bool = True,
- ):
- self.model = HTTPBaseModel(scheme=scheme, description=description)
- self.scheme_name = scheme_name or self.__class__.__name__
- self.auto_error = auto_error
- async def __call__(
- self, request: Request
- ) -> Optional[HTTPAuthorizationCredentials]:
- authorization = request.headers.get("Authorization")
- scheme, credentials = get_authorization_scheme_param(authorization)
- if not (authorization and scheme and credentials):
- if self.auto_error:
- raise HTTPException(
- status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
- )
- else:
- return None
- return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
- class HTTPBasic(HTTPBase):
- """
- HTTP Basic authentication.
- ## Usage
- Create an instance object and use that object as the dependency in `Depends()`.
- The dependency result will be an `HTTPBasicCredentials` object containing the
- `username` and the `password`.
- Read more about it in the
- [FastAPI docs for HTTP Basic Auth](https://fastapi.tiangolo.com/advanced/security/http-basic-auth/).
- ## Example
- ```python
- from typing import Annotated
- from fastapi import Depends, FastAPI
- from fastapi.security import HTTPBasic, HTTPBasicCredentials
- app = FastAPI()
- security = HTTPBasic()
- @app.get("/users/me")
- def read_current_user(credentials: Annotated[HTTPBasicCredentials, Depends(security)]):
- return {"username": credentials.username, "password": credentials.password}
- ```
- """
- def __init__(
- self,
- *,
- scheme_name: Annotated[
- Optional[str],
- Doc(
- """
- Security scheme name.
- It will be included in the generated OpenAPI (e.g. visible at `/docs`).
- """
- ),
- ] = None,
- realm: Annotated[
- Optional[str],
- Doc(
- """
- HTTP Basic authentication realm.
- """
- ),
- ] = None,
- description: Annotated[
- Optional[str],
- Doc(
- """
- Security scheme description.
- It will be included in the generated OpenAPI (e.g. visible at `/docs`).
- """
- ),
- ] = None,
- auto_error: Annotated[
- bool,
- Doc(
- """
- By default, if the HTTP Basic authentication is not provided (a
- header), `HTTPBasic` will automatically cancel the request and send the
- client an error.
- If `auto_error` is set to `False`, when the HTTP Basic authentication
- is not available, instead of erroring out, the dependency result will
- be `None`.
- This is useful when you want to have optional authentication.
- It is also useful when you want to have authentication that can be
- provided in one of multiple optional ways (for example, in HTTP Basic
- authentication or in an HTTP Bearer token).
- """
- ),
- ] = True,
- ):
- self.model = HTTPBaseModel(scheme="basic", description=description)
- self.scheme_name = scheme_name or self.__class__.__name__
- self.realm = realm
- self.auto_error = auto_error
- async def __call__( # type: ignore
- self, request: Request
- ) -> Optional[HTTPBasicCredentials]:
- authorization = request.headers.get("Authorization")
- scheme, param = get_authorization_scheme_param(authorization)
- if self.realm:
- unauthorized_headers = {"WWW-Authenticate": f'Basic realm="{self.realm}"'}
- else:
- unauthorized_headers = {"WWW-Authenticate": "Basic"}
- if not authorization or scheme.lower() != "basic":
- if self.auto_error:
- raise HTTPException(
- status_code=HTTP_401_UNAUTHORIZED,
- detail="Not authenticated",
- headers=unauthorized_headers,
- )
- else:
- return None
- invalid_user_credentials_exc = HTTPException(
- status_code=HTTP_401_UNAUTHORIZED,
- detail="Invalid authentication credentials",
- headers=unauthorized_headers,
- )
- try:
- data = b64decode(param).decode("ascii")
- except (ValueError, UnicodeDecodeError, binascii.Error):
- raise invalid_user_credentials_exc # noqa: B904
- username, separator, password = data.partition(":")
- if not separator:
- raise invalid_user_credentials_exc
- return HTTPBasicCredentials(username=username, password=password)
- class HTTPBearer(HTTPBase):
- """
- HTTP Bearer token authentication.
- ## Usage
- Create an instance object and use that object as the dependency in `Depends()`.
- The dependency result will be an `HTTPAuthorizationCredentials` object containing
- the `scheme` and the `credentials`.
- ## Example
- ```python
- from typing import Annotated
- from fastapi import Depends, FastAPI
- from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
- app = FastAPI()
- security = HTTPBearer()
- @app.get("/users/me")
- def read_current_user(
- credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
- ):
- return {"scheme": credentials.scheme, "credentials": credentials.credentials}
- ```
- """
- def __init__(
- self,
- *,
- bearerFormat: Annotated[Optional[str], Doc("Bearer token format.")] = None,
- scheme_name: Annotated[
- Optional[str],
- Doc(
- """
- Security scheme name.
- It will be included in the generated OpenAPI (e.g. visible at `/docs`).
- """
- ),
- ] = None,
- description: Annotated[
- Optional[str],
- Doc(
- """
- Security scheme description.
- It will be included in the generated OpenAPI (e.g. visible at `/docs`).
- """
- ),
- ] = None,
- auto_error: Annotated[
- bool,
- Doc(
- """
- By default, if the HTTP Bearer token is not provided (in an
- `Authorization` header), `HTTPBearer` will automatically cancel the
- request and send the client an error.
- If `auto_error` is set to `False`, when the HTTP Bearer token
- is not available, instead of erroring out, the dependency result will
- be `None`.
- This is useful when you want to have optional authentication.
- It is also useful when you want to have authentication that can be
- provided in one of multiple optional ways (for example, in an HTTP
- Bearer token or in a cookie).
- """
- ),
- ] = True,
- ):
- self.model = HTTPBearerModel(bearerFormat=bearerFormat, description=description)
- self.scheme_name = scheme_name or self.__class__.__name__
- self.auto_error = auto_error
- async def __call__(
- self, request: Request
- ) -> Optional[HTTPAuthorizationCredentials]:
- authorization = request.headers.get("Authorization")
- scheme, credentials = get_authorization_scheme_param(authorization)
- if not (authorization and scheme and credentials):
- if self.auto_error:
- raise HTTPException(
- status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
- )
- else:
- return None
- if scheme.lower() != "bearer":
- if self.auto_error:
- raise HTTPException(
- status_code=HTTP_403_FORBIDDEN,
- detail="Invalid authentication credentials",
- )
- else:
- return None
- return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
- class HTTPDigest(HTTPBase):
- """
- HTTP Digest authentication.
- ## Usage
- Create an instance object and use that object as the dependency in `Depends()`.
- The dependency result will be an `HTTPAuthorizationCredentials` object containing
- the `scheme` and the `credentials`.
- ## Example
- ```python
- from typing import Annotated
- from fastapi import Depends, FastAPI
- from fastapi.security import HTTPAuthorizationCredentials, HTTPDigest
- app = FastAPI()
- security = HTTPDigest()
- @app.get("/users/me")
- def read_current_user(
- credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
- ):
- return {"scheme": credentials.scheme, "credentials": credentials.credentials}
- ```
- """
- def __init__(
- self,
- *,
- scheme_name: Annotated[
- Optional[str],
- Doc(
- """
- Security scheme name.
- It will be included in the generated OpenAPI (e.g. visible at `/docs`).
- """
- ),
- ] = None,
- description: Annotated[
- Optional[str],
- Doc(
- """
- Security scheme description.
- It will be included in the generated OpenAPI (e.g. visible at `/docs`).
- """
- ),
- ] = None,
- auto_error: Annotated[
- bool,
- Doc(
- """
- By default, if the HTTP Digest is not provided, `HTTPDigest` will
- automatically cancel the request and send the client an error.
- If `auto_error` is set to `False`, when the HTTP Digest is not
- available, instead of erroring out, the dependency result will
- be `None`.
- This is useful when you want to have optional authentication.
- It is also useful when you want to have authentication that can be
- provided in one of multiple optional ways (for example, in HTTP
- Digest or in a cookie).
- """
- ),
- ] = True,
- ):
- self.model = HTTPBaseModel(scheme="digest", description=description)
- self.scheme_name = scheme_name or self.__class__.__name__
- self.auto_error = auto_error
- async def __call__(
- self, request: Request
- ) -> Optional[HTTPAuthorizationCredentials]:
- authorization = request.headers.get("Authorization")
- scheme, credentials = get_authorization_scheme_param(authorization)
- if not (authorization and scheme and credentials):
- if self.auto_error:
- raise HTTPException(
- status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
- )
- else:
- return None
- if scheme.lower() != "digest":
- if self.auto_error:
- raise HTTPException(
- status_code=HTTP_403_FORBIDDEN,
- detail="Invalid authentication credentials",
- )
- else:
- return None
- return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|