# custom_retriever.py (web-view navigation text and line-number gutter removed)
from typing import List
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.retrievers import BaseRetriever, Document
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from openinference.instrumentation.langchain import LangChainInstrumentor
# URL handed to the OTLP/HTTP span exporter below.
endpoint = "http://127.0.0.1:6006/v1/traces"

# A single provider with two span processors, registered in this order:
# first the OTLP exporter pointed at `endpoint`, then the console exporter.
tracer_provider = trace_sdk.TracerProvider()
for _span_exporter in (OTLPSpanExporter(endpoint), ConsoleSpanExporter()):
    tracer_provider.add_span_processor(SimpleSpanProcessor(_span_exporter))

# Instrument LangChain with the provider configured above.
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
class CustomRetriever(BaseRetriever):
    """Toy retriever that returns ``k`` placeholder documents for any query.

    Adapted from the LangChain custom-retriever guide:
    https://python.langchain.com/v0.1/docs/modules/data_connection/retrievers/custom_retriever/

    Only the synchronous hook ``_get_relevant_documents`` is implemented.
    As the guide notes, a retriever that performs file or network access
    could benefit from a native ``_aget_relevant_documents``; without one,
    the default async implementation provided for Runnables delegates to
    the sync implementation running on another thread.
    """

    k: int
    """Number of top results to return"""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Return ``self.k`` dummy documents (sync implementation).

        ``query`` and ``run_manager`` are accepted to satisfy the retriever
        hook signature but are not consulted: every call yields documents
        whose content is ``"dummy content at <index>"`` for indices
        ``0 .. k-1``.
        """
        # Stand-in for real "top k documents containing the query" logic.
        # NOTE(review): `score` is passed as a constructor keyword rather
        # than inside `metadata` -- confirm that Document retains it.
        return [
            Document(page_content=f"dummy content at {position}", score=1.0)
            for position in range(self.k)
        ]
# Module-level demo instance configured to return three documents.
retriever = CustomRetriever(k=3)

if __name__ == "__main__":
    # Run a single retrieval when executed as a script; the result is
    # bound to `documents` but not otherwise used here.
    documents = retriever.invoke("what is the meaning of life?")