from typing import TYPE_CHECKING, Optional, Union, List, cast, Dict, Any, Tuple

from chromadb.api.models.CollectionCommon import CollectionCommon
from chromadb.api.types import (
    URI,
    CollectionMetadata,
    Embedding,
    PyEmbedding,
    Include,
    Metadata,
    Document,
    Image,
    Where,
    IDs,
    GetResult,
    QueryResult,
    ID,
    OneOrMany,
    WhereDocument,
    SearchResult,
    maybe_cast_one_to_many,
)
from chromadb.api.collection_configuration import UpdateCollectionConfiguration
from chromadb.execution.expression.plan import Search

import logging

from chromadb.api.functions import Function

if TYPE_CHECKING:
    # Only needed for annotations; importing lazily avoids import cycles.
    from chromadb.api.models.AttachedFunction import AttachedFunction
    from chromadb.api import ServerAPI  # noqa: F401

logger = logging.getLogger(__name__)


class Collection(CollectionCommon["ServerAPI"]):
    """Handle to a single collection backed by a ``ServerAPI`` client.

    Every method follows the same shape: the arguments are validated and
    normalized by a ``_validate_and_prepare_*`` helper (not defined in this
    class; presumably inherited from ``CollectionCommon``), and the prepared
    request is forwarded to ``self._client`` together with this collection's
    ``id``, ``tenant`` and ``database``.
    """

    def count(self) -> int:
        """The total number of embeddings added to the collection

        Returns:
            int: The total number of embeddings added to the collection

        """
        return self._client._count(
            collection_id=self.id,
            tenant=self.tenant,
            database=self.database,
        )

    def add(
        self,
        ids: OneOrMany[ID],
        embeddings: Optional[
            Union[
                OneOrMany[Embedding],
                OneOrMany[PyEmbedding],
            ]
        ] = None,
        metadatas: Optional[OneOrMany[Metadata]] = None,
        documents: Optional[OneOrMany[Document]] = None,
        images: Optional[OneOrMany[Image]] = None,
        uris: Optional[OneOrMany[URI]] = None,
    ) -> None:
        """Add embeddings to the data store.

        Args:
            ids: The ids of the embeddings you wish to add
            embeddings: The embeddings to add. If None, embeddings will be computed based on the documents or images using the embedding_function set for the Collection. Optional.
            metadatas: The metadata to associate with the embeddings. When querying, you can filter on this metadata. Optional.
            documents: The documents to associate with the embeddings. Optional.
            images: The images to associate with the embeddings. Optional.
            uris: The uris of the images to associate with the embeddings. Optional.

        Returns:
            None

        Raises:
            ValueError: If you don't provide either embeddings or documents
            ValueError: If the length of ids, embeddings, metadatas, or documents don't match
            ValueError: If you don't provide an embedding function and don't provide embeddings
            ValueError: If you provide both embeddings and documents
            ValueError: If you provide an id that already exists

        """
        add_request = self._validate_and_prepare_add_request(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=documents,
            images=images,
            uris=uris,
        )

        # Note: `images` is only handed to the prepare helper above; it is not
        # forwarded to the client call.
        self._client._add(
            collection_id=self.id,
            ids=add_request["ids"],
            embeddings=add_request["embeddings"],
            metadatas=add_request["metadatas"],
            documents=add_request["documents"],
            uris=add_request["uris"],
            tenant=self.tenant,
            database=self.database,
        )

    def get(
        self,
        ids: Optional[OneOrMany[ID]] = None,
        where: Optional[Where] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        where_document: Optional[WhereDocument] = None,
        include: Optional[Include] = None,
    ) -> GetResult:
        """Get embeddings and their associated data from the data store. If no ids or where filter is provided returns
        all embeddings up to limit starting at offset.

        Args:
            ids: The ids of the embeddings to get. Optional.
            where: A Where type dict used to filter results by. E.g. `{"$and": [{"color" : "red"}, {"price": {"$gte": 4.20}}]}`. Optional.
            limit: The number of documents to return. Optional.
            offset: The offset to start returning results from. Useful for paging results with limit. Optional.
            where_document: A WhereDocument type dict used to filter by the documents. E.g. `{"$contains": "hello"}`. Optional.
            include: A list of what to include in the results. Can contain `"embeddings"`, `"metadatas"`, `"documents"`. Ids are always included. Defaults to `["metadatas", "documents"]` when omitted or None. Optional.

        Returns:
            GetResult: A GetResult object containing the results.

        """
        if include is None:
            # Build the default per call: a list literal in the signature would
            # be one shared object, and it is handed to helpers that are free
            # to mutate it.
            include = ["metadatas", "documents"]

        get_request = self._validate_and_prepare_get_request(
            ids=ids,
            where=where,
            where_document=where_document,
            include=include,
        )

        # `limit` and `offset` are passed through as given; they are not part
        # of the prepared request.
        get_results = self._client._get(
            collection_id=self.id,
            ids=get_request["ids"],
            where=get_request["where"],
            where_document=get_request["where_document"],
            include=get_request["include"],
            limit=limit,
            offset=offset,
            tenant=self.tenant,
            database=self.database,
        )

        return self._transform_get_response(
            response=get_results, include=get_request["include"]
        )

    def peek(self, limit: int = 10) -> GetResult:
        """Get the first few results in the database up to limit

        Args:
            limit: The number of results to return.

        Returns:
            GetResult: A GetResult object containing the results.
        """
        return self._transform_peek_response(
            self._client._peek(
                collection_id=self.id,
                n=limit,
                tenant=self.tenant,
                database=self.database,
            )
        )

    def query(
        self,
        query_embeddings: Optional[
            Union[
                OneOrMany[Embedding],
                OneOrMany[PyEmbedding],
            ]
        ] = None,
        query_texts: Optional[OneOrMany[Document]] = None,
        query_images: Optional[OneOrMany[Image]] = None,
        query_uris: Optional[OneOrMany[URI]] = None,
        ids: Optional[OneOrMany[ID]] = None,
        n_results: int = 10,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
        include: Optional[Include] = None,
    ) -> QueryResult:
        """Get the n_results nearest neighbor embeddings for provided query_embeddings or query_texts.

        Args:
            query_embeddings: The embeddings to get the closest neighbors of. Optional.
            query_texts: The document texts to get the closest neighbors of. Optional.
            query_images: The images to get the closest neighbors of. Optional.
            query_uris: The URIs to be used with data loader. Optional.
            ids: A subset of ids to search within. Optional.
            n_results: The number of neighbors to return for each query_embedding or query_texts. Optional.
            where: A Where type dict used to filter results by. E.g. `{"$and": [{"color" : "red"}, {"price": {"$gte": 4.20}}]}`. Optional.
            where_document: A WhereDocument type dict used to filter by the documents. E.g. `{"$contains": "hello"}`. Optional.
            include: A list of what to include in the results. Can contain `"embeddings"`, `"metadatas"`, `"documents"`, `"distances"`. Ids are always included. Defaults to `["metadatas", "documents", "distances"]` when omitted or None. Optional.

        Returns:
            QueryResult: A QueryResult object containing the results.

        Raises:
            ValueError: If you don't provide either query_embeddings, query_texts, or query_images
            ValueError: If you provide both query_embeddings and query_texts
            ValueError: If you provide both query_embeddings and query_images
            ValueError: If you provide both query_texts and query_images

        """
        if include is None:
            # Fresh list per call instead of a shared mutable default.
            include = ["metadatas", "documents", "distances"]

        query_request = self._validate_and_prepare_query_request(
            query_embeddings=query_embeddings,
            query_texts=query_texts,
            query_images=query_images,
            query_uris=query_uris,
            ids=ids,
            n_results=n_results,
            where=where,
            where_document=where_document,
            include=include,
        )

        # Whatever query input was supplied, the client only receives the
        # prepared request's "embeddings" entry.
        query_results = self._client._query(
            collection_id=self.id,
            ids=query_request["ids"],
            query_embeddings=query_request["embeddings"],
            n_results=query_request["n_results"],
            where=query_request["where"],
            where_document=query_request["where_document"],
            include=query_request["include"],
            tenant=self.tenant,
            database=self.database,
        )

        return self._transform_query_response(
            response=query_results, include=query_request["include"]
        )

    def modify(
        self,
        name: Optional[str] = None,
        metadata: Optional[CollectionMetadata] = None,
        configuration: Optional[UpdateCollectionConfiguration] = None,
    ) -> None:
        """Modify the collection name, metadata or configuration

        Args:
            name: The updated name for the collection. Optional.
            metadata: The updated metadata for the collection. Optional.
            configuration: The updated configuration for the collection. Optional.

        Returns:
            None
        """

        # Only the metadata is validated locally before the request is sent.
        self._validate_modify_request(metadata)

        # Note there is a race condition here where the metadata can be updated
        # but another thread sees the cached local metadata.
        # TODO: fixme
        self._client._modify(
            id=self.id,
            new_name=name,
            new_metadata=metadata,
            new_configuration=configuration,
            tenant=self.tenant,
            database=self.database,
        )

        # Refresh the local model only after the client call returned without
        # raising.
        self._update_model_after_modify_success(name, metadata, configuration)

    def fork(
        self,
        new_name: str,
    ) -> "Collection":
        """Fork the current collection under a new name. The returning collection should contain identical data to the current collection.
        This is an experimental API that only works for Hosted Chroma for now.

        Args:
            new_name: The name of the new collection.

        Returns:
            Collection: A new collection with the specified name and containing identical data to the current collection.
        """
        model = self._client._fork(
            collection_id=self.id,
            new_name=new_name,
            tenant=self.tenant,
            database=self.database,
        )
        # The fork reuses this collection's client, embedding function and
        # data loader.
        return Collection(
            client=self._client,
            model=model,
            embedding_function=self._embedding_function,
            data_loader=self._data_loader,
        )

    def search(
        self,
        searches: OneOrMany[Search],
    ) -> SearchResult:
        """Perform hybrid search on the collection.
        This is an experimental API that only works for Hosted Chroma for now.

        Args:
            searches: A single Search object or a list of Search objects, each containing:
                - where: Where expression for filtering
                - rank: Ranking expression for hybrid search (defaults to Val(0.0))
                - limit: Limit configuration for pagination (defaults to no limit)
                - select: Select configuration for keys to return (defaults to empty)

        Returns:
            SearchResult: Column-major format response with:
                - ids: List of result IDs for each search payload
                - documents: Optional documents for each payload
                - embeddings: Optional embeddings for each payload
                - metadatas: Optional metadata for each payload
                - scores: Optional scores for each payload
                - select: List of selected keys for each payload

        Raises:
            NotImplementedError: For local/segment API implementations

        Examples:
            # Using builder pattern with Key constants
            from chromadb.execution.expression import (
                Search, Key, K, Knn, Val
            )

            # Note: K is an alias for Key, so K.DOCUMENT == Key.DOCUMENT
            search = (Search()
                .where((K("category") == "science") & (K("score") > 0.5))
                .rank(Knn(query=[0.1, 0.2, 0.3]) * 0.8 + Val(0.5) * 0.2)
                .limit(10, offset=0)
                .select(K.DOCUMENT, K.SCORE, "title"))

            # Direct construction
            from chromadb.execution.expression import (
                Search, Eq, And, Gt, Knn, Limit, Select, Key
            )

            search = Search(
                where=And([Eq("category", "science"), Gt("score", 0.5)]),
                rank=Knn(query=[0.1, 0.2, 0.3]),
                limit=Limit(offset=0, limit=10),
                select=Select(keys={Key.DOCUMENT, Key.SCORE, "title"})
            )

            # Single search
            result = collection.search(search)

            # Multiple searches at once
            searches = [
                Search().where(K("type") == "article").rank(Knn(query=[0.1, 0.2])),
                Search().where(K("type") == "paper").rank(Knn(query=[0.3, 0.4]))
            ]
            results = collection.search(searches)
        """
        # Convert single search to list for consistent handling
        searches_list = maybe_cast_one_to_many(searches)
        if searches_list is None:
            searches_list = []

        # Embed any string queries in Knn objects
        embedded_searches = [
            self._embed_search_string_queries(search) for search in searches_list
        ]

        return self._client._search(
            collection_id=self.id,
            searches=cast(List[Search], embedded_searches),
            tenant=self.tenant,
            database=self.database,
        )

    def update(
        self,
        ids: OneOrMany[ID],
        embeddings: Optional[
            Union[
                OneOrMany[Embedding],
                OneOrMany[PyEmbedding],
            ]
        ] = None,
        metadatas: Optional[OneOrMany[Metadata]] = None,
        documents: Optional[OneOrMany[Document]] = None,
        images: Optional[OneOrMany[Image]] = None,
        uris: Optional[OneOrMany[URI]] = None,
    ) -> None:
        """Update the embeddings, metadatas or documents for provided ids.

        Args:
            ids: The ids of the embeddings to update
            embeddings: The embeddings to update. If None, embeddings will be computed based on the documents or images using the embedding_function set for the Collection. Optional.
            metadatas: The metadata to associate with the embeddings. When querying, you can filter on this metadata. Optional.
            documents: The documents to associate with the embeddings. Optional.
            images: The images to associate with the embeddings. Optional.
            uris: The uris of the images to associate with the embeddings. Optional.

        Returns:
            None
        """
        update_request = self._validate_and_prepare_update_request(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=documents,
            images=images,
            uris=uris,
        )

        # As in `add`, `images` is consumed by the prepare helper and not
        # forwarded to the client.
        self._client._update(
            collection_id=self.id,
            ids=update_request["ids"],
            embeddings=update_request["embeddings"],
            metadatas=update_request["metadatas"],
            documents=update_request["documents"],
            uris=update_request["uris"],
            tenant=self.tenant,
            database=self.database,
        )

    def upsert(
        self,
        ids: OneOrMany[ID],
        embeddings: Optional[
            Union[
                OneOrMany[Embedding],
                OneOrMany[PyEmbedding],
            ]
        ] = None,
        metadatas: Optional[OneOrMany[Metadata]] = None,
        documents: Optional[OneOrMany[Document]] = None,
        images: Optional[OneOrMany[Image]] = None,
        uris: Optional[OneOrMany[URI]] = None,
    ) -> None:
        """Update the embeddings, metadatas or documents for provided ids, or create them if they don't exist.

        Args:
            ids: The ids of the embeddings to update
            embeddings: The embeddings to add. If None, embeddings will be computed based on the documents using the embedding_function set for the Collection. Optional.
            metadatas: The metadata to associate with the embeddings. When querying, you can filter on this metadata. Optional.
            documents: The documents to associate with the embeddings. Optional.
            images: The images to associate with the embeddings. Optional.
            uris: The uris of the images to associate with the embeddings. Optional.

        Returns:
            None
        """
        upsert_request = self._validate_and_prepare_upsert_request(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=documents,
            images=images,
            uris=uris,
        )

        # As in `add`, `images` is consumed by the prepare helper and not
        # forwarded to the client.
        self._client._upsert(
            collection_id=self.id,
            ids=upsert_request["ids"],
            embeddings=upsert_request["embeddings"],
            metadatas=upsert_request["metadatas"],
            documents=upsert_request["documents"],
            uris=upsert_request["uris"],
            tenant=self.tenant,
            database=self.database,
        )

    def delete(
        self,
        ids: Optional[IDs] = None,
        where: Optional[Where] = None,
        where_document: Optional[WhereDocument] = None,
    ) -> None:
        """Delete the embeddings based on ids and/or a where filter

        Args:
            ids: The ids of the embeddings to delete
            where: A Where type dict used to filter the deletion by. E.g. `{"$and": [{"color" : "red"}, {"price": {"$gte": 4.20}}]}`. Optional.
            where_document: A WhereDocument type dict used to filter the deletion by the document content. E.g. `{"$contains": "hello"}`. Optional.

        Returns:
            None

        Raises:
            ValueError: If you don't provide either ids, where, or where_document
        """
        delete_request = self._validate_and_prepare_delete_request(
            ids, where, where_document
        )

        self._client._delete(
            collection_id=self.id,
            ids=delete_request["ids"],
            where=delete_request["where"],
            where_document=delete_request["where_document"],
            tenant=self.tenant,
            database=self.database,
        )

    def attach_function(
        self,
        function: Function,
        name: str,
        output_collection: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Tuple["AttachedFunction", bool]:
        """Attach a function to this collection.

        Args:
            function: A Function enum value (e.g., STATISTICS_FUNCTION, RECORD_COUNTER_FUNCTION)
            name: Unique name for this attached function
            output_collection: Name of the collection where function output will be stored
            params: Optional dictionary with function-specific parameters

        Returns:
            Tuple of (AttachedFunction, created) where created is True if newly created,
            False if already existed (idempotent request)

        Example:
            >>> from chromadb.api.functions import STATISTICS_FUNCTION
            >>> attached_fn, created = collection.attach_function(
            ...     function=STATISTICS_FUNCTION,
            ...     name="mycoll_stats_fn",
            ...     output_collection="mycoll_stats",
            ... )
            >>> if created:
            ...     print("New function attached")
            ... else:
            ...     print("Function already existed")
        """
        # A Function enum member is reduced to its `.value`; anything else is
        # passed through unchanged as the function id.
        function_id = function.value if isinstance(function, Function) else function
        return self._client.attach_function(
            function_id=function_id,
            name=name,
            input_collection_id=self.id,
            output_collection=output_collection,
            params=params,
            tenant=self.tenant,
            database=self.database,
        )

    def get_attached_function(self, name: str) -> "AttachedFunction":
        """Get an attached function by name for this collection.

        Args:
            name: Name of the attached function

        Returns:
            AttachedFunction: The attached function object

        Raises:
            NotFoundError: If the attached function doesn't exist
        """
        return self._client.get_attached_function(
            name=name,
            input_collection_id=self.id,
            tenant=self.tenant,
            database=self.database,
        )

    def detach_function(
        self,
        name: str,
        delete_output_collection: bool = False,
    ) -> bool:
        """Detach a function from this collection.

        Args:
            name: The name of the attached function
            delete_output_collection: Whether to also delete the output collection. Defaults to False.

        Returns:
            bool: True if successful

        Example:
            >>> success = collection.detach_function("my_function", delete_output_collection=True)
        """
        # The client names this flag `delete_output`.
        return self._client.detach_function(
            name=name,
            input_collection_id=self.id,
            delete_output=delete_output_collection,
            tenant=self.tenant,
            database=self.database,
        )