mirror of
https://github.com/kennethreitz/langchain.git
synced 2026-06-05 23:00:18 +00:00
e41b382e1c
### Description: Updated the delete function in the Pinecone integration to allow for deletion of vectors by specifying a filter condition, and to delete all vectors in a namespace. Made the ids parameter optional in the delete function in the base VectorStore class and allowed for additional keyword arguments. Updated the delete function in several classes (Redis, Chroma, Supabase, Deeplake, Elastic, Weaviate, and Cassandra) to match the changes made in the base VectorStore class. This involved making the ids parameter optional and allowing for additional keyword arguments.
460 lines
16 KiB
Python
460 lines
16 KiB
Python
"""Interface for vector stores."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import warnings
|
|
from abc import ABC, abstractmethod
|
|
from functools import partial
|
|
from typing import (
|
|
Any,
|
|
ClassVar,
|
|
Collection,
|
|
Dict,
|
|
Iterable,
|
|
List,
|
|
Optional,
|
|
Tuple,
|
|
Type,
|
|
TypeVar,
|
|
)
|
|
|
|
from pydantic import BaseModel, Field, root_validator
|
|
|
|
from langchain.callbacks.manager import (
|
|
AsyncCallbackManagerForRetrieverRun,
|
|
CallbackManagerForRetrieverRun,
|
|
)
|
|
from langchain.docstore.document import Document
|
|
from langchain.embeddings.base import Embeddings
|
|
from langchain.schema import BaseRetriever
|
|
|
|
# Type variable bound to VectorStore; the alternate constructors below annotate
# ``cls`` as ``Type[VST]`` and return ``VST`` so subclasses get their own type back.
VST = TypeVar("VST", bound="VectorStore")
|
|
|
|
|
|
class VectorStore(ABC):
    """Interface for vector stores.

    Subclasses must implement ``add_texts``, ``similarity_search`` and the
    ``from_texts`` constructor. Every other operation either builds on those
    or raises ``NotImplementedError`` until a subclass overrides it.
    """

    @abstractmethod
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: Subclass implementations are expected to return
                True if deletion is successful, False otherwise.

        Raises:
            NotImplementedError: Always, in this base implementation; a
                subclass must override ``delete`` to support deletion.
        """
        raise NotImplementedError("delete method must be implemented by subclass.")

    async def aadd_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Raises:
            NotImplementedError: Unless overridden by a subclass.
        """
        raise NotImplementedError

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Run more documents through the embeddings and add to the vectorstore.

        Args:
            documents (List[Document]): Documents to add to the vectorstore.
            **kwargs: Forwarded unchanged to ``add_texts``.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        # TODO: Handle the case where the user doesn't provide ids on the Collection
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        return self.add_texts(texts, metadatas, **kwargs)

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
        """Run more documents through the embeddings and add to the vectorstore.

        Args:
            documents (List[Document]): Documents to add to the vectorstore.
            **kwargs: Forwarded unchanged to ``aadd_texts``.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        return await self.aadd_texts(texts, metadatas, **kwargs)

    def search(self, query: str, search_type: str, **kwargs: Any) -> List[Document]:
        """Return docs most similar to query using specified search type.

        Args:
            query: Text to look up documents similar to.
            search_type: Either ``"similarity"`` or ``"mmr"``.
            **kwargs: Forwarded to the selected search method.

        Raises:
            ValueError: If ``search_type`` is neither of the supported values.
        """
        if search_type == "similarity":
            return self.similarity_search(query, **kwargs)
        elif search_type == "mmr":
            return self.max_marginal_relevance_search(query, **kwargs)
        else:
            raise ValueError(
                f"search_type of {search_type} not allowed. Expected "
                "search_type to be 'similarity' or 'mmr'."
            )

    async def asearch(
        self, query: str, search_type: str, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query using specified search type.

        Async counterpart of ``search``; accepts the same ``search_type``
        values and raises ``ValueError`` for anything else.
        """
        if search_type == "similarity":
            return await self.asimilarity_search(query, **kwargs)
        elif search_type == "mmr":
            return await self.amax_marginal_relevance_search(query, **kwargs)
        else:
            raise ValueError(
                f"search_type of {search_type} not allowed. Expected "
                "search_type to be 'similarity' or 'mmr'."
            )

    @abstractmethod
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query."""

    def similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and relevance scores in the range [0, 1].

        0 is dissimilar, 1 is most similar.

        Args:
            query: input text
            k: Number of Documents to return. Defaults to 4.
            **kwargs: kwargs to be passed to similarity search. Should include:
                score_threshold: Optional, a floating point value between 0 to 1 to
                    filter the resulting set of retrieved docs

        Returns:
            List of Tuples of (doc, similarity_score)
        """
        # All kwargs (including ``score_threshold``) are forwarded as-is to the
        # subclass hook.
        docs_and_similarities = self._similarity_search_with_relevance_scores(
            query, k=k, **kwargs
        )
        # Out-of-range scores are reported but not rejected or clamped.
        if any(
            similarity < 0.0 or similarity > 1.0
            for _, similarity in docs_and_similarities
        ):
            warnings.warn(
                "Relevance scores must be between"
                f" 0 and 1, got {docs_and_similarities}"
            )

        score_threshold = kwargs.get("score_threshold")
        if score_threshold is not None:
            docs_and_similarities = [
                (doc, similarity)
                for doc, similarity in docs_and_similarities
                if similarity >= score_threshold
            ]
            if not docs_and_similarities:
                warnings.warn(
                    "No relevant docs were retrieved using the relevance score"
                    f" threshold {score_threshold}"
                )
        return docs_and_similarities

    def _similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and relevance scores, normalized on a scale from 0 to 1.

        0 is dissimilar, 1 is most similar.

        Raises:
            NotImplementedError: Unless overridden by a subclass.
        """
        raise NotImplementedError

    async def asimilarity_search_with_relevance_scores(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return docs and relevance scores for the docs most similar to query.

        Runs ``similarity_search_with_relevance_scores`` in the running event
        loop's default executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(self.similarity_search_with_relevance_scores, query, k, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, func)

    async def asimilarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query.

        Runs ``similarity_search`` in the running event loop's default executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(self.similarity_search, query, k, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, func)

    def similarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.

        Returns:
            List of Documents most similar to the query vector.

        Raises:
            NotImplementedError: Unless overridden by a subclass.
        """
        raise NotImplementedError

    async def asimilarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Runs ``similarity_search_by_vector`` in the running event loop's
        default executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(self.similarity_search_by_vector, embedding, k, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(None, func)

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.
        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            NotImplementedError: Unless overridden by a subclass.
        """
        raise NotImplementedError

    async def amax_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Runs ``max_marginal_relevance_search`` in the running event loop's
        default executor.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(
            self.max_marginal_relevance_search, query, k, fetch_k, lambda_mult, **kwargs
        )
        return await asyncio.get_running_loop().run_in_executor(None, func)

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.
        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            NotImplementedError: Unless overridden by a subclass.
        """
        raise NotImplementedError

    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Runs ``max_marginal_relevance_search_by_vector`` in the running event
        loop's default executor, matching the other async search wrappers on
        this class. If the synchronous method is not implemented, its
        ``NotImplementedError`` propagates to the awaiting caller.
        """

        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        func = partial(
            self.max_marginal_relevance_search_by_vector,
            embedding,
            k,
            fetch_k,
            lambda_mult,
            **kwargs,
        )
        return await asyncio.get_running_loop().run_in_executor(None, func)

    @classmethod
    def from_documents(
        cls: Type[VST],
        documents: List[Document],
        embedding: Embeddings,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from documents and embeddings.

        Splits each document into its ``page_content`` and ``metadata`` and
        delegates to ``from_texts``.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)

    @classmethod
    async def afrom_documents(
        cls: Type[VST],
        documents: List[Document],
        embedding: Embeddings,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from documents and embeddings.

        Splits each document into its ``page_content`` and ``metadata`` and
        delegates to ``afrom_texts``.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        return await cls.afrom_texts(texts, embedding, metadatas=metadatas, **kwargs)

    @classmethod
    @abstractmethod
    def from_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from texts and embeddings."""

    @classmethod
    async def afrom_texts(
        cls: Type[VST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> VST:
        """Return VectorStore initialized from texts and embeddings.

        Raises:
            NotImplementedError: Unless overridden by a subclass.
        """
        raise NotImplementedError

    def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
        """Return a ``VectorStoreRetriever`` wrapping this store.

        Args:
            **kwargs: Passed to the ``VectorStoreRetriever`` constructor.
        """
        return VectorStoreRetriever(vectorstore=self, **kwargs)
|
|
|
|
|
|
class VectorStoreRetriever(BaseRetriever, BaseModel):
    """Retriever that answers queries by searching a wrapped ``VectorStore``.

    ``search_type`` selects which vector store search method is used and
    ``search_kwargs`` is passed to that method as keyword arguments.
    """

    # The vector store that every search and add operation is delegated to.
    vectorstore: VectorStore
    # Which search to run; must be one of ``allowed_search_types``.
    search_type: str = "similarity"
    # Keyword arguments forwarded to the selected vector store search method.
    search_kwargs: dict = Field(default_factory=dict)
    # Accepted values for ``search_type``.
    allowed_search_types: ClassVar[Collection[str]] = (
        "similarity",
        "similarity_score_threshold",
        "mmr",
    )

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @root_validator()
    def validate_search_type(cls, values: Dict) -> Dict:
        """Validate search type.

        Raises:
            ValueError: If ``search_type`` is not in ``allowed_search_types``,
                or if it is ``"similarity_score_threshold"`` and
                ``search_kwargs`` has no numeric ``score_threshold``.
        """
        # A field that failed its own validation is absent from ``values``.
        # Look it up defensively so pydantic reports the field error rather
        # than this validator raising a bare KeyError.
        search_type = values.get("search_type")
        if search_type is None:
            return values
        if search_type not in cls.allowed_search_types:
            raise ValueError(
                f"search_type of {search_type} not allowed. Valid values are: "
                f"{cls.allowed_search_types}"
            )
        if search_type == "similarity_score_threshold":
            search_kwargs = values.get("search_kwargs")
            if search_kwargs is None:
                # ``search_kwargs`` itself failed validation; nothing to check.
                return values
            score_threshold = search_kwargs.get("score_threshold")
            # Accept ints such as 0 or 1 as well as floats; bool is an int
            # subclass but is not a meaningful threshold, so reject it.
            is_number = isinstance(score_threshold, (int, float)) and not isinstance(
                score_threshold, bool
            )
            if not is_number:
                raise ValueError(
                    "`score_threshold` is not specified with a float value(0~1) "
                    "in `search_kwargs`."
                )
        return values

    def _get_relevant_documents(
        self, query: str, *, run_manager: Optional[CallbackManagerForRetrieverRun]
    ) -> List[Document]:
        """Return documents for ``query`` using the configured search type.

        ``run_manager`` is accepted but not used by this implementation.

        Raises:
            ValueError: If ``search_type`` is not a supported value.
        """
        if self.search_type == "similarity":
            docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
        elif self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                self.vectorstore.similarity_search_with_relevance_scores(
                    query, **self.search_kwargs
                )
            )
            # Only the documents are returned; the scores are dropped.
            docs = [doc for doc, _ in docs_and_similarities]
        elif self.search_type == "mmr":
            docs = self.vectorstore.max_marginal_relevance_search(
                query, **self.search_kwargs
            )
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    async def _aget_relevant_documents(
        self, query: str, *, run_manager: Optional[AsyncCallbackManagerForRetrieverRun]
    ) -> List[Document]:
        """Async counterpart of ``_get_relevant_documents``.

        ``run_manager`` is accepted but not used by this implementation.

        Raises:
            ValueError: If ``search_type`` is not a supported value.
        """
        if self.search_type == "similarity":
            docs = await self.vectorstore.asimilarity_search(
                query, **self.search_kwargs
            )
        elif self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                await self.vectorstore.asimilarity_search_with_relevance_scores(
                    query, **self.search_kwargs
                )
            )
            # Only the documents are returned; the scores are dropped.
            docs = [doc for doc, _ in docs_and_similarities]
        elif self.search_type == "mmr":
            docs = await self.vectorstore.amax_marginal_relevance_search(
                query, **self.search_kwargs
            )
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Add documents to vectorstore."""
        return self.vectorstore.add_documents(documents, **kwargs)

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
        """Add documents to vectorstore."""
        return await self.vectorstore.aadd_documents(documents, **kwargs)
|