Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/memos/embedders/universal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ async def _create_embeddings():
)
)
logger.info(f"Embeddings request succeeded with {time.time() - init_time} seconds")
logger.info(f"Embeddings request response: {response}")
return [r.embedding for r in response.data]
except Exception as e:
if self.use_backup_client:
Expand Down
14 changes: 8 additions & 6 deletions src/memos/memories/textual/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,10 @@ def delete_by_memory_ids(self, memory_ids: list[str]) -> None:
except Exception as e:
logger.error(f"An error occurred while deleting memories by memory_ids: {e}")

def delete_all(self) -> None:
def delete_all(self, user_name: str | None = None) -> None:
"""Delete all memories and their relationships from the graph store."""
try:
self.graph_store.clear()
self.graph_store.clear(user_name=user_name)
logger.info("All memories and edges have been deleted from the graph.")
except Exception as e:
logger.error(f"An error occurred while deleting all memories: {e}")
Expand All @@ -424,7 +424,7 @@ def delete_by_filter(
writable_cube_ids=writable_cube_ids, file_ids=file_ids, filter=filter
)

def load(self, dir: str) -> None:
def load(self, dir: str, user_name: str | None = None) -> None:
try:
memory_file = os.path.join(dir, self.config.memory_filename)

Expand All @@ -435,7 +435,7 @@ def load(self, dir: str) -> None:
with open(memory_file, encoding="utf-8") as f:
memories = json.load(f)

self.graph_store.import_graph(memories)
self.graph_store.import_graph(memories, user_name=user_name)
logger.info(f"Loaded {len(memories)} memories from {memory_file}")

except FileNotFoundError:
Expand All @@ -445,10 +445,12 @@ def load(self, dir: str) -> None:
except Exception as e:
logger.error(f"An error occurred while loading memories: {e}")

def dump(self, dir: str, include_embedding: bool = False) -> None:
def dump(self, dir: str, include_embedding: bool = False, user_name: str | None = None) -> None:
"""Dump memories to os.path.join(dir, self.config.memory_filename)"""
try:
json_memories = self.graph_store.export_graph(include_embedding=include_embedding)
json_memories = self.graph_store.export_graph(
include_embedding=include_embedding, user_name=user_name
)

