summaryrefslogtreecommitdiff
path: root/rag/rag.py
blob: cd4537ecccde577ea8eac547c2fda3397cb3ada8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import List

from dotenv import load_dotenv
from loguru import logger as log
from qdrant_client.models import StrictFloat


try:
    from rag.db.vector import VectorDB
    from rag.db.document import DocumentDB
    from rag.llm.encoder import Encoder
    from rag.llm.generator import Generator, Prompt
    from rag.parser.pdf import PDFParser
except ModuleNotFoundError:
    from db.vector import VectorDB
    from db.document import DocumentDB
    from llm.encoder import Encoder
    from llm.generator import Generator, Prompt
    from parser.pdf import PDFParser


@dataclass
class Response:
    query: str
    context: List[str]
    answer: str


class RAG:
    def __init__(self) -> None:
        # FIXME: load this somewhere else?
        load_dotenv()
        self.pdf_parser = PDFParser()
        self.generator = Generator()
        self.encoder = Encoder()
        self.vector_db = VectorDB()
        self.doc_db = DocumentDB()

    def add_pdf_from_path(self, path: Path):
        blob = self.pdf_parser.from_path(path)
        self.add_pdf_from_blob(blob)

    def add_pdf_from_blob(self, blob: BytesIO):
        if self.doc_db.add(blob):
            log.debug("Adding pdf to vector database...")
            chunks = self.pdf_parser.from_data(blob)
            points = self.encoder.encode_document(chunks)
            self.vector_db.add(points)
        else:
            log.debug("Document already exists!")

    def __context(self, query_emb: List[StrictFloat], limit: int) -> str:
        hits = self.vector_db.search(query_emb, limit)
        log.debug(f"Got {len(hits)} hits in the vector db with limit={limit}")
        return [h.payload["text"] for h in hits]

    def retrive(self, query: str, limit: int = 5) -> Response:
        query_emb = self.encoder.encode_query(query)
        context = self.__context(query_emb, limit)
        prompt = Prompt(query, "\n".join(context))
        answer = self.generator.generate(prompt)["response"]
        return Response(query, context, answer)