238 lines
8.0 KiB
Python
238 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
from abc import abstractmethod
|
|
from enum import Enum
|
|
from typing import (
|
|
Any,
|
|
List,
|
|
Optional,
|
|
Dict,
|
|
Tuple,
|
|
TypeVar,
|
|
)
|
|
from dataclasses import dataclass
|
|
|
|
from pydantic import SecretStr
|
|
|
|
from chromadb.config import (
|
|
Component,
|
|
System,
|
|
)
|
|
|
|
T = TypeVar("T")
|
|
S = TypeVar("S")
|
|
|
|
|
|
class AuthError(Exception):
|
|
pass
|
|
|
|
|
|
ClientAuthHeaders = Dict[str, SecretStr]
|
|
|
|
|
|
class ClientAuthProvider(Component):
|
|
"""
|
|
ClientAuthProvider is responsible for providing authentication headers for
|
|
client requests. Client implementations (in our case, just the FastAPI
|
|
client) must inject these headers into their requests.
|
|
"""
|
|
|
|
def __init__(self, system: System) -> None:
|
|
super().__init__(system)
|
|
|
|
@abstractmethod
|
|
def authenticate(self) -> ClientAuthHeaders:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class UserIdentity:
|
|
"""
|
|
UserIdentity represents the identity of a user. In general, not all fields
|
|
will be populated, and the fields that are populated will depend on the
|
|
authentication provider.
|
|
|
|
The idea is that the AuthenticationProvider is responsible for populating
|
|
_all_ information known about the user, and the AuthorizationProvider is
|
|
responsible for making decisions based on that information.
|
|
"""
|
|
|
|
user_id: str
|
|
tenant: Optional[str] = None
|
|
databases: Optional[List[str]] = None
|
|
# This can be used for any additional auth context which needs to be
|
|
# propagated from the authentication provider to the authorization
|
|
# provider.
|
|
attributes: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class ServerAuthenticationProvider(Component):
|
|
"""
|
|
ServerAuthenticationProvider is responsible for authenticating requests. If
|
|
a ServerAuthenticationProvider is configured, it will be called by the
|
|
server to authenticate requests. If no ServerAuthenticationProvider is
|
|
configured, all requests will be authenticated.
|
|
|
|
The ServerAuthenticationProvider should return a UserIdentity object if the
|
|
request is authenticated for use by the ServerAuthorizationProvider.
|
|
"""
|
|
|
|
def __init__(self, system: System) -> None:
|
|
super().__init__(system)
|
|
self._ignore_auth_paths: Dict[
|
|
str, List[str]
|
|
] = system.settings.chroma_server_auth_ignore_paths
|
|
self.overwrite_singleton_tenant_database_access_from_auth = (
|
|
system.settings.chroma_overwrite_singleton_tenant_database_access_from_auth
|
|
)
|
|
|
|
@abstractmethod
|
|
def authenticate_or_raise(self, headers: Dict[str, str]) -> UserIdentity:
|
|
pass
|
|
|
|
def ignore_operation(self, verb: str, path: str) -> bool:
|
|
if (
|
|
path in self._ignore_auth_paths.keys()
|
|
and verb.upper() in self._ignore_auth_paths[path]
|
|
):
|
|
return True
|
|
return False
|
|
|
|
def read_creds_or_creds_file(self) -> List[str]:
|
|
_creds_file = None
|
|
_creds = None
|
|
|
|
if self._system.settings.chroma_server_authn_credentials_file:
|
|
_creds_file = str(
|
|
self._system.settings["chroma_server_authn_credentials_file"]
|
|
)
|
|
if self._system.settings.chroma_server_authn_credentials:
|
|
_creds = str(self._system.settings["chroma_server_authn_credentials"])
|
|
if not _creds_file and not _creds:
|
|
raise ValueError(
|
|
"No credentials file or credentials found in "
|
|
"[chroma_server_authn_credentials]."
|
|
)
|
|
if _creds_file and _creds:
|
|
raise ValueError(
|
|
"Both credentials file and credentials found."
|
|
"Please provide only one."
|
|
)
|
|
if _creds:
|
|
return [c for c in _creds.split("\n") if c]
|
|
elif _creds_file:
|
|
with open(_creds_file, "r") as f:
|
|
return f.readlines()
|
|
raise ValueError("Should never happen")
|
|
|
|
def singleton_tenant_database_if_applicable(
|
|
self, user: Optional[UserIdentity]
|
|
) -> Tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
If settings.chroma_overwrite_singleton_tenant_database_access_from_auth
|
|
is False, this function always returns (None, None).
|
|
|
|
If settings.chroma_overwrite_singleton_tenant_database_access_from_auth
|
|
is True, follows the following logic:
|
|
- If the user only has access to a single tenant, this function will
|
|
return that tenant as its first return value.
|
|
- If the user only has access to a single database, this function will
|
|
return that database as its second return value. If the user has
|
|
access to multiple tenants and/or databases, including "*", this
|
|
function will return None for the corresponding value(s).
|
|
- If the user has access to multiple tenants and/or databases this
|
|
function will return None for the corresponding value(s).
|
|
"""
|
|
if not self.overwrite_singleton_tenant_database_access_from_auth or not user:
|
|
return None, None
|
|
tenant = None
|
|
database = None
|
|
if user.tenant and user.tenant != "*":
|
|
tenant = user.tenant
|
|
if user.databases and len(user.databases) == 1 and user.databases[0] != "*":
|
|
database = user.databases[0]
|
|
return tenant, database
|
|
|
|
|
|
class AuthzAction(str, Enum):
|
|
"""
|
|
The set of actions that can be authorized by the authorization provider.
|
|
"""
|
|
|
|
RESET = "system:reset"
|
|
CREATE_TENANT = "tenant:create_tenant"
|
|
GET_TENANT = "tenant:get_tenant"
|
|
CREATE_DATABASE = "db:create_database"
|
|
GET_DATABASE = "db:get_database"
|
|
DELETE_DATABASE = "db:delete_database"
|
|
LIST_DATABASES = "db:list_databases"
|
|
LIST_COLLECTIONS = "db:list_collections"
|
|
COUNT_COLLECTIONS = "db:count_collections"
|
|
CREATE_COLLECTION = "db:create_collection"
|
|
GET_OR_CREATE_COLLECTION = "db:get_or_create_collection"
|
|
GET_COLLECTION = "collection:get_collection"
|
|
DELETE_COLLECTION = "collection:delete_collection"
|
|
UPDATE_COLLECTION = "collection:update_collection"
|
|
ADD = "collection:add"
|
|
DELETE = "collection:delete"
|
|
GET = "collection:get"
|
|
QUERY = "collection:query"
|
|
COUNT = "collection:count"
|
|
UPDATE = "collection:update"
|
|
UPSERT = "collection:upsert"
|
|
|
|
|
|
@dataclass
|
|
class AuthzResource:
|
|
"""
|
|
The resource being accessed in an authorization request.
|
|
"""
|
|
|
|
tenant: Optional[str]
|
|
database: Optional[str]
|
|
collection: Optional[str]
|
|
|
|
|
|
class ServerAuthorizationProvider(Component):
|
|
"""
|
|
ServerAuthorizationProvider is responsible for authorizing requests. If a
|
|
ServerAuthorizationProvider is configured, it will be called by the server
|
|
to authorize requests. If no ServerAuthorizationProvider is configured, all
|
|
requests will be authorized.
|
|
|
|
ServerAuthorizationProvider should raise an exception if the request is not
|
|
authorized.
|
|
"""
|
|
|
|
def __init__(self, system: System) -> None:
|
|
super().__init__(system)
|
|
|
|
@abstractmethod
|
|
def authorize_or_raise(
|
|
self, user: UserIdentity, action: AuthzAction, resource: AuthzResource
|
|
) -> None:
|
|
pass
|
|
|
|
def read_config_or_config_file(self) -> List[str]:
|
|
_config_file = None
|
|
_config = None
|
|
if self._system.settings.chroma_server_authz_config_file:
|
|
_config_file = self._system.settings["chroma_server_authz_config_file"]
|
|
if self._system.settings.chroma_server_authz_config:
|
|
_config = str(self._system.settings["chroma_server_authz_config"])
|
|
if not _config_file and not _config:
|
|
raise ValueError(
|
|
"No authz configuration file or authz configuration found."
|
|
)
|
|
if _config_file and _config:
|
|
raise ValueError(
|
|
"Both authz configuration file and authz configuration found."
|
|
"Please provide only one."
|
|
)
|
|
if _config:
|
|
return [c for c in _config.split("\n") if c]
|
|
elif _config_file:
|
|
with open(_config_file, "r") as f:
|
|
return f.readlines()
|
|
raise ValueError("Should never happen")
|