os.makedirs(dir, exist_ok=True)
memory_file = os.path.join(dir, self.config.memory_filename)
Expand Down
64 changes: 46 additions & 18 deletions src/memos/memories/textual/tree_text_memory/organize/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,24 @@ def __init__(self, graph_store: Neo4jGraphDB, llm: BaseLLM, embedder: BaseEmbedd
self.llm = llm
self.embedder = embedder

def detect(self, memory, top_k: int = 5, scope=None):
def detect(self, memory, top_k: int = 5, scope=None, user_name: str | None = None):
# 1. Search for similar memories based on embedding
embedding = memory.metadata.embedding
embedding_candidates_info = self.graph_store.search_by_embedding(
embedding, top_k=top_k, scope=scope, threshold=self.EMBEDDING_THRESHOLD
embedding,
top_k=top_k,
scope=scope,
threshold=self.EMBEDDING_THRESHOLD,
user_name=user_name,
)
# 2. Filter based on similarity threshold
embedding_candidates_ids = [
info["id"] for info in embedding_candidates_info if info["id"] != memory.id
]
# 3. Judge conflicts using LLM
embedding_candidates = self.graph_store.get_nodes(embedding_candidates_ids)
embedding_candidates = self.graph_store.get_nodes(
embedding_candidates_ids, user_name=user_name
)
detected_relationships = []
for embedding_candidate in embedding_candidates:
embedding_candidate = TextualMemoryItem.from_dict(embedding_candidate)
Expand Down Expand Up @@ -67,13 +73,20 @@ def detect(self, memory, top_k: int = 5, scope=None):
pass
return detected_relationships

def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem, relation) -> None:
def resolve(
self,
memory_a: TextualMemoryItem,
memory_b: TextualMemoryItem,
relation,
user_name: str | None = None,
) -> None:
"""
Resolve detected conflicts between two memory items using LLM fusion.
Args:
memory_a: The first conflicting memory item.
memory_b: The second conflicting memory item.
relation: relation
user_name: Optional user name for multi-tenant isolation.
Returns:
A fused TextualMemoryItem representing the resolved memory.
"""
Expand Down Expand Up @@ -105,17 +118,22 @@ def resolve(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem, rela
logger.warning(
f"{relation} between {memory_a.id} and {memory_b.id} could not be resolved. "
)
self._hard_update(memory_a, memory_b)
self._hard_update(memory_a, memory_b, user_name=user_name)
# —————— 2.2 Conflict resolved, update metadata and memory ————
else:
fixed_metadata = self._merge_metadata(answer, memory_a.metadata, memory_b.metadata)
merged_memory = TextualMemoryItem(memory=answer, metadata=fixed_metadata)
logger.info(f"Resolved result: {merged_memory}")
self._resolve_in_graph(memory_a, memory_b, merged_memory)
self._resolve_in_graph(memory_a, memory_b, merged_memory, user_name=user_name)
except json.decoder.JSONDecodeError:
logger.error(f"Failed to parse LLM response: {response}")

def _hard_update(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem):
def _hard_update(
self,
memory_a: TextualMemoryItem,
memory_b: TextualMemoryItem,
user_name: str | None = None,
):
"""
Hard update: compare updated_at, keep the newer one, overwrite the older one's metadata.
"""
Expand All @@ -125,7 +143,7 @@ def _hard_update(self, memory_a: TextualMemoryItem, memory_b: TextualMemoryItem)
newer_mem = memory_a if time_a >= time_b else memory_b
older_mem = memory_b if time_a >= time_b else memory_a

self.graph_store.delete_node(older_mem.id)
self.graph_store.delete_node(older_mem.id, user_name=user_name)
logger.warning(
f"Delete older memory {older_mem.id}: <{older_mem.memory}> due to conflict with {newer_mem.id}: <{newer_mem.memory}>"
)
Expand All @@ -135,13 +153,21 @@ def _resolve_in_graph(
conflict_a: TextualMemoryItem,
conflict_b: TextualMemoryItem,
merged: TextualMemoryItem,
user_name: str | None = None,
):
edges_a = self.graph_store.get_edges(conflict_a.id, type="ANY", direction="ANY")
edges_b = self.graph_store.get_edges(conflict_b.id, type="ANY", direction="ANY")
edges_a = self.graph_store.get_edges(
conflict_a.id, type="ANY", direction="ANY", user_name=user_name
)
edges_b = self.graph_store.get_edges(
conflict_b.id, type="ANY", direction="ANY", user_name=user_name
)
all_edges = edges_a + edges_b

self.graph_store.add_node(
merged.id, merged.memory, merged.metadata.model_dump(exclude_none=True)
merged.id,
merged.memory,
merged.metadata.model_dump(exclude_none=True),
user_name=user_name,
)

for edge in all_edges:
Expand All @@ -150,13 +176,15 @@ def _resolve_in_graph(
if new_from == new_to:
continue
# Check if the edge already exists before adding
if not self.graph_store.edge_exists(new_from, new_to, edge["type"], direction="ANY"):
self.graph_store.add_edge(new_from, new_to, edge["type"])

self.graph_store.update_node(conflict_a.id, {"status": "archived"})
self.graph_store.update_node(conflict_b.id, {"status": "archived"})
self.graph_store.add_edge(conflict_a.id, merged.id, type="MERGED_TO")
self.graph_store.add_edge(conflict_b.id, merged.id, type="MERGED_TO")
if not self.graph_store.edge_exists(
new_from, new_to, edge["type"], direction="ANY", user_name=user_name
):
self.graph_store.add_edge(new_from, new_to, edge["type"], user_name=user_name)

self.graph_store.update_node(conflict_a.id, {"status": "archived"}, user_name=user_name)
self.graph_store.update_node(conflict_b.id, {"status": "archived"}, user_name=user_name)
self.graph_store.add_edge(conflict_a.id, merged.id, type="MERGED_TO", user_name=user_name)
self.graph_store.add_edge(conflict_b.id, merged.id, type="MERGED_TO", user_name=user_name)
logger.debug(
f"Archive {conflict_a.id} and {conflict_b.id}, and inherit their edges to {merged.id}."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def mark_memory_status(
self,
memory_items: list[TextualMemoryItem],
status: Literal["activated", "resolving", "archived", "deleted"],
user_name: str | None = None,
) -> None:
"""
Support status marking operations during history management. Common usages are:
Expand All @@ -157,6 +158,7 @@ def mark_memory_status(
self.graph_db.update_node,
id=mem.id,
fields={"status": status},
user_name=user_name,
)
)

Expand Down
41 changes: 29 additions & 12 deletions src/memos/memories/textual/tree_text_memory/organize/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ def _submit_batches(nodes: list[dict], node_kind: str) -> None:
_submit_batches(graph_nodes, "graph memory")

if graph_node_ids and self.is_reorganize:
self.reorganizer.add_message(QueueMessage(op="add", after_node=graph_node_ids))
self.reorganizer.add_message(
QueueMessage(op="add", after_node=graph_node_ids, user_name=user_name)
)

return added_ids

Expand Down Expand Up @@ -411,16 +413,19 @@ def _add_to_graph_memory(
QueueMessage(
op="add",
after_node=[node_id],
user_name=user_name,
)
)
return node_id

def _inherit_edges(self, from_id: str, to_id: str) -> None:
def _inherit_edges(self, from_id: str, to_id: str, user_name: str | None = None) -> None:
"""
Migrate all non-lineage edges from `from_id` to `to_id`,
and remove them from `from_id` after copying.
"""
edges = self.graph_store.get_edges(from_id, type="ANY", direction="ANY")
edges = self.graph_store.get_edges(
from_id, type="ANY", direction="ANY", user_name=user_name
)

for edge in edges:
if edge["type"] == "MERGED_TO":
Expand All @@ -433,20 +438,29 @@ def _inherit_edges(self, from_id: str, to_id: str) -> None:
continue

# Add edge to merged node if it doesn't already exist
if not self.graph_store.edge_exists(new_from, new_to, edge["type"], direction="ANY"):
self.graph_store.add_edge(new_from, new_to, edge["type"])
if not self.graph_store.edge_exists(
new_from, new_to, edge["type"], direction="ANY", user_name=user_name
):
self.graph_store.add_edge(new_from, new_to, edge["type"], user_name=user_name)

# Remove original edge if it involved the archived node
self.graph_store.delete_edge(edge["from"], edge["to"], edge["type"])
self.graph_store.delete_edge(
edge["from"], edge["to"], edge["type"], user_name=user_name
)

def _ensure_structure_path(
self, memory_type: str, metadata: TreeNodeTextualMemoryMetadata
self,
memory_type: str,
metadata: TreeNodeTextualMemoryMetadata,
user_name: str | None = None,
) -> str:
"""
Ensure structural path exists (ROOT → ... → final node), return last node ID.

Args:
path: like ["hobby", "photography"]
memory_type: Memory type for the structure node.
metadata: Metadata containing key and other fields.
user_name: Optional user name for multi-tenant isolation.

Returns:
Final node ID of the structure path.
Expand All @@ -456,7 +470,8 @@ def _ensure_structure_path(
[
{"field": "memory", "op": "=", "value": metadata.key},
{"field": "memory_type", "op": "=", "value": memory_type},
]
],
user_name=user_name,
)
if existing:
node_id = existing[0] # Use the first match
Expand All @@ -479,14 +494,16 @@ def _ensure_structure_path(
),
)
self.graph_store.add_node(
id=new_node.id,
memory=new_node.memory,
metadata=new_node.metadata.model_dump(exclude_none=True),
new_node.id,
new_node.memory,
new_node.metadata.model_dump(exclude_none=True),
user_name=user_name,
)
self.reorganizer.add_message(
QueueMessage(
op="add",
after_node=[new_node.id],
user_name=user_name,
)
)

Expand Down
Loading