-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpaperless.py
More file actions
157 lines (140 loc) · 5.68 KB
/
paperless.py
File metadata and controls
157 lines (140 loc) · 5.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
title: Paperless Document Search
description: A Tool to interact with a running paperless server. Inspired by https://github.com/JLeine/open-webui
author: Stefan Ostermann
git_url: https://github.com/soster/openwebui-ai-tools/paperless.py
version: 0.0.1
license: MIT
"""
import json
import requests
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from pydantic import BaseModel, Field
from typing import Callable, Any, Iterator, Optional
class DocumentEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Document):
return {"page_content": obj.page_content, "metadata": obj.metadata}
return super().default(obj)
class PaperlessDocumentLoader(BaseLoader):
"""Paperless document loader that retrieves documents based on a general search query."""
def __init__(
self,
query: Optional[str] = "",
url: Optional[str] = "",
token: Optional[str] = "",
) -> None:
"""Initialize the loader with a search query.
Args:
query: The general search query to load documents.
url: The URL to load documents from (optional).
token: The authorization token for API access (optional).
"""
self.url = url if url else "/"
if len(self.url) > 0 and not self.url.endswith("/"):
self.url += "/"
self.url += "api/documents/"
self.token = token if token else ""
self.query = query
def lazy_load(self) -> Iterator[Document]:
"""A lazy loader that requests documents from paperless based on the search query."""
querystring = {"query": self.query}
headers = {"Authorization": f"Token {self.token}"}
response = requests.get(self.url, headers=headers, params=querystring)
if response.status_code == 200:
data = response.json()
for result in data["results"]:
metadata = {
"source": f"{self.url.replace('/api', '')}{result['id']}",
**result,
}
metadata = {
k: v
for k, v in metadata.items()
if v is not None and not isinstance(v, list)
}
yield Document(
page_content=result["content"],
metadata=metadata,
)
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def progress_update(self, description):
await self.emit(description)
async def error_update(self, description):
await self.emit(description, "error", True)
async def success_update(self, description):
await self.emit(description, "success", True)
async def emit(self, description="Unknown State", status="in_progress", done=False):
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
class Tools:
class Valves(BaseModel):
PAPERLESS_URL: str = Field(
default="https://paperless.yourdomain.com/",
description="The domain of your paperless service",
)
PAPERLESS_TOKEN: str = Field(
default="",
description="The token to read docs from paperless",
)
def __init__(self):
self.valves = self.Valves()
async def get_paperless_documents(
self,
query: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Search for paperless documents using a general search query and retrieve the content of relevant documents.
:param query: The general search query to filter documents.
:return: All documents as a JSON string or an error as a string
"""
emitter = EventEmitter(__event_emitter__)
try:
await emitter.progress_update(f"Searching documents for query: {query}")
loader = PaperlessDocumentLoader(
query=query,
url=self.valves.PAPERLESS_URL,
token=self.valves.PAPERLESS_TOKEN,
)
documents = loader.load()
if len(documents) == 0:
error_message = f"Query returned 0 results for query: {query}"
await emitter.error_update(error_message)
return error_message
encoded_documents = json.dumps(
documents, cls=DocumentEncoder, ensure_ascii=False
)
decoded_documents = json.loads(encoded_documents)
if __event_emitter__:
for document in decoded_documents:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [document["page_content"]],
"metadata": [{"source": document["metadata"]["title"]}],
"source": {"name": document["metadata"]["source"]},
},
}
)
await emitter.success_update(
f"Received {len(decoded_documents)} documents for query: {query}"
)
return encoded_documents
except Exception as e:
error_message = f"Error: {str(e)}"
await emitter.error_update(error_message)
return error_message