diff --git a/packages/language_server/src/robotcode/language_server/robotframework/parts/diagnostics.py b/packages/language_server/src/robotcode/language_server/robotframework/parts/diagnostics.py index ffdf2b0db..5c1aed639 100644 --- a/packages/language_server/src/robotcode/language_server/robotframework/parts/diagnostics.py +++ b/packages/language_server/src/robotcode/language_server/robotframework/parts/diagnostics.py @@ -18,8 +18,9 @@ EnvironmentVariableDefinition, GlobalVariableDefinition, LibraryArgumentDefinition, + VariableDefinition, ) -from robotcode.robot.diagnostics.library_doc import LibraryDoc +from robotcode.robot.diagnostics.library_doc import KeywordDoc, LibraryDoc from robotcode.robot.diagnostics.namespace import Namespace from ...common.parts.diagnostics import DiagnosticsCollectType, DiagnosticsResult @@ -54,20 +55,18 @@ def _on_initialized(self, sender: Any) -> None: self.parent.documents_cache.variables_changed.add(self._on_variables_changed) def _on_libraries_changed(self, sender: Any, libraries: List[LibraryDoc]) -> None: - for doc in self.parent.documents.documents: - namespace = self.parent.documents_cache.get_only_initialized_namespace(doc) - if namespace is not None: - lib_docs = (e.library_doc for e in namespace.get_libraries().values()) - if any(lib_doc in lib_docs for lib_doc in libraries): - self.parent.diagnostics.force_refresh_document(doc) + docs_to_refresh: set[TextDocument] = set() + for lib_doc in libraries: + docs_to_refresh.update(self.parent.documents_cache.get_library_users(lib_doc)) + for doc in docs_to_refresh: + self.parent.diagnostics.force_refresh_document(doc) def _on_variables_changed(self, sender: Any, variables: List[LibraryDoc]) -> None: - for doc in self.parent.documents.documents: - namespace = self.parent.documents_cache.get_only_initialized_namespace(doc) - if namespace is not None: - lib_docs = (e.library_doc for e in namespace.get_variables_imports().values()) - if any(lib_doc in lib_docs for lib_doc in variables): - self.parent.diagnostics.force_refresh_document(doc) + docs_to_refresh: set[TextDocument] = set() + for var_doc in variables: + docs_to_refresh.update(self.parent.documents_cache.get_variables_users(var_doc)) + for doc in docs_to_refresh: + self.parent.diagnostics.force_refresh_document(doc) @language_id("robotframework") def analyze(self, sender: Any, document: TextDocument) -> None: @@ -83,37 +82,25 @@ def _on_get_related_documents(self, sender: Any, document: TextDocument) -> Opti namespace = self.parent.documents_cache.get_only_initialized_namespace(document) if namespace is None: return None - - result = [] - - lib_doc = namespace.get_library_doc() - for doc in self.parent.documents.documents: - if doc.language_id != "robotframework": - continue - - doc_namespace = self.parent.documents_cache.get_only_initialized_namespace(doc) - if doc_namespace is None: - continue - - if doc_namespace.is_analyzed(): - for ref in doc_namespace.get_namespace_references(): - if ref.library_doc == lib_doc: - result.append(doc) - - return result + source = str(document.uri.to_path()) + return self.parent.documents_cache.get_importers(source) def modify_diagnostics(self, document: TextDocument, diagnostics: List[Diagnostic]) -> List[Diagnostic]: return self.parent.documents_cache.get_diagnostic_modifier(document).modify_diagnostics(diagnostics) @language_id("robotframework") def collect_namespace_diagnostics( - self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType + self, + sender: Any, + document: 
TextDocument, + diagnostics_type: DiagnosticsCollectType, ) -> DiagnosticsResult: try: namespace = self.parent.documents_cache.get_namespace(document) return DiagnosticsResult( - self.collect_namespace_diagnostics, self.modify_diagnostics(document, namespace.get_diagnostics()) + self.collect_namespace_diagnostics, + self.modify_diagnostics(document, namespace.get_diagnostics()), ) except (CancelledError, SystemExit, KeyboardInterrupt): raise @@ -138,10 +125,47 @@ def collect_namespace_diagnostics( ], ) + def _is_keyword_used_anywhere( + self, + document: TextDocument, + kw: KeywordDoc, + namespace: Namespace, + ) -> bool: + """Check if keyword is used anywhere, using index with safe fallback.""" + if self.parent.documents_cache.get_keyword_ref_users(kw): + return True + + if namespace.get_keyword_references().get(kw): + return True + + # Safe fallback: workspace scan if index might be incomplete + refs = self.parent.robot_references.find_keyword_references(document, kw, False, True) + return bool(refs) + + def _is_variable_used_anywhere( + self, + document: TextDocument, + var: VariableDefinition, + namespace: Namespace, + ) -> bool: + """Check if variable is used anywhere, using index with safe fallback.""" + if self.parent.documents_cache.get_variable_ref_users(var): + return True + + if namespace.get_variable_references().get(var): + return True + + # Safe fallback: workspace scan if index might be incomplete + refs = self.parent.robot_references.find_variable_references(document, var, False, True) + return bool(refs) + @language_id("robotframework") @_logger.call def collect_unused_keyword_references( - self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType + self, + sender: Any, + document: TextDocument, + diagnostics_type: DiagnosticsCollectType, ) -> DiagnosticsResult: config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri) @@ -161,8 +185,7 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost for kw in (namespace.get_library_doc()).keywords.values(): check_current_task_canceled() - references = self.parent.robot_references.find_keyword_references(document, kw, False, True) - if not references: + if not self._is_keyword_used_anywhere(document, kw, namespace): result.append( Diagnostic( range=kw.name_range, @@ -174,7 +197,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost ) ) - return DiagnosticsResult(self.collect_unused_keyword_references, self.modify_diagnostics(document, result)) + return DiagnosticsResult( + self.collect_unused_keyword_references, + self.modify_diagnostics(document, result), + ) except (CancelledError, SystemExit, KeyboardInterrupt): raise except BaseException as e: @@ -200,7 +226,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost @language_id("robotframework") @_logger.call def collect_unused_variable_references( - self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType + self, + sender: Any, + document: TextDocument, + diagnostics_type: DiagnosticsCollectType, ) -> DiagnosticsResult: config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri) @@ -222,15 +251,19 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos check_current_task_canceled() if isinstance( - var, (LibraryArgumentDefinition, EnvironmentVariableDefinition, GlobalVariableDefinition) + var, + ( + LibraryArgumentDefinition, + EnvironmentVariableDefinition, + 
GlobalVariableDefinition, + ), ): continue if var.name_token is not None and var.name_token.value and var.name_token.value.startswith("_"): continue - references = self.parent.robot_references.find_variable_references(document, var, False, True) - if not references: + if not self._is_variable_used_anywhere(document, var, namespace): result.append( Diagnostic( range=var.name_range, @@ -243,7 +276,10 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos ) ) - return DiagnosticsResult(self.collect_unused_variable_references, self.modify_diagnostics(document, result)) + return DiagnosticsResult( + self.collect_unused_variable_references, + self.modify_diagnostics(document, result), + ) except (CancelledError, SystemExit, KeyboardInterrupt): raise except BaseException as e: diff --git a/packages/language_server/src/robotcode/language_server/robotframework/parts/references.py b/packages/language_server/src/robotcode/language_server/robotframework/parts/references.py index 3486781dc..b2352f94b 100644 --- a/packages/language_server/src/robotcode/language_server/robotframework/parts/references.py +++ b/packages/language_server/src/robotcode/language_server/robotframework/parts/references.py @@ -224,7 +224,7 @@ def _find_variable_references( include_declaration: bool = True, stop_at_first: bool = False, ) -> List[Location]: - result = [] + result: List[Location] = [] if include_declaration and variable.source: result.append(Location(str(Uri.from_path(variable.source)), variable.name_range)) @@ -232,15 +232,22 @@ def _find_variable_references( if variable.type == VariableDefinitionType.LOCAL_VARIABLE: result.extend(self.find_variable_references_in_file(document, variable, False)) else: - result.extend( - self._find_references_in_workspace( - document, - stop_at_first, - self.find_variable_references_in_file, - variable, - False, + # Use reverse index for lookup instead of workspace scan + docs_to_search = self.parent.documents_cache.get_variable_ref_users(variable) + if docs_to_search: + for doc in docs_to_search: + check_current_task_canceled() + result.extend(self.find_variable_references_in_file(doc, variable, False)) + if result and stop_at_first: + break + else: + # Fallback to workspace scan if index is empty + result.extend( + self._find_references_in_workspace( + document, stop_at_first, self.find_variable_references_in_file, variable, False + ) ) - ) + return result @_logger.call @@ -317,20 +324,26 @@ def _find_keyword_references( include_declaration: bool = True, stop_at_first: bool = False, ) -> List[Location]: - result = [] + result: List[Location] = [] if include_declaration and kw_doc.source: result.append(Location(str(Uri.from_path(kw_doc.source)), kw_doc.range)) - result.extend( - self._find_references_in_workspace( - document, - stop_at_first, - self.find_keyword_references_in_file, - kw_doc, - False, + # Use reverse index for lookup instead of workspace scan + docs_to_search = self.parent.documents_cache.get_keyword_ref_users(kw_doc) + if docs_to_search: + for doc in docs_to_search: + check_current_task_canceled() + result.extend(self.find_keyword_references_in_file(doc, kw_doc, False)) + if result and stop_at_first: + break + else: + # Fallback to workspace scan if index is empty + result.extend( + self._find_references_in_workspace( + document, stop_at_first, self.find_keyword_references_in_file, kw_doc, False + ) ) - ) return result diff --git a/packages/robot/src/robotcode/robot/diagnostics/data_cache.py 
b/packages/robot/src/robotcode/robot/diagnostics/data_cache.py index bd7bc63d8..7ad713b4f 100644 --- a/packages/robot/src/robotcode/robot/diagnostics/data_cache.py +++ b/packages/robot/src/robotcode/robot/diagnostics/data_cache.py @@ -1,4 +1,6 @@ +import os import pickle +import tempfile from abc import ABC, abstractmethod from enum import Enum from pathlib import Path @@ -12,6 +14,8 @@ class CacheSection(Enum): LIBRARY = "libdoc" VARIABLES = "variables" + RESOURCE = "resource" + NAMESPACE = "namespace" class DataCache(ABC): @@ -85,5 +89,23 @@ def save_cache_data(self, section: CacheSection, entry_name: str, data: Any) -> cached_file = self.build_cache_data_filename(section, entry_name) cached_file.parent.mkdir(parents=True, exist_ok=True) - with cached_file.open("wb") as f: - pickle.dump(data, f) + + # Atomic write: write to a temp file in the same directory, then replace the target + # This ensures readers never see partial/corrupt data + temp_fd, temp_path = tempfile.mkstemp( + dir=cached_file.parent, + prefix=cached_file.stem + "_", + suffix=".tmp", + ) + try: + with os.fdopen(temp_fd, "wb") as f: + pickle.dump(data, f) + # Atomic replace: os.replace overwrites an existing target; atomic on POSIX, but on Windows it can fail if the target is open in another process + Path(temp_path).replace(cached_file) + except Exception: + # Clean up temp file on failure (temp file may be left behind on SystemExit/KeyboardInterrupt) + try: + os.unlink(temp_path) + except OSError: + pass + raise diff --git a/packages/robot/src/robotcode/robot/diagnostics/document_cache_helper.py b/packages/robot/src/robotcode/robot/diagnostics/document_cache_helper.py index ed32591f2..88589f93f 100644 --- a/packages/robot/src/robotcode/robot/diagnostics/document_cache_helper.py +++ b/packages/robot/src/robotcode/robot/diagnostics/document_cache_helper.py @@ -1,7 +1,9 @@ from __future__ import annotations import ast +import hashlib import io +import sys import threading import weakref from logging import CRITICAL @@ -11,10 +13,6 @@ Callable, Iterable, Iterator, - List, - Optional, - Tuple, - Union, cast, ) @@ -31,9 +29,11 @@ from ..config.model import RobotBaseProfile from ..utils import get_robot_version from ..utils.stubs import Languages +from .data_cache import CacheSection +from .entities import VariableDefinition from .imports_manager import ImportsManager -from .library_doc import LibraryDoc +from .library_doc import KeywordDoc, LibraryDoc -from .namespace import DocumentType, Namespace +from .namespace import DocumentType, Namespace, NamespaceCacheData, NamespaceMetaData from .workspace_config import ( AnalysisDiagnosticModifiersConfig, AnalysisRobotConfig, @@ -42,6 +42,9 @@ WorkspaceAnalysisConfig, ) +# Interval for cleaning up stale entries in dependency maps +_DEPENDENCY_CLEANUP_INTERVAL = 100 + class UnknownFileTypeError(Exception): pass @@ -59,8 +62,8 @@ def __init__( workspace: Workspace, documents_manager: DocumentsManager, file_watcher_manager: FileWatcherManagerBase, - robot_profile: Optional[RobotBaseProfile], - analysis_config: Optional[WorkspaceAnalysisConfig], + robot_profile: RobotBaseProfile | None, + analysis_config: WorkspaceAnalysisConfig | None, ) -> None: self.INITIALIZED_NAMESPACE = _CacheEntry() @@ -71,14 +74,48 @@ self.robot_profile = robot_profile or RobotBaseProfile() self.analysis_config = analysis_config or WorkspaceAnalysisConfig() + # Lock ordering (to prevent deadlocks when acquiring multiple locks): + # 1. _imports_managers_lock + # 2. _importers_lock + # 3. _library_users_lock + # 4. 
_variables_users_lock + # Always acquire in this order if multiple locks are needed in the same operation. + self._imports_managers_lock = threading.RLock() self._imports_managers: weakref.WeakKeyDictionary[WorkspaceFolder, ImportsManager] = weakref.WeakKeyDictionary() - self._default_imports_manager: Optional[ImportsManager] = None - self._workspace_languages: weakref.WeakKeyDictionary[WorkspaceFolder, Optional[Languages]] = ( + self._default_imports_manager: ImportsManager | None = None + self._workspace_languages: weakref.WeakKeyDictionary[WorkspaceFolder, Languages | None] = ( weakref.WeakKeyDictionary() ) - def get_languages_for_document(self, document_or_uri: Union[TextDocument, Uri, str]) -> Optional[Languages]: + # Reverse dependency map: source path -> set of documents that import it + self._importers_lock = threading.RLock() + self._importers: dict[str, weakref.WeakSet[TextDocument]] = {} + + # Reverse dependency maps for libraries and variables (by source path for stable lookup) + # Using source path instead of id() because Python can reuse object IDs after GC + self._library_users_lock = threading.RLock() + self._library_users: dict[str, weakref.WeakSet[TextDocument]] = {} + + self._variables_users_lock = threading.RLock() + self._variables_users: dict[str, weakref.WeakSet[TextDocument]] = {} + + # Reference tracking for O(1) lookup of keyword/variable usages + # Uses (source, name) tuples as keys for stability across cache invalidation + self._ref_tracking_lock = threading.RLock() + self._keyword_ref_users: dict[tuple[str, str], weakref.WeakSet[TextDocument]] = {} + self._variable_ref_users: dict[tuple[str, str], weakref.WeakSet[TextDocument]] = {} + self._doc_keyword_refs: weakref.WeakKeyDictionary[TextDocument, set[tuple[str, str]]] = ( + weakref.WeakKeyDictionary() + ) + self._doc_variable_refs: weakref.WeakKeyDictionary[TextDocument, set[tuple[str, str]]] = ( + weakref.WeakKeyDictionary() + ) + + # Counter for periodic cleanup of stale dependency map entries + self._track_count = 0 + + def get_languages_for_document(self, document_or_uri: TextDocument | Uri | str) -> Languages | None: if get_robot_version() < (6, 0): return None @@ -86,7 +123,7 @@ def get_languages_for_document(self, document_or_uri: Union[TextDocument, Uri, s Languages as RobotLanguages, ) - uri: Union[Uri, str] + uri: Uri | str if isinstance(document_or_uri, TextDocument): uri = document_or_uri.uri @@ -129,7 +166,7 @@ def get_languages_for_document(self, document_or_uri: Union[TextDocument, Uri, s def build_languages_from_model( self, document: TextDocument, model: ast.AST - ) -> Tuple[Optional[Languages], Optional[Languages]]: + ) -> tuple[Languages | None, Languages | None]: if get_robot_version() < (6, 0): return (None, None) @@ -171,12 +208,12 @@ def __get_document_type(self, document: TextDocument) -> DocumentType: return DocumentType.UNKNOWN - def get_tokens(self, document: TextDocument, data_only: bool = False) -> List[Token]: + def get_tokens(self, document: TextDocument, data_only: bool = False) -> list[Token]: if data_only: return self.__get_tokens_data_only(document) return self.__get_tokens(document) - def __get_tokens_data_only(self, document: TextDocument) -> List[Token]: + def __get_tokens_data_only(self, document: TextDocument) -> list[Token]: document_type = self.get_document_type(document) if document_type == DocumentType.INIT: return self.get_init_tokens(document, True) @@ -187,7 +224,7 @@ def __get_tokens_data_only(self, document: TextDocument) -> List[Token]: raise 
UnknownFileTypeError(str(document.uri)) - def __get_tokens(self, document: TextDocument) -> List[Token]: + def __get_tokens(self, document: TextDocument) -> list[Token]: document_type = self.get_document_type(document) if document_type == DocumentType.INIT: return self.get_init_tokens(document) @@ -198,7 +235,7 @@ def __get_tokens(self, document: TextDocument) -> List[Token]: raise UnknownFileTypeError(str(document.uri)) - def get_general_tokens(self, document: TextDocument, data_only: bool = False) -> List[Token]: + def get_general_tokens(self, document: TextDocument, data_only: bool = False) -> list[Token]: if document.version is None: if data_only: return self.__get_general_tokens_data_only(document) @@ -266,28 +303,28 @@ def __internal_get_init_tokens( return robot.api.get_init_tokens(source, data_only=data_only, tokenize_variables=tokenize_variables) - def __get_general_tokens_data_only(self, document: TextDocument) -> List[Token]: + def __get_general_tokens_data_only(self, document: TextDocument) -> list[Token]: lang = self.get_languages_for_document(document) - def get(text: str) -> List[Token]: + def get(text: str) -> list[Token]: with io.StringIO(text) as content: return [e for e in self.__internal_get_tokens(content, True, lang=lang)] return self.__get_tokens_internal(document, get) - def __get_general_tokens(self, document: TextDocument) -> List[Token]: + def __get_general_tokens(self, document: TextDocument) -> list[Token]: lang = self.get_languages_for_document(document) - def get(text: str) -> List[Token]: + def get(text: str) -> list[Token]: with io.StringIO(text) as content: return [e for e in self.__internal_get_tokens(content, lang=lang)] return self.__get_tokens_internal(document, get) - def __get_tokens_internal(self, document: TextDocument, get: Callable[[str], List[Token]]) -> List[Token]: + def __get_tokens_internal(self, document: TextDocument, get: Callable[[str], list[Token]]) -> list[Token]: return get(document.text()) - def get_resource_tokens(self, document: TextDocument, data_only: bool = False) -> List[Token]: + def get_resource_tokens(self, document: TextDocument, data_only: bool = False) -> list[Token]: if document.version is None: if data_only: return self.__get_resource_tokens_data_only(document) @@ -299,25 +336,25 @@ def get_resource_tokens(self, document: TextDocument, data_only: bool = False) - return document.get_cache(self.__get_resource_tokens) - def __get_resource_tokens_data_only(self, document: TextDocument) -> List[Token]: + def __get_resource_tokens_data_only(self, document: TextDocument) -> list[Token]: lang = self.get_languages_for_document(document) - def get(text: str) -> List[Token]: + def get(text: str) -> list[Token]: with io.StringIO(text) as content: return [e for e in self.__internal_get_resource_tokens(content, True, lang=lang)] return self.__get_tokens_internal(document, get) - def __get_resource_tokens(self, document: TextDocument) -> List[Token]: + def __get_resource_tokens(self, document: TextDocument) -> list[Token]: lang = self.get_languages_for_document(document) - def get(text: str) -> List[Token]: + def get(text: str) -> list[Token]: with io.StringIO(text) as content: return [e for e in self.__internal_get_resource_tokens(content, lang=lang)] return self.__get_tokens_internal(document, get) - def get_init_tokens(self, document: TextDocument, data_only: bool = False) -> List[Token]: + def get_init_tokens(self, document: TextDocument, data_only: bool = False) -> list[Token]: if document.version is None: if data_only: return 
self.__get_init_tokens_data_only(document) @@ -328,19 +365,19 @@ def get_init_tokens(self, document: TextDocument, data_only: bool = False) -> Li return document.get_cache(self.__get_init_tokens_data_only) return document.get_cache(self.__get_init_tokens) - def __get_init_tokens_data_only(self, document: TextDocument) -> List[Token]: + def __get_init_tokens_data_only(self, document: TextDocument) -> list[Token]: lang = self.get_languages_for_document(document) - def get(text: str) -> List[Token]: + def get(text: str) -> list[Token]: with io.StringIO(text) as content: return [e for e in self.__internal_get_init_tokens(content, True, lang=lang)] return self.__get_tokens_internal(document, get) - def __get_init_tokens(self, document: TextDocument) -> List[Token]: + def __get_init_tokens(self, document: TextDocument) -> list[Token]: lang = self.get_languages_for_document(document) - def get(text: str) -> List[Token]: + def get(text: str) -> list[Token]: with io.StringIO(text) as content: return [e for e in self.__internal_get_init_tokens(content, lang=lang)] @@ -483,20 +520,351 @@ def __invalidate_namespace(self, sender: Namespace) -> None: def __namespace_initialized(self, sender: Namespace) -> None: if sender.document is not None: sender.document.set_data(self.INITIALIZED_NAMESPACE, sender) + + # Track reverse dependencies: record that this document imports its resources + self._track_imports(sender.document, sender) + + # Save to disk cache for faster restart (initial save without analysis data) + imports_manager = self.get_imports_manager(sender.document) + self._save_namespace_to_cache(sender, imports_manager) + self.namespace_initialized(self, sender) + def __namespace_analysed(self, sender: Namespace) -> None: + """Re-save namespace to cache after analysis to include diagnostics and analysis results.""" + if sender.document is not None: + self._track_references(sender.document, sender) + + imports_manager = self.get_imports_manager(sender.document) + self._save_namespace_to_cache(sender, imports_manager) + + def _track_imports(self, document: TextDocument, namespace: Namespace) -> None: + """Update the reverse dependency map for a namespace's imports.""" + with self._importers_lock: + # Track resource imports + for source in namespace.get_resources().keys(): + if source not in self._importers: + self._importers[source] = weakref.WeakSet() + self._importers[source].add(document) + + # Track library users (by source path for stable lookup) + with self._library_users_lock: + for entry in namespace.get_libraries().values(): + lib_key = entry.library_doc.source or entry.library_doc.name + if lib_key and lib_key not in self._library_users: + self._library_users[lib_key] = weakref.WeakSet() + if lib_key: + self._library_users[lib_key].add(document) + + # Track variables users (by source path for stable lookup) + with self._variables_users_lock: + for entry in namespace.get_variables_imports().values(): + var_key = entry.library_doc.source or entry.library_doc.name + if var_key and var_key not in self._variables_users: + self._variables_users[var_key] = weakref.WeakSet() + if var_key: + self._variables_users[var_key].add(document) + + # Periodically cleanup stale entries + self._track_count += 1 + if self._track_count >= _DEPENDENCY_CLEANUP_INTERVAL: + self._track_count = 0 + self._cleanup_stale_dependency_maps() + + def get_importers(self, source: str) -> list[TextDocument]: + """Get all documents that import a given source file (O(1) lookup).""" + with self._importers_lock: + if source in 
self._importers: + return list(self._importers[source]) + return [] + + def clear_importers(self, source: str) -> None: + """Clear the importers set for a source (called when source is modified).""" + with self._importers_lock: + if source in self._importers: + del self._importers[source] + + def get_library_users(self, library_doc: LibraryDoc) -> list[TextDocument]: + """Get all documents that use a given library (O(1) lookup by source path).""" + with self._library_users_lock: + lib_key = library_doc.source or library_doc.name + if lib_key and lib_key in self._library_users: + return list(self._library_users[lib_key]) + return [] + + def get_variables_users(self, variables_doc: LibraryDoc) -> list[TextDocument]: + """Get all documents that use a given variables file (O(1) lookup by source path).""" + with self._variables_users_lock: + var_key = variables_doc.source or variables_doc.name + if var_key and var_key in self._variables_users: + return list(self._variables_users[var_key]) + return [] + + def get_keyword_ref_users(self, kw_doc: KeywordDoc) -> list[TextDocument]: + """Get documents that reference a keyword.""" + with self._ref_tracking_lock: + key = (kw_doc.source or "", kw_doc.name) + if key in self._keyword_ref_users: + return list(self._keyword_ref_users[key]) + return [] + + def get_variable_ref_users(self, var_def: VariableDefinition) -> list[TextDocument]: + """Get documents that reference a variable.""" + with self._ref_tracking_lock: + key = (var_def.source or "", var_def.name) + if key in self._variable_ref_users: + return list(self._variable_ref_users[key]) + return [] + + def _cleanup_stale_dependency_maps(self) -> None: + """Remove entries with empty WeakSets from dependency maps. + + Called periodically to prevent memory accumulation from entries whose + WeakSets have become empty after the referencing documents were garbage-collected. + """ + with self._importers_lock: + stale_importer_keys = [k for k, v in self._importers.items() if len(v) == 0] + for key in stale_importer_keys: + del self._importers[key] + + with self._library_users_lock: + stale_lib_keys = [k for k, v in self._library_users.items() if len(v) == 0] + for lib_key in stale_lib_keys: + del self._library_users[lib_key] + + with self._variables_users_lock: + stale_var_keys = [k for k, v in self._variables_users.items() if len(v) == 0] + for var_key in stale_var_keys: + del self._variables_users[var_key] + + with self._ref_tracking_lock: + stale_kw_ref_keys = [k for k, v in self._keyword_ref_users.items() if len(v) == 0] + for kw_ref_key in stale_kw_ref_keys: + del self._keyword_ref_users[kw_ref_key] + + stale_var_ref_keys = [k for k, v in self._variable_ref_users.items() if len(v) == 0] + for var_ref_key in stale_var_ref_keys: + del self._variable_ref_users[var_ref_key] + + def _track_references(self, document: TextDocument, namespace: Namespace) -> None: + """Track keyword/variable references. + + Uses diff-based updates: compares current references against previous + to handle documents that stop referencing items after edits. 
+ """ + with self._ref_tracking_lock: + self._update_keyword_refs(document, namespace) + self._update_variable_refs(document, namespace) + + def _update_keyword_refs(self, document: TextDocument, namespace: Namespace) -> None: + """Update reverse index for keyword references.""" + keyword_refs = namespace.get_keyword_references() + new_keys = {(kw.source or "", kw.name) for kw in keyword_refs} + old_keys = self._doc_keyword_refs.get(document, set()) + + for key in old_keys - new_keys: + if key in self._keyword_ref_users: + self._keyword_ref_users[key].discard(document) + + for key in new_keys - old_keys: + if key not in self._keyword_ref_users: + self._keyword_ref_users[key] = weakref.WeakSet() + self._keyword_ref_users[key].add(document) + + self._doc_keyword_refs[document] = new_keys + + def _update_variable_refs(self, document: TextDocument, namespace: Namespace) -> None: + """Update reverse index for variable references.""" + variable_refs = namespace.get_variable_references() + new_keys = {(var.source or "", var.name) for var in variable_refs} + old_keys = self._doc_variable_refs.get(document, set()) + + for key in old_keys - new_keys: + if key in self._variable_ref_users: + self._variable_ref_users[key].discard(document) + + for key in new_keys - old_keys: + if key not in self._variable_ref_users: + self._variable_ref_users[key] = weakref.WeakSet() + self._variable_ref_users[key].add(document) + + self._doc_variable_refs[document] = new_keys + def get_initialized_namespace(self, document: TextDocument) -> Namespace: - result: Optional[Namespace] = document.get_data(self.INITIALIZED_NAMESPACE) + result: Namespace | None = document.get_data(self.INITIALIZED_NAMESPACE) if result is None: self._logger.debug(lambda: f"There is no initialized Namespace: {document.uri if document else None}") result = self.get_namespace(document) return result - def get_only_initialized_namespace(self, document: TextDocument) -> Optional[Namespace]: - return cast(Optional[Namespace], document.get_data(self.INITIALIZED_NAMESPACE)) + def get_only_initialized_namespace(self, document: TextDocument) -> Namespace | None: + return cast(Namespace | None, document.get_data(self.INITIALIZED_NAMESPACE)) + + def _try_load_namespace_from_cache( + self, + document: TextDocument, + model: ast.AST, + imports_manager: ImportsManager, + document_type: DocumentType | None, + languages: Languages | None, + workspace_languages: Languages | None, + ) -> Namespace | None: + """Attempt to load namespace from disk cache.""" + source = str(document.uri.to_path()) + source_path = Path(source) + + if not source_path.exists(): + return None + + try: + source_stat = source_path.stat() + current_mtime = source_stat.st_mtime_ns + current_size = source_stat.st_size + except OSError: + return None + + # Build cache filename using SHA256 for collision resistance + normalized = str(source_path.resolve()) + cache_key = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16] + cache_file = cache_key + ".cache" + + # Check if cache file exists + if not imports_manager.data_cache.cache_data_exists(CacheSection.NAMESPACE, cache_file): + return None + + # Load cache data (single file contains (meta, spec) tuple) + try: + saved_meta, cache_data = imports_manager.data_cache.read_cache_data( + CacheSection.NAMESPACE, cache_file, tuple + ) + except Exception: + self._logger.debug(lambda: f"Failed to read namespace cache for {source}", context_name="import") + return None + + # Type check the loaded data + if not isinstance(saved_meta, 
NamespaceMetaData) or not isinstance(cache_data, NamespaceCacheData): + self._logger.debug(lambda: f"Namespace cache type mismatch for {source}", context_name="import") + return None + + # Validate source file mtime + if saved_meta.mtime != current_mtime: + self._logger.debug(lambda: f"Namespace cache mtime mismatch for {source}", context_name="import") + return None + + # Fast path: if mtime AND size both match, skip expensive content hash computation + if saved_meta.file_size != current_size: + # Size changed - need content hash to validate + try: + _, current_hash = Namespace._compute_content_hash(source_path) + except OSError: + return None + + if saved_meta.content_hash != current_hash: + self._logger.debug(lambda: f"Namespace cache content hash mismatch for {source}", context_name="import") + return None + + # Validate environment identity (detects venv changes, PYTHONPATH changes, etc.) + if saved_meta.python_executable != sys.executable: + self._logger.debug( + lambda: f"Namespace cache Python executable mismatch for {source}", context_name="import" + ) + return None + + current_sys_path_hash = hashlib.sha256("\n".join(sys.path).encode("utf-8")).hexdigest()[:16] + if saved_meta.sys_path_hash != current_sys_path_hash: + self._logger.debug(lambda: f"Namespace cache sys.path hash mismatch for {source}", context_name="import") + return None + + # Validate all library source mtimes + for lib_source, lib_mtime in saved_meta.library_sources_mtimes: + lib_path = Path(lib_source) + try: + if not lib_path.exists() or lib_path.stat().st_mtime_ns != lib_mtime: + self._logger.debug( + lambda: f"Namespace cache library mtime mismatch for {lib_source}", context_name="import" + ) + return None + except OSError: + return None + + # Validate all resource source mtimes + for res_source, res_mtime in saved_meta.resource_sources_mtimes: + res_path = Path(res_source) + try: + if not res_path.exists() or res_path.stat().st_mtime_ns != res_mtime: + self._logger.debug( + lambda: f"Namespace cache resource mtime mismatch for {res_source}", context_name="import" + ) + return None + except OSError: + return None + + # Validate all variables source mtimes + for var_source, var_mtime in saved_meta.variables_sources_mtimes: + var_path = Path(var_source) + try: + if not var_path.exists() or var_path.stat().st_mtime_ns != var_mtime: + self._logger.debug( + lambda: f"Namespace cache variables mtime mismatch for {var_source}", context_name="import" + ) + return None + except OSError: + return None + + # Create namespace from cache data + result = Namespace.from_cache_data( + cache_data=cache_data, + imports_manager=imports_manager, + model=model, + source=source, + document=document, + document_type=document_type, + languages=languages, + workspace_languages=workspace_languages, + ) + + if result is not None: + self._logger.debug( + lambda: f"Loaded namespace from cache for {source} " + f"(fully_analyzed={cache_data.fully_analyzed}, _analyzed={result._analyzed})", + context_name="import", + ) + + return result + + def _save_namespace_to_cache(self, namespace: Namespace, imports_manager: ImportsManager) -> None: + """Save initialized namespace to disk cache. + + Uses single-file format with atomic writes for consistency. + The cache file contains a (meta, spec) tuple. 
+ """ + if not namespace._initialized: + return + + meta = namespace.get_cache_metadata() + if meta is None: + return + + cache_data = namespace.to_cache_data() + + # Build cache filename using SHA256 for collision resistance + source_path = Path(namespace.source) + normalized = str(source_path.resolve()) + cache_key = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16] + cache_file = cache_key + ".cache" + + # Save as single tuple (meta, spec) - atomic and consistent + try: + imports_manager.data_cache.save_cache_data(CacheSection.NAMESPACE, cache_file, (meta, cache_data)) + self._logger.debug( + lambda: f"Saved namespace to cache for {namespace.source} (fully_analyzed={cache_data.fully_analyzed})", + context_name="import", + ) + except OSError: + self._logger.debug(lambda: f"Failed to save namespace cache for {namespace.source}", context_name="import") def __get_namespace_for_document_type( - self, document: TextDocument, document_type: Optional[DocumentType] + self, document: TextDocument, document_type: DocumentType | None ) -> Namespace: if document_type is not None and document_type == DocumentType.INIT: model = self.get_init_model(document) @@ -511,6 +879,24 @@ def __get_namespace_for_document_type( languages, workspace_languages = self.build_languages_from_model(document, model) + # Try loading from disk cache first + cached = self._try_load_namespace_from_cache( + document, model, imports_manager, document_type, languages, workspace_languages + ) + if cached is not None: + cached.has_invalidated.add(self.__invalidate_namespace) + cached.has_initialized.add(self.__namespace_initialized) + cached.has_analysed.add(self.__namespace_analysed) + # Mark as initialized in document data and track imports + document.set_data(self.INITIALIZED_NAMESPACE, cached) + self._track_imports(document, cached) + # If fully analyzed from cache, also track references + # (since has_analysed event won't fire for already-analyzed namespaces) + if cached._analyzed: + self._track_references(document, cached) + return cached + + # Cache miss - create new namespace result = Namespace( imports_manager, model, @@ -522,6 +908,7 @@ def __get_namespace_for_document_type( ) result.has_invalidated.add(self.__invalidate_namespace) result.has_initialized.add(self.__namespace_initialized) + result.has_analysed.add(self.__namespace_analysed) return result @@ -572,21 +959,21 @@ def create_imports_manager(self, root_uri: Uri) -> ImportsManager: return result @event - def libraries_changed(sender, libraries: List[LibraryDoc]) -> None: ... + def libraries_changed(sender, libraries: list[LibraryDoc]) -> None: ... @event - def resources_changed(sender, resources: List[LibraryDoc]) -> None: ... + def resources_changed(sender, resources: list[LibraryDoc]) -> None: ... @event - def variables_changed(sender, variables: List[LibraryDoc]) -> None: ... + def variables_changed(sender, variables: list[LibraryDoc]) -> None: ... 
- def _on_libraries_changed(self, sender: ImportsManager, libraries: List[LibraryDoc]) -> None: + def _on_libraries_changed(self, sender: ImportsManager, libraries: list[LibraryDoc]) -> None: self.libraries_changed(self, libraries) - def _on_resources_changed(self, sender: ImportsManager, resources: List[LibraryDoc]) -> None: + def _on_resources_changed(self, sender: ImportsManager, resources: list[LibraryDoc]) -> None: self.resources_changed(self, resources) - def _on_variables_changed(self, sender: ImportsManager, variables: List[LibraryDoc]) -> None: + def _on_variables_changed(self, sender: ImportsManager, variables: list[LibraryDoc]) -> None: self.variables_changed(self, variables) def default_imports_manager(self) -> ImportsManager: @@ -606,7 +993,7 @@ def get_imports_manager(self, document: TextDocument) -> ImportsManager: def get_imports_manager_for_uri(self, uri: Uri) -> ImportsManager: return self.get_imports_manager_for_workspace_folder(self.workspace.get_workspace_folder(uri)) - def get_imports_manager_for_workspace_folder(self, folder: Optional[WorkspaceFolder]) -> ImportsManager: + def get_imports_manager_for_workspace_folder(self, folder: WorkspaceFolder | None) -> ImportsManager: if folder is None: if len(self.workspace.workspace_folders) == 1: folder = self.workspace.workspace_folders[0] diff --git a/packages/robot/src/robotcode/robot/diagnostics/imports_manager.py b/packages/robot/src/robotcode/robot/diagnostics/imports_manager.py index d5df54f6f..c7b9a2e3c 100644 --- a/packages/robot/src/robotcode/robot/diagnostics/imports_manager.py +++ b/packages/robot/src/robotcode/robot/diagnostics/imports_manager.py @@ -498,6 +498,24 @@ def filepath_base(self) -> str: raise ValueError("Cannot determine filepath base.") +@dataclass +class ResourceMetaData: + """Metadata for caching resource LibraryDoc to disk.""" + + meta_version: str + source: str + mtime: int + + @property + def filepath_base(self) -> str: + p = Path(self.source) + return f"{zlib.adler32(str(p.parent).encode('utf-8')):08x}_{p.stem}" + + +# Current version for resource cache invalidation +RESOURCE_META_VERSION = "1" + + class ImportsManager: _logger = LoggingDescriptor() @@ -589,6 +607,10 @@ def __init__( self._resource_libdoc_cache: "weakref.WeakKeyDictionary[ast.AST, Dict[str, LibraryDoc]]" = ( weakref.WeakKeyDictionary() ) + self._libdoc_by_source: dict[str, LibraryDoc] = {} + self._libdoc_by_name: dict[str, LibraryDoc] = {} # For standard libraries cached by name + self._resource_libdoc_by_source: dict[str, LibraryDoc] = {} + self._variables_libdoc_by_source: dict[str, LibraryDoc] = {} self._diagnostics: List[Diagnostic] = [] @@ -834,9 +856,7 @@ def __resource_document_changed(self, document: TextDocument) -> None: lib_doc = r_entry.get_libdoc() r_entry.invalidate() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except Exception: result = True if result and lib_doc is not None: @@ -871,7 +891,7 @@ def did_change_watched_files(self, sender: Any, changes: List[FileEvent]) -> Non result = r_entry.check_file_changed(changes) if result is not None: resource_changed.append((r_key, result, lib_doc)) - except BaseException as e: + except Exception as e: self._logger.exception(e) raise @@ -911,6 +931,7 @@ def __remove_library_entry( entry: _LibrariesEntry, now: bool = False, ) -> None: + removed_lib_doc: Optional[LibraryDoc] = None try: if len(entry.references) == 0 or now: self._logger.debug(lambda: f"Remove Library Entry {entry_key}") @@ -918,11 +939,17 @@ def __remove_library_entry( if 
len(entry.references) == 0: e1 = self._libaries.get(entry_key, None) if e1 == entry: + removed_lib_doc = entry.get_libdoc() self._libaries.pop(entry_key, None) entry.invalidate() self._logger.debug(lambda: f"Library Entry {entry_key} removed") finally: self._library_files_cache.clear() + if removed_lib_doc is not None: + if removed_lib_doc.source: + self._libdoc_by_source.pop(removed_lib_doc.source, None) + if removed_lib_doc.name: + self._libdoc_by_name.pop(removed_lib_doc.name, None) def __remove_resource_entry( self, @@ -930,6 +957,7 @@ def __remove_resource_entry( entry: _ResourcesEntry, now: bool = False, ) -> None: + removed_lib_doc: Optional[LibraryDoc] = None try: if len(entry.references) == 0 or now: self._logger.debug(lambda: f"Remove Resource Entry {entry_key}") @@ -937,12 +965,14 @@ def __remove_resource_entry( if len(entry.references) == 0 or now: e1 = self._resources.get(entry_key, None) if e1 == entry: + removed_lib_doc = entry.get_libdoc() self._resources.pop(entry_key, None) - entry.invalidate() self._logger.debug(lambda: f"Resource Entry {entry_key} removed") finally: self._resource_files_cache.clear() + if removed_lib_doc is not None and removed_lib_doc.source: + self._resource_libdoc_by_source.pop(removed_lib_doc.source, None) def __remove_variables_entry( self, @@ -950,6 +980,7 @@ def __remove_variables_entry( entry: _VariablesEntry, now: bool = False, ) -> None: + removed_lib_doc: Optional[VariablesDoc] = None try: if len(entry.references) == 0 or now: self._logger.debug(lambda: f"Remove Variables Entry {entry_key}") @@ -957,11 +988,14 @@ def __remove_variables_entry( if len(entry.references) == 0: e1 = self._variables.get(entry_key, None) if e1 == entry: + removed_lib_doc = entry.get_libdoc() self._variables.pop(entry_key, None) entry.invalidate() self._logger.debug(lambda: f"Variables Entry {entry_key} removed") finally: self._variables_files_cache.clear() + if removed_lib_doc is not None and removed_lib_doc.source: + self._variables_libdoc_by_source.pop(removed_lib_doc.source, None) def get_library_meta( self, @@ -1027,9 +1061,7 @@ def get_library_meta( ) return result, import_name, ignore_arguments - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except Exception: pass return None, import_name, ignore_arguments @@ -1096,9 +1128,7 @@ def get_variables_meta( ) return result, import_name - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except Exception: pass return None, name @@ -1232,7 +1262,9 @@ def __find_variables_simple( def executor(self) -> ProcessPoolExecutor: with self._executor_lock: if self._executor is None: - self._executor = ProcessPoolExecutor(mp_context=mp.get_context("spawn")) + # Cap at 4 workers to balance parallelism with memory usage + max_workers = min(mp.cpu_count() or 1, 4) + self._executor = ProcessPoolExecutor(max_workers=max_workers, mp_context=mp.get_context("spawn")) return self._executor @@ -1293,38 +1325,30 @@ def _get_library_libdoc( self._logger.exception(e) self._logger.debug(lambda: f"Load library in process {name}{args!r}", context_name="import") - # if self._process_pool_executor is None: - # self._process_pool_executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context("spawn")) - # executor = self._process_pool_executor - executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context("spawn")) try: - try: - result = executor.submit( - get_library_doc, - name, - args if not ignore_arguments else (), - working_dir, - base_dir, - 
self.get_resolvable_command_line_variables(), - variables, - ).result(self.load_library_timeout) - - except TimeoutError as e: - raise RuntimeError( - f"Loading library {name!r} with args {args!r} (working_dir={working_dir!r}, base_dir={base_dir!r}) " - f"timed out after {self.load_library_timeout} seconds. " - "The library may be slow or blocked during import. " - "If required, increase the timeout by setting the ROBOTCODE_LOAD_LIBRARY_TIMEOUT " - "environment variable." - ) from e - + result = self.executor.submit( + get_library_doc, + name, + args if not ignore_arguments else (), + working_dir, + base_dir, + self.get_resolvable_command_line_variables(), + variables, + ).result(self.load_library_timeout) + + except TimeoutError as e: + raise RuntimeError( + f"Loading library {name!r} with args {args!r} (working_dir={working_dir!r}, base_dir={base_dir!r}) " + f"timed out after {self.load_library_timeout} seconds. " + "The library may be slow or blocked during import. " + "If required, increase the timeout by setting the ROBOTCODE_LOAD_LIBRARY_TIMEOUT " + "environment variable." + ) from e except (SystemExit, KeyboardInterrupt): raise except BaseException as e: self._logger.exception(e) raise - finally: - executor.shutdown(wait=True) try: if meta is not None: @@ -1391,6 +1415,21 @@ def get_libdoc_for_library_import( return entry.get_libdoc() + def _get_resource_meta(self, source: str) -> Optional[ResourceMetaData]: + """Get metadata for a resource file for cache validation.""" + source_path = Path(source) + if not source_path.exists(): + return None + try: + mtime = source_path.stat().st_mtime_ns + except OSError: + return None + return ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source=source, + mtime=mtime, + ) + @_logger.call def get_libdoc_from_model( self, @@ -1399,6 +1438,7 @@ def get_libdoc_from_model( ) -> LibraryDoc: key = source + # Check in-memory cache first entry = None if model in self._resource_libdoc_cache: entry = self._resource_libdoc_cache.get(model, None) @@ -1406,15 +1446,209 @@ def get_libdoc_from_model( if entry and key in entry: return entry[key] + # Check disk cache + meta = self._get_resource_meta(source) + if meta is not None: + meta_file = meta.filepath_base + ".meta" + if self.data_cache.cache_data_exists(CacheSection.RESOURCE, meta_file): + try: + saved_meta = self.data_cache.read_cache_data(CacheSection.RESOURCE, meta_file, ResourceMetaData) + if saved_meta == meta: + spec_path = meta.filepath_base + ".spec" + self._logger.debug(lambda: f"Use cached resource data for {source}", context_name="import") + result = self.data_cache.read_cache_data(CacheSection.RESOURCE, spec_path, LibraryDoc) + # Store in in-memory cache too + if entry is None: + entry = {} + self._resource_libdoc_cache[model] = entry + entry[key] = result + return result + except Exception: + self._logger.debug( + lambda: f"Failed to load cached resource data for {source}", context_name="import" + ) + + # Cache miss - compute the LibraryDoc result = get_model_doc(model=model, source=source) + + # Store in in-memory cache if entry is None: entry = {} self._resource_libdoc_cache[model] = entry - entry[key] = result + # Save to disk cache + if meta is not None: + try: + meta_file = meta.filepath_base + ".meta" + spec_file = meta.filepath_base + ".spec" + self.data_cache.save_cache_data(CacheSection.RESOURCE, spec_file, result) + self.data_cache.save_cache_data(CacheSection.RESOURCE, meta_file, meta) + except OSError: + self._logger.debug(lambda: f"Failed to save resource cache for 
{source}", context_name="import") + return result + def _cache_libdoc( + self, + lib_doc: LibraryDoc, + name: Optional[str] = None, + source: Optional[str] = None, + ) -> LibraryDoc: + """Cache a LibraryDoc for future lookups and return it.""" + effective_source = source or lib_doc.source + if effective_source: + self._libdoc_by_source[effective_source] = lib_doc + if name: + self._libdoc_by_name[name] = lib_doc + elif lib_doc.name: + self._libdoc_by_name[lib_doc.name] = lib_doc + return lib_doc + + def get_libdoc_for_source(self, source: Optional[str], name: Optional[str] = None) -> Optional[LibraryDoc]: + """Look up a library LibraryDoc by its source path or name. + + Used when restoring namespace from cache - looks up cached library data. + Returns None if not found (cache miss or no source). + + Uses in-memory index first, then falls back to loaded libraries + and disk cache if needed. + + Args: + source: The source path of the library + name: Optional library name, used to look up standard libraries that + are cached by name instead of source path + """ + if source is None and name is None: + return None + + if source is not None and source in self._libdoc_by_source: + return self._libdoc_by_source[source] + + if name is not None and name in self._libdoc_by_name: + return self._libdoc_by_name[name] + + # Check loaded libraries (and populate index for future lookups) + with self._libaries_lock: + for entry in self._libaries.values(): + lib_doc = entry.get_libdoc() + # Populate index while iterating + if lib_doc.source and lib_doc.source not in self._libdoc_by_source: + self._libdoc_by_source[lib_doc.source] = lib_doc + if lib_doc.name and lib_doc.name not in self._libdoc_by_name: + self._libdoc_by_name[lib_doc.name] = lib_doc + + if source is not None and lib_doc.source == source: + return lib_doc + if name is not None and lib_doc.name == name: + return lib_doc + + # Try disk cache using library name (for standard libraries) + # Standard libraries are cached with full module paths like "robot/libraries/BuiltIn.spec" + try: + if name is not None: + # First try the name directly (for libraries with dots like "mypackage.MyLibrary") + spec_file = name.replace(".", "/") + ".spec" + if self.data_cache.cache_data_exists(CacheSection.LIBRARY, spec_file): + lib_doc = self.data_cache.read_cache_data(CacheSection.LIBRARY, spec_file, LibraryDoc) + return self._cache_libdoc(lib_doc, name=name) + + # Try standard Robot Framework library path + spec_file = "robot/libraries/" + name + ".spec" + if self.data_cache.cache_data_exists(CacheSection.LIBRARY, spec_file): + lib_doc = self.data_cache.read_cache_data(CacheSection.LIBRARY, spec_file, LibraryDoc) + return self._cache_libdoc(lib_doc, name=name) + + # Try disk cache using source path to build cache key (for by_path libraries) + if source is not None: + source_path = Path(source) + if source_path.exists(): + cache_key = f"{zlib.adler32(str(source_path.parent).encode('utf-8')):08x}_{source_path.stem}" + + spec_file = cache_key + ".spec" + if self.data_cache.cache_data_exists(CacheSection.LIBRARY, spec_file): + lib_doc = self.data_cache.read_cache_data(CacheSection.LIBRARY, spec_file, LibraryDoc) + return self._cache_libdoc(lib_doc, source=source) + except Exception as e: + self._logger.debug( + lambda e=e: f"get_libdoc_for_source failed for source={source}, name={name}: {e}", # type: ignore[misc] + context_name="import", + ) + + return None + + def get_resource_libdoc_for_source(self, source: Optional[str]) -> Optional[LibraryDoc]: + """Look up a 
resource LibraryDoc by its source path. + + Used when restoring namespace from cache - looks up cached resource data. + Returns None if not found (cache miss or no source). + + Uses in-memory cache first, then falls back to disk cache. + """ + if source is None: + return None + + if source in self._resource_libdoc_by_source: + return self._resource_libdoc_by_source[source] + + source_path = Path(source) + if not source_path.exists(): + return None + + # Build cache filename using ResourceMetaData.filepath_base logic + cache_key = f"{zlib.adler32(str(source_path.parent).encode('utf-8')):08x}_{source_path.stem}" + spec_file = cache_key + ".spec" + + if self.data_cache.cache_data_exists(CacheSection.RESOURCE, spec_file): + try: + lib_doc = self.data_cache.read_cache_data(CacheSection.RESOURCE, spec_file, LibraryDoc) + # Cache for future lookups + self._resource_libdoc_by_source[source] = lib_doc + return lib_doc + except (OSError, TypeError, EOFError, AttributeError, ImportError) as e: + self._logger.debug( + lambda e=e: f"get_resource_libdoc_for_source failed for {source}: {e}", # type: ignore[misc] + context_name="import", + ) + + return None + + def get_variables_libdoc_for_source(self, source: Optional[str]) -> Optional[LibraryDoc]: + """Look up a variables LibraryDoc by its source path. + + Used when restoring namespace from cache - looks up cached variables data. + Returns None if not found (cache miss or no source). + + Uses in-memory cache first, then falls back to disk cache. + """ + if source is None: + return None + + if source in self._variables_libdoc_by_source: + return self._variables_libdoc_by_source[source] + + source_path = Path(source) + if not source_path.exists(): + return None + + # Build cache filename similar to variables cache logic + cache_key = f"{zlib.adler32(str(source_path.parent).encode('utf-8')):08x}_{source_path.stem}" + spec_file = cache_key + ".spec" + + if self.data_cache.cache_data_exists(CacheSection.VARIABLES, spec_file): + try: + lib_doc = self.data_cache.read_cache_data(CacheSection.VARIABLES, spec_file, LibraryDoc) + # Cache for future lookups + self._variables_libdoc_by_source[source] = lib_doc + return lib_doc + except (OSError, TypeError, EOFError, AttributeError, ImportError) as e: + self._logger.debug( + lambda e=e: f"get_variables_libdoc_for_source failed for {source}: {e}", # type: ignore[misc] + context_name="import", + ) + + return None + def _get_variables_libdoc_handler( self, variables: Optional[Dict[str, Any]] = None, @@ -1474,36 +1708,31 @@ def _get_variables_libdoc( except BaseException as e: self._logger.exception(e) - executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context("spawn")) try: - try: - result = executor.submit( - get_variables_doc, - name, - args, - working_dir, - base_dir, - self.get_resolvable_command_line_variables() if resolve_command_line_vars else None, - variables, - ).result(self.load_library_timeout) - - except TimeoutError as e: - raise RuntimeError( - f"Loading variables {name!r} with args {args!r} (working_dir={working_dir!r}, " - f"base_dir={base_dir!r}) " - f"timed out after {self.load_library_timeout} seconds. " - "The variables may be slow or blocked during import. " - "If required, increase the timeout by setting the ROBOTCODE_LOAD_LIBRARY_TIMEOUT " - "environment variable." 
- ) from e - + result = self.executor.submit( + get_variables_doc, + name, + args, + working_dir, + base_dir, + self.get_resolvable_command_line_variables() if resolve_command_line_vars else None, + variables, + ).result(self.load_library_timeout) + + except TimeoutError as e: + raise RuntimeError( + f"Loading variables {name!r} with args {args!r} (working_dir={working_dir!r}, " + f"base_dir={base_dir!r}) " + f"timed out after {self.load_library_timeout} seconds. " + "The variables may be slow or blocked during import. " + "If required, increase the timeout by setting the ROBOTCODE_LOAD_LIBRARY_TIMEOUT " + "environment variable." + ) from e except (SystemExit, KeyboardInterrupt): raise except BaseException as e: self._logger.exception(e) raise - finally: - executor.shutdown(True) try: if meta is not None: diff --git a/packages/robot/src/robotcode/robot/diagnostics/library_doc.py b/packages/robot/src/robotcode/robot/diagnostics/library_doc.py index 065962aa1..ccbc7e303 100644 --- a/packages/robot/src/robotcode/robot/diagnostics/library_doc.py +++ b/packages/robot/src/robotcode/robot/diagnostics/library_doc.py @@ -620,9 +620,7 @@ class KeywordDoc(SourceEntity): name_token: Optional[Token] = field(default=None, compare=False) arguments: List[ArgumentInfo] = field(default_factory=list, compare=False) arguments_spec: Optional[ArgumentSpec] = field(default=None, compare=False) - argument_definitions: Optional[List[ArgumentDefinition]] = field( - default=None, compare=False, metadata={"nosave": True} - ) + argument_definitions: Optional[List[ArgumentDefinition]] = field(default=None, compare=False) doc: str = field(default="", compare=False) tags: List[str] = field(default_factory=list) type: str = "keyword" diff --git a/packages/robot/src/robotcode/robot/diagnostics/namespace.py b/packages/robot/src/robotcode/robot/diagnostics/namespace.py index f6be8fe14..8530f2cc5 100644 --- a/packages/robot/src/robotcode/robot/diagnostics/namespace.py +++ b/packages/robot/src/robotcode/robot/diagnostics/namespace.py @@ -1,9 +1,13 @@ import ast import enum +import hashlib import itertools +import sys import weakref +import zlib from collections import OrderedDict, defaultdict from concurrent.futures import CancelledError +from dataclasses import dataclass, replace from pathlib import Path from typing import ( Any, @@ -21,13 +25,16 @@ from robot.errors import VariableError from robot.parsing.lexer.tokens import Token -from robot.parsing.model.blocks import Keyword, SettingSection, TestCase, VariableSection +from robot.parsing.model.blocks import ( + Keyword, + SettingSection, + TestCase, + VariableSection, +) from robot.parsing.model.statements import Arguments, Setup, Statement, Timeout from robot.parsing.model.statements import LibraryImport as RobotLibraryImport from robot.parsing.model.statements import ResourceImport as RobotResourceImport -from robot.parsing.model.statements import ( - VariablesImport as RobotVariablesImport, -) +from robot.parsing.model.statements import VariablesImport as RobotVariablesImport from robotcode.core.concurrent import RLock from robotcode.core.event import event from robotcode.core.lsp.types import ( @@ -46,7 +53,7 @@ from robotcode.core.utils.logging import LoggingDescriptor from robotcode.core.utils.path import same_file -from ..utils import get_robot_version +from ..utils import get_robot_version, get_robot_version_str from ..utils.ast import ( get_first_variable_token, range_from_node, @@ -66,9 +73,11 @@ from .entities import ( ArgumentDefinition, 
BuiltInVariableDefinition, + EmbeddedArgumentDefinition, EnvironmentVariableDefinition, GlobalVariableDefinition, Import, + ImportedVariableDefinition, LibraryEntry, LibraryImport, LocalVariableDefinition, @@ -78,6 +87,7 @@ TestCaseDefinition, TestVariableDefinition, VariableDefinition, + VariableDefinitionType, VariablesEntry, VariablesImport, ) @@ -89,6 +99,7 @@ DEFAULT_LIBRARIES, KeywordDoc, KeywordMatcher, + KeywordStore, LibraryDoc, resolve_robot_variables, ) @@ -98,6 +109,142 @@ from robot.parsing.model.statements import Var +# Namespace cache version - bump major for incompatible format changes +# 1.0: Single-file cache format with atomic writes (meta + spec in one file) +# Extended to include full analysis caching (keyword_references, variable_references, local_variable_assignments) +NAMESPACE_META_VERSION = "1.0" + +# Variable syntax constants for parsing ${var}, @{var}, &{var}, %{var} +_VAR_PREFIX_LEN = 2 # Length of "${", "@{", "&{", or "%{" +_VAR_SUFFIX_LEN = 1 # Length of "}" +_VAR_WRAPPER_LEN = _VAR_PREFIX_LEN + _VAR_SUFFIX_LEN # Total wrapper length (3) + + +@dataclass(frozen=True) +class NamespaceMetaData: + """Metadata for validating namespace cache (immutable).""" + + meta_version: str + source: str + mtime: int # Source file modification time in nanoseconds + file_size: int # Source file size in bytes + content_hash: str # Tiered hash of file content (size + first 64KB + last 64KB) + library_sources_mtimes: tuple[ + tuple[str, int], ... + ] # ((library_source, mtime), ...) + resource_sources_mtimes: tuple[ + tuple[str, int], ... + ] # ((resource_source, mtime), ...) + variables_sources_mtimes: tuple[ + tuple[str, int], ... + ] # ((variables_source, mtime), ...) + robot_version: str + python_executable: str # sys.executable - detects venv changes + sys_path_hash: str # Hash of sys.path in original order - detects PYTHONPATH/install changes + + @property + def filepath_base(self) -> str: + p = Path(self.source) + return f"{zlib.adler32(str(p.parent).encode('utf-8')):08x}_{p.stem}" + + +@dataclass(frozen=True) +class _CachedEntryBase: + """Common fields for all cached import entries (immutable).""" + + name: str + import_name: str + library_doc_source: str | None # Source path to look up LibraryDoc + args: tuple[Any, ...] + alias: str | None + import_range: Range + import_source: str | None + alias_range: Range + + +@dataclass(frozen=True) +class CachedLibraryEntry(_CachedEntryBase): + """Serializable representation of LibraryEntry for caching (immutable).""" + + +@dataclass(frozen=True) +class CachedResourceEntry(_CachedEntryBase): + """Serializable representation of ResourceEntry for caching (immutable).""" + + imports: tuple[Import, ...] = () + variables: tuple[VariableDefinition, ...] = () + + +@dataclass(frozen=True) +class CachedVariablesEntry(_CachedEntryBase): + """Serializable representation of VariablesEntry for caching (immutable).""" + + variables: tuple["ImportedVariableDefinition", ...] = () + + +@dataclass(frozen=True) +class KeywordRefKey: + """Stable key for identifying keywords across cache sessions (immutable). + + Uses minimal fields needed for unique identification: + - source + line_no uniquely identifies a location in code + - name ensures we match the right keyword at that location + """ + + source: str # File path (empty string for builtins) + name: str # Keyword name + line_no: int # Line number (0 for builtins) + + +@dataclass(frozen=True) +class VariableRefKey: + """Stable key for identifying variables across cache sessions (immutable). 
+ + Uses minimal fields needed for unique identification: + - source + line_no + col_offset uniquely identifies a location + - name ensures we match the right variable at that location + - var_type distinguishes between different variable definition types + """ + + source: str # File path + name: str # Variable name (e.g., "${MY_VAR}") + var_type: str # VariableDefinitionType.value + line_no: int + col_offset: int + + +@dataclass(frozen=True) +class NamespaceCacheData: + """Serializable namespace state for disk caching (immutable).""" + + # Initialization data + libraries: tuple[tuple[str, CachedLibraryEntry], ...] + resources: tuple[tuple[str, CachedResourceEntry], ...] + resources_files: tuple[tuple[str, str], ...] # ((source, key), ...) + variables_imports: tuple[tuple[str, CachedVariablesEntry], ...] + own_variables: tuple[VariableDefinition, ...] + imports: tuple[Import, ...] + library_doc: LibraryDoc | None + + # Analysis data (cached to skip re-analysis on warm start) + analyzed: bool = False # True if analysis was completed when cache was saved + diagnostics: tuple[Diagnostic, ...] = () + test_case_definitions: tuple[TestCaseDefinition, ...] = () + tag_definitions: tuple[TagDefinition, ...] = () + # Namespace references: ((import_index, (locations...)), ...) + # Maps import index (in imports tuple) to set of locations where the import is referenced + namespace_references: tuple[tuple[int, tuple[Location, ...]], ...] = () + + # Full analysis caching - keyword and variable references + # When these are populated and fully_analyzed is True, analysis phase can be skipped entirely + keyword_references: tuple[tuple[KeywordRefKey, tuple[Location, ...]], ...] = () + variable_references: tuple[tuple[VariableRefKey, tuple[Location, ...]], ...] = () + local_variable_assignments: tuple[ + tuple[VariableRefKey, tuple[Range, ...]], ... 
+ ] = () + fully_analyzed: bool = False # True if full analysis data is cached + + class DiagnosticsError(Exception): pass @@ -131,7 +278,9 @@ def visit_Variable(self, node: Statement) -> None: # noqa: N802 return if name_token.value is not None: - matcher = search_variable(name_token.value, ignore_errors=True, parse_type=True) + matcher = search_variable( + name_token.value, ignore_errors=True, parse_type=True + ) if not matcher.is_assign(allow_assign_mark=True) or matcher.name is None: return @@ -145,7 +294,9 @@ def visit_Variable(self, node: Statement) -> None: # noqa: N802 for s in values ) - stripped_name_token = strip_variable_token(name_token, matcher=matcher, parse_type=True) + stripped_name_token = strip_variable_token( + name_token, matcher=matcher, parse_type=True + ) self._results.append( VariableDefinition( @@ -221,11 +372,15 @@ def visit_Arguments(self, node: Statement) -> None: # noqa: N802 ): break - matcher = VariableMatcher(argument.value, parse_type=True, ignore_errors=True) + matcher = VariableMatcher( + argument.value, parse_type=True, ignore_errors=True + ) if not matcher.is_variable() or matcher.name is None: continue - stripped_argument_token = strip_variable_token(argument, parse_type=True, matcher=matcher) + stripped_argument_token = strip_variable_token( + argument, parse_type=True, matcher=matcher + ) if matcher not in args: arg_def = ArgumentDefinition( @@ -273,7 +428,9 @@ def visit_KeywordName(self, node: Statement) -> None: # noqa: N802 name_token = node.get_token(Token.KEYWORD_NAME) if name_token is not None and name_token.value: - keyword = ModelHelper.get_keyword_definition_at_token(self.namespace.get_library_doc(), name_token) + keyword = ModelHelper.get_keyword_definition_at_token( + self.namespace.get_library_doc(), name_token + ) self.current_kw_doc = keyword for variable_token in filter( @@ -281,7 +438,9 @@ def visit_KeywordName(self, node: Statement) -> None: # noqa: N802 tokenize_variables(name_token, identifiers="$", ignore_errors=True), ): if variable_token.value: - match = search_variable(variable_token.value, "$", ignore_errors=True) + match = search_variable( + variable_token.value, "$", ignore_errors=True + ) if match.base is None: continue name = match.base.split(":", 1)[0] @@ -301,7 +460,11 @@ def visit_KeywordName(self, node: Statement) -> None: # noqa: N802 if self.current_kw is not None: args = ArgumentVisitor( - self.namespace, self.nodes, self.position, self.in_args, self.current_kw_doc + self.namespace, + self.nodes, + self.position, + self.in_args, + self.current_kw_doc, ).get(self.current_kw) if args: self._results.update(args) @@ -328,11 +491,15 @@ def visit_ExceptHeader(self, node: Statement) -> None: # noqa: N802 except VariableError: pass - def _get_var_name(self, original: str, position: Position, require_assign: bool = True) -> Optional[str]: + def _get_var_name( + self, original: str, position: Position, require_assign: bool = True + ) -> Optional[str]: if self._resolved_variables is None: self._resolved_variables = resolve_robot_variables( str(self.namespace.imports_manager.root_folder), - str(Path(self.namespace.source).parent) if self.namespace.source else ".", + str(Path(self.namespace.source).parent) + if self.namespace.source + else ".", self.namespace.imports_manager.get_resolvable_command_line_variables(), variables=self.namespace.get_resolvable_variables(), ) @@ -379,16 +546,23 @@ def visit_KeywordCall(self, node: Statement) -> None: # noqa: N802 continue try: matcher = search_variable( - 
assign_token.value[:-1].rstrip() if assign_token.value.endswith("=") else assign_token.value, + assign_token.value[:-1].rstrip() + if assign_token.value.endswith("=") + else assign_token.value, parse_type=True, ignore_errors=True, ) - if not matcher.is_assign(allow_assign_mark=True) or matcher.name is None: + if ( + not matcher.is_assign(allow_assign_mark=True) + or matcher.name is None + ): continue if matcher not in self._results: - stripped_name_token = strip_variable_token(assign_token, matcher=matcher, parse_type=True) + stripped_name_token = strip_variable_token( + assign_token, matcher=matcher, parse_type=True + ) self._results[matcher] = LocalVariableDefinition( name=matcher.name, @@ -468,16 +642,23 @@ def visit_InlineIfHeader(self, node: Statement) -> None: # noqa: N802 continue try: matcher = search_variable( - assign_token.value[:-1].rstrip() if assign_token.value.endswith("=") else assign_token.value, + assign_token.value[:-1].rstrip() + if assign_token.value.endswith("=") + else assign_token.value, parse_type=True, ignore_errors=True, ) - if not matcher.is_assign(allow_assign_mark=True) or matcher.name is None: + if ( + not matcher.is_assign(allow_assign_mark=True) + or matcher.name is None + ): continue if matcher not in self._results: - stripped_name_token = strip_variable_token(assign_token, matcher=matcher, parse_type=True) + stripped_name_token = strip_variable_token( + assign_token, matcher=matcher, parse_type=True + ) self._results[matcher] = LocalVariableDefinition( name=matcher.name, @@ -503,16 +684,23 @@ def visit_ForHeader(self, node: Statement) -> None: # noqa: N802 continue try: matcher = search_variable( - assign_token.value[:-1].rstrip() if assign_token.value.endswith("=") else assign_token.value, + assign_token.value[:-1].rstrip() + if assign_token.value.endswith("=") + else assign_token.value, parse_type=True, ignore_errors=True, ) - if not matcher.is_assign(allow_assign_mark=True) or matcher.name is None: + if ( + not matcher.is_assign(allow_assign_mark=True) + or matcher.name is None + ): continue if matcher not in self._results: - stripped_name_token = strip_variable_token(assign_token, matcher=matcher, parse_type=True) + stripped_name_token = strip_variable_token( + assign_token, matcher=matcher, parse_type=True + ) self._results[matcher] = LocalVariableDefinition( name=matcher.name, @@ -537,14 +725,21 @@ def visit_Var(self, node: Var) -> None: # noqa: N802 try: matcher = search_variable( - name_token.value[:-1].rstrip() if name_token.value.endswith("=") else name_token.value, + name_token.value[:-1].rstrip() + if name_token.value.endswith("=") + else name_token.value, parse_type=True, ignore_errors=True, ) - if not matcher.is_assign(allow_assign_mark=True) or matcher.name is None: + if ( + not matcher.is_assign(allow_assign_mark=True) + or matcher.name is None + ): return - stripped_name_token = strip_variable_token(name_token, matcher=matcher, parse_type=True) + stripped_name_token = strip_variable_token( + name_token, matcher=matcher, parse_type=True + ) scope = node.scope @@ -568,9 +763,13 @@ def visit_Var(self, node: Var) -> None: # noqa: N802 value_type=matcher.type, ) - if matcher not in self._results or type(self._results[matcher]) is not type(var): + if matcher not in self._results or type( + self._results[matcher] + ) is not type(var): if isinstance(var, LocalVariableDefinition) or not any( - l for l in self.namespace.get_global_variables() if l.matcher == var.matcher + l + for l in self.namespace.get_global_variables() + if l.matcher == 
var.matcher ): self._results[matcher] = var else: @@ -597,7 +796,9 @@ def visit_LibraryImport(self, node: RobotLibraryImport) -> None: # noqa: N802 separator = node.get_token(Token.WITH_NAME) alias_token = node.get_tokens(Token.NAME)[-1] if separator else None - last_data_token = next(v for v in reversed(node.tokens) if v.type not in Token.NON_DATA_TOKENS) + last_data_token = next( + v for v in reversed(node.tokens) if v.type not in Token.NON_DATA_TOKENS + ) if node.name: self._results.append( LibraryImport( @@ -629,7 +830,9 @@ def visit_LibraryImport(self, node: RobotLibraryImport) -> None: # noqa: N802 def visit_ResourceImport(self, node: RobotResourceImport) -> None: # noqa: N802 name = node.get_token(Token.NAME) - last_data_token = next(v for v in reversed(node.tokens) if v.type not in Token.NON_DATA_TOKENS) + last_data_token = next( + v for v in reversed(node.tokens) if v.type not in Token.NON_DATA_TOKENS + ) if node.name: self._results.append( ResourceImport( @@ -658,7 +861,9 @@ def visit_ResourceImport(self, node: RobotResourceImport) -> None: # noqa: N802 def visit_VariablesImport(self, node: RobotVariablesImport) -> None: # noqa: N802 name = node.get_token(Token.NAME) - last_data_token = next(v for v in reversed(node.tokens) if v.type not in Token.NON_DATA_TOKENS) + last_data_token = next( + v for v in reversed(node.tokens) if v.type not in Token.NON_DATA_TOKENS + ) if node.name: self._results.append( VariablesImport( @@ -731,18 +936,30 @@ def __init__( self._analyzed = False self._analyze_lock = RLock(default_timeout=120, name="Namespace.analyze") self._library_doc: Optional[LibraryDoc] = None - self._library_doc_lock = RLock(default_timeout=120, name="Namespace.library_doc") + self._library_doc_lock = RLock( + default_timeout=120, name="Namespace.library_doc" + ) self._imports: Optional[List[Import]] = None self._import_entries: Dict[Import, LibraryEntry] = OrderedDict() self._own_variables: Optional[List[VariableDefinition]] = None - self._own_variables_lock = RLock(default_timeout=120, name="Namespace.own_variables") + self._own_variables_lock = RLock( + default_timeout=120, name="Namespace.own_variables" + ) self._global_variables: Optional[List[VariableDefinition]] = None - self._global_variables_lock = RLock(default_timeout=120, name="Namespace.global_variables") - self._global_variables_dict: Optional[Dict[VariableMatcher, VariableDefinition]] = None - self._global_variables_dict_lock = RLock(default_timeout=120, name="Namespace.global_variables_dict") + self._global_variables_lock = RLock( + default_timeout=120, name="Namespace.global_variables" + ) + self._global_variables_dict: Optional[ + Dict[VariableMatcher, VariableDefinition] + ] = None + self._global_variables_dict_lock = RLock( + default_timeout=120, name="Namespace.global_variables_dict" + ) self._imported_variables: Optional[List[VariableDefinition]] = None - self._imported_variables_lock = RLock(default_timeout=120, name="Namespace._imported_variables_lock") + self._imported_variables_lock = RLock( + default_timeout=120, name="Namespace._imported_variables_lock" + ) self._global_resolvable_variables: Optional[Dict[str, Any]] = None self._global_resolvable_variables_lock = RLock( @@ -755,19 +972,27 @@ def __init__( ) self._suite_variables: Optional[Dict[str, Any]] = None - self._suite_variables_lock = RLock(default_timeout=120, name="Namespace.global_variables") + self._suite_variables_lock = RLock( + default_timeout=120, name="Namespace.global_variables" + ) self._diagnostics: Optional[List[Diagnostic]] = None 
self._keyword_references: Optional[Dict[KeywordDoc, Set[Location]]] = None - self._variable_references: Optional[Dict[VariableDefinition, Set[Location]]] = None - self._local_variable_assignments: Optional[Dict[VariableDefinition, Set[Range]]] = None + self._variable_references: Optional[ + Dict[VariableDefinition, Set[Location]] + ] = None + self._local_variable_assignments: Optional[ + Dict[VariableDefinition, Set[Range]] + ] = None self._namespace_references: Optional[Dict[LibraryEntry, Set[Location]]] = None self._test_case_definitions: Optional[List[TestCaseDefinition]] = None self._tag_definitions: Optional[List[TagDefinition]] = None self._imported_keywords: Optional[List[KeywordDoc]] = None - self._imported_keywords_lock = RLock(default_timeout=120, name="Namespace.imported_keywords") + self._imported_keywords_lock = RLock( + default_timeout=120, name="Namespace.imported_keywords" + ) self._keywords: Optional[List[KeywordDoc]] = None self._keywords_lock = RLock(default_timeout=120, name="Namespace.keywords") @@ -784,13 +1009,45 @@ def __init__( self._in_initialize = False @event - def has_invalidated(sender) -> None: ... + def has_invalidated(sender) -> None: + ... @event - def has_initialized(sender) -> None: ... + def has_initialized(sender) -> None: + ... @event - def has_analysed(sender) -> None: ... + def has_analysed(sender) -> None: + ... + + @staticmethod + def _make_variable_ref_key(var: VariableDefinition) -> "VariableRefKey": + """Create a VariableRefKey from a VariableDefinition. + + This helper eliminates code duplication when creating cache keys + for variable references. + + Note: For RF 7.3+ compatibility, variable names are normalized to strip + type annotations. This ensures that "${arg: int}" and "${arg}" produce + the same key, matching how _visit_Arguments normalizes names. 
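+
+        Illustrative example (hypothetical line/column values): definitions
+        named "${arg: int}" and "${arg}" at line 12, column 4 of the same
+        source both yield VariableRefKey(source, "${arg}", var.type.value, 12, 4).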
+ """ + # Normalize name to strip type annotations for RF 7.3+ compatibility + normalized_name = var.name + if var.name and var.name.startswith(("${", "@{", "&{", "%{")): + try: + matcher = VariableMatcher(var.name, parse_type=True, ignore_errors=True) + if matcher.name: + normalized_name = matcher.name + except Exception: + pass # Keep original name if parsing fails + + return VariableRefKey( + var.source or "", + normalized_name, + var.type.value, + var.line_no, + var.col_offset, + ) @property def document(self) -> Optional[TextDocument]: @@ -798,7 +1055,11 @@ def document(self) -> Optional[TextDocument]: @property def document_uri(self) -> str: - return self.document.document_uri if self.document is not None else str(Uri.from_path(self.source)) + return ( + self.document.document_uri + if self.document is not None + else str(Uri.from_path(self.source)) + ) @property def search_order(self) -> Tuple[str, ...]: @@ -834,7 +1095,9 @@ def _on_resources_changed(self, sender: Any, resources: List[LibraryDoc]) -> Non invalidate = False for p in resources: - if any(e for e in self._resources.values() if e.library_doc.source == p.source): + if any( + e for e in self._resources.values() if e.library_doc.source == p.source + ): invalidate = True break @@ -849,7 +1112,11 @@ def _on_variables_changed(self, sender: Any, variables: List[LibraryDoc]) -> Non invalidate = False for p in variables: - if any(e for e in self._variables_imports.values() if e.library_doc.source == p.source): + if any( + e + for e in self._variables_imports.values() + if e.library_doc.source == p.source + ): invalidate = True break @@ -901,7 +1168,9 @@ def get_variable_references(self) -> Dict[VariableDefinition, Set[Location]]: self.analyze() - return self._variable_references if self._variable_references is not None else {} + return ( + self._variable_references if self._variable_references is not None else {} + ) def get_testcase_definitions(self) -> List[TestCaseDefinition]: if self._test_case_definitions is None: @@ -909,7 +1178,11 @@ def get_testcase_definitions(self) -> List[TestCaseDefinition]: self.analyze() - return self._test_case_definitions if self._test_case_definitions is not None else [] + return ( + self._test_case_definitions + if self._test_case_definitions is not None + else [] + ) def get_local_variable_assignments(self) -> Dict[VariableDefinition, Set[Range]]: if self._local_variable_assignments is None: @@ -917,7 +1190,11 @@ def get_local_variable_assignments(self) -> Dict[VariableDefinition, Set[Range]] self.analyze() - return self._local_variable_assignments if self._local_variable_assignments is not None else {} + return ( + self._local_variable_assignments + if self._local_variable_assignments is not None + else {} + ) def get_namespace_references(self) -> Dict[LibraryEntry, Set[Location]]: if self._namespace_references is None: @@ -925,7 +1202,9 @@ def get_namespace_references(self) -> Dict[LibraryEntry, Set[Location]]: self.analyze() - return self._namespace_references if self._namespace_references is not None else {} + return ( + self._namespace_references if self._namespace_references is not None else {} + ) def get_import_entries(self) -> Dict[Import, LibraryEntry]: self.ensure_initialized() @@ -944,9 +1223,17 @@ def get_namespaces(self) -> Dict[KeywordMatcher, List[LibraryEntry]]: self._namespaces = defaultdict(list) for v in (self.get_libraries()).values(): - self._namespaces[KeywordMatcher(v.alias or v.name or v.import_name, is_namespace=True)].append(v) + self._namespaces[ + KeywordMatcher( + 
v.alias or v.name or v.import_name, is_namespace=True + ) + ].append(v) for v in (self.get_resources()).values(): - self._namespaces[KeywordMatcher(v.alias or v.name or v.import_name, is_namespace=True)].append(v) + self._namespaces[ + KeywordMatcher( + v.alias or v.name or v.import_name, is_namespace=True + ) + ].append(v) return self._namespaces @@ -964,10 +1251,1013 @@ def get_variables_imports(self) -> Dict[str, VariablesEntry]: def get_library_doc(self) -> LibraryDoc: with self._library_doc_lock: if self._library_doc is None: - self._library_doc = self.imports_manager.get_libdoc_from_model(self.model, self.source) + self._library_doc = self.imports_manager.get_libdoc_from_model( + self.model, self.source + ) return self._library_doc + @staticmethod + def _compute_content_hash(path: Path) -> tuple[int, str]: + """Compute robust content hash using tiered strategy. + + Returns (file_size, hash_string). + Hash covers: file_size + first_64KB + last_64KB + + This catches: + - Appended content (size change) + - Inserted content (size change) + - Modified content in first 64KB + - Modified content in last 64KB + """ + stat = path.stat() + file_size = stat.st_size + + with open(path, "rb") as f: + first_chunk = f.read(65536) + + # For files > 64KB, also hash last 64KB + if file_size > 65536: + f.seek(max(0, file_size - 65536)) + last_chunk = f.read(65536) + else: + last_chunk = b"" + + # Combine size + both chunks into single hash + hasher = hashlib.sha256() + hasher.update(f"{file_size}:".encode()) + hasher.update(first_chunk) + hasher.update(last_chunk) + + return file_size, hasher.hexdigest() + + def get_cache_metadata(self) -> NamespaceMetaData | None: + """Generate metadata for cache validation.""" + if not self._initialized: + return None + + source_path = Path(self.source) + if not source_path.exists(): + return None + + try: + # Compute content hash for robust validation + file_size, content_hash = self._compute_content_hash(source_path) + except OSError: + self._logger.debug( + lambda: f"Failed to compute content hash for {self.source}" + ) + return None + + library_mtimes: list[tuple[str, int]] = [] + for entry in self._libraries.values(): + if entry.library_doc.source: + lib_path = Path(entry.library_doc.source) + if lib_path.exists(): + library_mtimes.append( + (entry.library_doc.source, lib_path.stat().st_mtime_ns) + ) + + resource_mtimes: list[tuple[str, int]] = [] + for entry in self._resources.values(): + if entry.library_doc.source: + res_path = Path(entry.library_doc.source) + if res_path.exists(): + resource_mtimes.append( + (entry.library_doc.source, res_path.stat().st_mtime_ns) + ) + + variables_mtimes: list[tuple[str, int]] = [] + for entry in self._variables_imports.values(): + if entry.library_doc.source: + var_path = Path(entry.library_doc.source) + if var_path.exists(): + variables_mtimes.append( + (entry.library_doc.source, var_path.stat().st_mtime_ns) + ) + + # Compute environment identity - hash sys.path in original order + # Order matters for import resolution (first match wins) + sys_path_hash = hashlib.sha256("\n".join(sys.path).encode("utf-8")).hexdigest()[ + :16 + ] + + return NamespaceMetaData( + meta_version=NAMESPACE_META_VERSION, + source=self.source, + mtime=source_path.stat().st_mtime_ns, + file_size=file_size, + content_hash=content_hash, + library_sources_mtimes=tuple(library_mtimes), + resource_sources_mtimes=tuple(resource_mtimes), + variables_sources_mtimes=tuple(variables_mtimes), + robot_version=get_robot_version_str(), + 
python_executable=sys.executable, + sys_path_hash=sys_path_hash, + ) + + def _serialize_namespace_references( + self, + ) -> tuple[tuple[int, tuple[Location, ...]], ...]: + """Serialize _namespace_references for caching. + + Maps LibraryEntry keys to import indices (position in _imports list). + """ + if self._namespace_references is None or self._imports is None: + return () + + # Build reverse mapping: LibraryEntry -> import index + entry_to_index: dict[LibraryEntry, int] = {} + for i, imp in enumerate(self._imports): + if imp in self._import_entries: + entry_to_index[self._import_entries[imp]] = i + + # Serialize namespace references using import indices + result: list[tuple[int, tuple[Location, ...]]] = [] + for entry, locations in self._namespace_references.items(): + if entry in entry_to_index: + result.append((entry_to_index[entry], tuple(locations))) + + return tuple(result) + + def _serialize_keyword_references( + self, + ) -> tuple[tuple[KeywordRefKey, tuple[Location, ...]], ...]: + """Serialize _keyword_references for caching using stable keys. + + Uses (source, name, line_no) as a stable key that survives cache sessions. + """ + if self._keyword_references is None: + return () + + result: list[tuple[KeywordRefKey, tuple[Location, ...]]] = [] + for kw_doc, locations in self._keyword_references.items(): + key = KeywordRefKey( + source=kw_doc.source or "", + name=kw_doc.name, + line_no=kw_doc.line_no, + ) + result.append((key, tuple(locations))) + + return tuple(result) + + def _serialize_variable_references( + self, + ) -> tuple[tuple[VariableRefKey, tuple[Location, ...]], ...]: + """Serialize _variable_references for caching using stable keys. + + Uses (source, name, var_type, line_no, col_offset) as a stable key. + """ + if self._variable_references is None: + return () + + result: list[tuple[VariableRefKey, tuple[Location, ...]]] = [] + for var_def, locations in self._variable_references.items(): + result.append((self._make_variable_ref_key(var_def), tuple(locations))) + + return tuple(result) + + def _serialize_local_variable_assignments( + self, + ) -> tuple[tuple[VariableRefKey, tuple[Range, ...]], ...]: + """Serialize _local_variable_assignments for caching using stable keys. + + Uses the same key format as variable references. 
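+
+        Illustrative sketch (hypothetical values): the in-memory dict
+        {var_def: {Range(...), Range(...)}} becomes the immutable tuple
+        ((VariableRefKey(...), (Range(...), Range(...))), ...) so it can be
+        written to the disk cache.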
+ """ + if self._local_variable_assignments is None: + return () + + result: list[tuple[VariableRefKey, tuple[Range, ...]]] = [] + for var_def, ranges in self._local_variable_assignments.items(): + result.append((self._make_variable_ref_key(var_def), tuple(ranges))) + + return tuple(result) + + def to_cache_data(self) -> NamespaceCacheData: + """Extract serializable state for disk caching.""" + # Convert LibraryEntry -> CachedLibraryEntry + cached_libraries: list[tuple[str, CachedLibraryEntry]] = [] + for key, entry in self._libraries.items(): + cached_libraries.append( + ( + key, + CachedLibraryEntry( + name=entry.name, + import_name=entry.import_name, + library_doc_source=entry.library_doc.source, + args=entry.args, + alias=entry.alias, + import_range=entry.import_range, + import_source=entry.import_source, + alias_range=entry.alias_range, + ), + ) + ) + + # Convert ResourceEntry -> CachedResourceEntry + cached_resources: list[tuple[str, CachedResourceEntry]] = [] + for key, entry in self._resources.items(): + cached_resources.append( + ( + key, + CachedResourceEntry( + name=entry.name, + import_name=entry.import_name, + library_doc_source=entry.library_doc.source, + args=entry.args, + alias=entry.alias, + import_range=entry.import_range, + import_source=entry.import_source, + alias_range=entry.alias_range, + imports=tuple(entry.imports), + variables=tuple(entry.variables), + ), + ) + ) + + # Build resources_files mapping (source -> key) + resources_files: list[tuple[str, str]] = [] + for source, entry in self._resources_files.items(): + # Find the key in _resources that corresponds to this entry + for key, res_entry in self._resources.items(): + if res_entry is entry: + resources_files.append((source, key)) + break + + # Convert VariablesEntry -> CachedVariablesEntry + cached_variables: list[tuple[str, CachedVariablesEntry]] = [] + for key, entry in self._variables_imports.items(): + cached_variables.append( + ( + key, + CachedVariablesEntry( + name=entry.name, + import_name=entry.import_name, + library_doc_source=entry.library_doc.source, + args=entry.args, + alias=entry.alias, + import_range=entry.import_range, + import_source=entry.import_source, + alias_range=entry.alias_range, + variables=tuple(entry.variables), + ), + ) + ) + + return NamespaceCacheData( + libraries=tuple(cached_libraries), + resources=tuple(cached_resources), + resources_files=tuple(resources_files), + variables_imports=tuple(cached_variables), + own_variables=tuple(self._own_variables) + if self._own_variables is not None + else (), + imports=tuple(self._imports) if self._imports is not None else (), + library_doc=self._library_doc, + # Include analysis results if analysis was completed + analyzed=self._analyzed, + diagnostics=tuple(self._diagnostics) + if self._diagnostics is not None + else (), + test_case_definitions=( + tuple(self._test_case_definitions) + if self._test_case_definitions is not None + else () + ), + tag_definitions=tuple(self._tag_definitions) + if self._tag_definitions is not None + else (), + namespace_references=self._serialize_namespace_references(), + # Full analysis caching + keyword_references=self._serialize_keyword_references() + if self._analyzed + else (), + variable_references=self._serialize_variable_references() + if self._analyzed + else (), + local_variable_assignments=self._serialize_local_variable_assignments() + if self._analyzed + else (), + fully_analyzed=self._analyzed, + ) + + @classmethod + def _restore_libraries_from_cache( + cls, + ns: "Namespace", + 
cached_libraries: tuple[tuple[str, CachedLibraryEntry], ...], + imports_manager: "ImportsManager", + ) -> bool: + """Restore library entries from cache. Returns False if cache is stale.""" + for key, cached_entry in cached_libraries: + library_doc = imports_manager.get_libdoc_for_source( + cached_entry.library_doc_source, cached_entry.name + ) + if library_doc is None: + ns._logger.debug( + lambda: f"Library cache miss: {cached_entry.name} (source={cached_entry.library_doc_source})" + ) + return False + ns._libraries[key] = LibraryEntry( + name=cached_entry.name, + import_name=cached_entry.import_name, + library_doc=library_doc, + args=cached_entry.args, + alias=cached_entry.alias, + import_range=cached_entry.import_range, + import_source=cached_entry.import_source, + alias_range=cached_entry.alias_range, + ) + return True + + @classmethod + def _restore_resources_from_cache( + cls, + ns: "Namespace", + cached_resources: tuple[tuple[str, CachedResourceEntry], ...], + imports_manager: "ImportsManager", + ) -> bool: + """Restore resource entries from cache. Returns False if cache is stale.""" + for key, cached_entry in cached_resources: + library_doc = imports_manager.get_resource_libdoc_for_source( + cached_entry.library_doc_source + ) + if library_doc is None: + ns._logger.debug( + lambda: f"Resource cache miss: {cached_entry.name} (source={cached_entry.library_doc_source})" + ) + return False + ns._resources[key] = ResourceEntry( + name=cached_entry.name, + import_name=cached_entry.import_name, + library_doc=library_doc, + args=cached_entry.args, + alias=cached_entry.alias, + import_range=cached_entry.import_range, + import_source=cached_entry.import_source, + alias_range=cached_entry.alias_range, + imports=list(cached_entry.imports), + variables=list(cached_entry.variables), + ) + return True + + @classmethod + def _restore_variables_from_cache( + cls, + ns: "Namespace", + cached_variables: tuple[tuple[str, CachedVariablesEntry], ...], + imports_manager: "ImportsManager", + ) -> bool: + """Restore variables entries from cache. Returns False if cache is stale.""" + for key, cached_entry in cached_variables: + library_doc = imports_manager.get_variables_libdoc_for_source( + cached_entry.library_doc_source + ) + if library_doc is None: + ns._logger.debug( + lambda: f"Variables cache miss: {cached_entry.name} (source={cached_entry.library_doc_source})" + ) + return False + ns._variables_imports[key] = VariablesEntry( + name=cached_entry.name, + import_name=cached_entry.import_name, + library_doc=library_doc, + args=cached_entry.args, + alias=cached_entry.alias, + import_range=cached_entry.import_range, + import_source=cached_entry.import_source, + alias_range=cached_entry.alias_range, + variables=list(cached_entry.variables), + ) + return True + + @classmethod + def _match_library_import( + cls, + imp: LibraryImport, + entry: LibraryEntry, + ) -> bool: + """Match a library import to an entry using resolution-based matching. + + Priority order: + 1. Exact alias match (if import has alias) + 2. Exact import_name match + 3. Exact library name match + 4. Source path match (for path-based imports) + + Does NOT use substring matching to avoid false positives like + "MyLib" matching "MyLibExtended". + """ + # 1. Best: alias match (most specific) + if imp.alias and entry.name == imp.alias: + return True + + # 2. Exact import_name match + if imp.name and entry.import_name == imp.name: + return True + + # 3. 
Exact library name match (case-insensitive for standard libs) + if imp.name and entry.name: + if entry.name == imp.name: + return True + # Case-insensitive match for library names + if entry.name.lower() == imp.name.lower(): + return True + + # 4. Source path match for path-based imports + if imp.name and entry.library_doc.source: + # Check if import name ends with the library filename + lib_filename = Path(entry.library_doc.source).stem + imp_path = Path(imp.name) + if imp_path.stem == lib_filename: + return True + # Also check the full library doc name + if entry.library_doc.name and entry.library_doc.name == imp.name: + return True + + return False + + @classmethod + def _rebuild_import_entries(cls, ns: "Namespace") -> None: + """Rebuild _import_entries mapping from restored imports and library/resource/variables entries. + + This is needed after restoring from cache so the analyzer can find namespace references. + The _import_entries dict maps Import objects to their corresponding LibraryEntry. + + Note: When the same library/resource is imported multiple times, each import gets its + own entry in _import_entries (with the same library_doc but different import_range/source). + """ + if ns._imports is None: + return + + for imp in ns._imports: + if isinstance(imp, LibraryImport): + # Find a library entry using resolution-based matching + for entry in ns._libraries.values(): + if cls._match_library_import(imp, entry): + # Create a new entry for this import with the correct range/source + ns._import_entries[imp] = LibraryEntry( + name=entry.name, + import_name=imp.name or "", + library_doc=entry.library_doc, + args=imp.args, + alias=imp.alias, + import_range=imp.range, + import_source=imp.source, + alias_range=imp.alias_range, + ) + break + elif isinstance(imp, ResourceImport): + for entry in ns._resources.values(): + if entry.import_name == imp.name or entry.name == imp.name: + ns._import_entries[imp] = ResourceEntry( + name=entry.name, + import_name=imp.name or "", + library_doc=entry.library_doc, + args=(), + alias=None, + import_range=imp.range, + import_source=imp.source, + imports=entry.imports, + variables=entry.variables, + ) + break + elif isinstance(imp, VariablesImport): + for entry in ns._variables_imports.values(): + if entry.import_name == imp.name or entry.name == imp.name: + ns._import_entries[imp] = VariablesEntry( + name=entry.name, + import_name=imp.name or "", + library_doc=entry.library_doc, + args=imp.args, + alias=None, + import_range=imp.range, + import_source=imp.source, + variables=entry.variables, + ) + break + + @classmethod + def _restore_namespace_references( + cls, + ns: "Namespace", + cached_refs: tuple[tuple[int, tuple[Location, ...]], ...], + ) -> None: + """Restore _namespace_references from cached import indices. + + Maps import indices back to LibraryEntry objects using _import_entries. 
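+
+        Illustrative example (hypothetical values): a cached entry
+        (2, (Location(...),)) means "the import at index 2 in the restored
+        _imports list is referenced at these locations"; the index is
+        resolved back to its LibraryEntry via _import_entries.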
+ """ + if not cached_refs or ns._imports is None: + ns._namespace_references = {} + return + + # Build mapping: import index -> LibraryEntry + index_to_entry: dict[int, LibraryEntry] = {} + for i, imp in enumerate(ns._imports): + if imp in ns._import_entries: + index_to_entry[i] = ns._import_entries[imp] + + # Restore namespace references + ns._namespace_references = {} + for import_idx, locations in cached_refs: + if import_idx in index_to_entry: + entry = index_to_entry[import_idx] + ns._namespace_references[entry] = set(locations) + + @classmethod + def _restore_keyword_references( + cls, + ns: "Namespace", + cached_refs: tuple[tuple[KeywordRefKey, tuple[Location, ...]], ...], + ) -> dict[KeywordDoc, set[Location]] | None: + """Restore _keyword_references from cached stable keys. + + Returns None if >10% of references are missing (cache likely stale), + otherwise returns the restored dictionary. + """ + if not cached_refs: + return {} + + lookup: dict[KeywordRefKey, KeywordDoc] = {} + + # Include keywords from all imported libraries + for entry in ns._libraries.values(): + for kw in entry.library_doc.keywords: + key = KeywordRefKey(kw.source or "", kw.name, kw.line_no) + lookup[key] = kw + + # Include keywords from all imported resources + for entry in ns._resources.values(): + for kw in entry.library_doc.keywords: + key = KeywordRefKey(kw.source or "", kw.name, kw.line_no) + lookup[key] = kw + + # Include own keywords if this file has a library_doc + if ns._library_doc is not None: + for kw in ns._library_doc.keywords: + key = KeywordRefKey(kw.source or "", kw.name, kw.line_no) + lookup[key] = kw + + # Restore references with validation + result: dict[KeywordDoc, set[Location]] = {} + missing = 0 + missing_keys: list[KeywordRefKey] = [] + + for key, locations in cached_refs: + if key in lookup: + result[lookup[key]] = set(locations) + else: + missing += 1 + if len(missing_keys) < 5: # Log first 5 missing keys for debugging + missing_keys.append(key) + + # If >10% missing, cache is likely stale - signal to recompute + if missing > len(cached_refs) * 0.1: + ns._logger.debug( + lambda: f"Keyword reference restoration failed: {missing}/{len(cached_refs)} missing " + f"(>{int(len(cached_refs) * 0.1)} threshold). " + f"Sample missing keys: {missing_keys}" + ) + return None + + return result + + @staticmethod + def _get_variable_name_length(name: str) -> int: + """Get the length of just the variable name, excluding ${} or @{} etc. + + For example: + "${first}" -> 5 (length of "first") + "${a}" -> 1 (length of "a") + "@{items}" -> 5 (length of "items") + """ + if name.startswith(("${", "@{", "&{", "%{")) and name.endswith("}"): + return len(name) - _VAR_WRAPPER_LEN + return len(name) + + @classmethod + def _strip_argument_definition( + cls, arg_def: "ArgumentDefinition" + ) -> tuple["ArgumentDefinition", "VariableRefKey"]: + """Convert an ArgumentDefinition from original positions to stripped positions. + + _get_argument_definitions_from_line creates ArgumentDefinitions with positions + pointing to the full variable syntax (e.g., col_offset points to '$' in '${first}'). + + _visit_Arguments creates ArgumentDefinitions with stripped positions + (e.g., col_offset points to 'f' in '${first}'). + + This function converts from original to stripped positions for cache restoration. + + Note: Library arguments have col_offset=-1 (sentinel value for "no position"). + These should NOT be adjusted - return them as-is. 
+ """ + # Library arguments have col_offset=-1 (no position info) - don't adjust + if arg_def.col_offset < 0: + return arg_def, cls._make_variable_ref_key(arg_def) + + var_name_len = cls._get_variable_name_length(arg_def.name) + stripped_col = arg_def.col_offset + _VAR_PREFIX_LEN + stripped_end = stripped_col + var_name_len + + stripped_arg_def = replace( + arg_def, + col_offset=stripped_col, + end_col_offset=stripped_end, + ) + + return stripped_arg_def, cls._make_variable_ref_key(stripped_arg_def) + + @classmethod + def _add_argument_definitions_from_keywords( + cls, + keywords: KeywordStore, + lookup: dict["VariableRefKey", VariableDefinition], + ) -> None: + """Add ArgumentDefinitions from keywords to the lookup dictionary. + + This helper extracts argument definitions from keyword [Arguments] sections + and adds them to the lookup with stripped positions (pointing to variable name + rather than the full ${var} syntax). + """ + for kw_doc in keywords.values(): + if kw_doc.argument_definitions: + for arg_def in kw_doc.argument_definitions: + stripped_arg_def, stripped_key = cls._strip_argument_definition( + arg_def + ) + if stripped_key not in lookup: + lookup[stripped_key] = stripped_arg_def + + @classmethod + def _add_embedded_argument_definitions_from_keywords( + cls, + keywords: KeywordStore, + lookup: dict["VariableRefKey", VariableDefinition], + ) -> None: + """Add EmbeddedArgumentDefinitions from keyword names to the lookup dictionary. + + This helper extracts embedded argument definitions from keyword names like + 'My Keyword ${arg}' and adds them to the lookup. These are NOT included in + kw_doc.argument_definitions (which only has [Arguments] line arguments). + + Embedded arguments use original positions (pointing to '$' in '${arg}'), + unlike regular arguments which use stripped positions. + """ + for kw_doc in keywords.values(): + if not kw_doc.is_embedded or not kw_doc.name_token: + continue + + # Parse embedded arguments from keyword name, similar to visit_KeywordName + name_token = kw_doc.name_token + for variable_token in filter( + lambda e: e.type == Token.VARIABLE, + tokenize_variables(name_token, identifiers="$", ignore_errors=True), + ): + if not variable_token.value: + continue + + matcher = search_variable(variable_token.value, "$", ignore_errors=True) + if matcher.base is None: + continue + + # Extract name (without pattern/type for embedded args with patterns) + if ":" not in matcher.base: + name = matcher.base + else: + # Handle ${name:pattern} or ${name: type: pattern} (RF 7.3+) + name = matcher.base.split(":")[0] + + full_name = f"{matcher.identifier}{{{name}}}" + # Use stripped positions (pointing to variable name, not $) + # This matches how references are found during live analysis + stripped_col = variable_token.col_offset + _VAR_PREFIX_LEN + arg_def = EmbeddedArgumentDefinition( + name=full_name, + name_token=None, + line_no=variable_token.lineno, + col_offset=stripped_col, + end_line_no=variable_token.lineno, + end_col_offset=stripped_col + len(name), + source=kw_doc.source, + keyword_doc=kw_doc, + ) + key = cls._make_variable_ref_key(arg_def) + if key not in lookup: + lookup[key] = arg_def + + @classmethod + def _create_variable_definition_from_key( + cls, + key: "VariableRefKey", + ) -> VariableDefinition: + """Create a VariableDefinition from a VariableRefKey. + + Used to recreate local variables from cache keys when the original + VariableDefinition is not available (e.g., for ${result}= assignments). 
+ """ + var_name_len = cls._get_variable_name_length(key.name) + return VariableDefinition( + line_no=key.line_no, + col_offset=key.col_offset, + end_line_no=key.line_no, + end_col_offset=key.col_offset + var_name_len, + source=key.source or None, + name=key.name, + name_token=None, + type=VariableDefinitionType(key.var_type), + ) + + @classmethod + def _restore_variable_references( + cls, + ns: "Namespace", + cached_refs: tuple[tuple[VariableRefKey, tuple[Location, ...]], ...], + cached_local_var_assigns: tuple[ + tuple[VariableRefKey, tuple[Range, ...]], ... + ] = (), + ) -> dict[VariableDefinition, set[Location]] | None: + """Restore _variable_references from cached stable keys. + + Returns None if >10% of references are missing (cache likely stale), + otherwise returns the restored dictionary. + + Args: + cached_local_var_assigns: Local variable assignment keys - used to create + VariableDefinition objects for local variables (like ${result}=) that + aren't in _own_variables but may be referenced. + """ + if not cached_refs: + return {} + + lookup: dict[VariableRefKey, VariableDefinition] = {} + + # Include built-in variables (${TEST_NAME}, ${SUITE_NAME}, etc.) + # These have source="" and are shared across all namespaces + for var in cls.get_builtin_variables(): + lookup[cls._make_variable_ref_key(var)] = var + + # Include own variables + if ns._own_variables is not None: + for var in ns._own_variables: + lookup[cls._make_variable_ref_key(var)] = var + + # Include variables from imported resources + for res_entry in ns._resources.values(): + for var in res_entry.variables: + lookup[cls._make_variable_ref_key(var)] = var + + # Include variables from variables imports + for var_entry in ns._variables_imports.values(): + for var in var_entry.variables: + lookup[cls._make_variable_ref_key(var)] = var + + # Include ArgumentDefinitions from keywords' [Arguments] sections. + # These are needed because during analysis, KeywordDoc.argument_definitions + # are added to _variable_references when named arguments are used. + # Note: _get_argument_definitions_from_line uses original positions (pointing to ${), + # while _visit_Arguments uses stripped positions (pointing to variable name). + # We convert to stripped positions to match _visit_Arguments behavior. + # + # Also include EmbeddedArgumentDefinitions from keyword names (like "My Keyword ${arg}"). + # These are NOT in argument_definitions and use original positions. 
+ + # From resource keywords + for res_entry in ns._resources.values(): + if res_entry.library_doc: + cls._add_argument_definitions_from_keywords( + res_entry.library_doc.keywords, lookup + ) + cls._add_embedded_argument_definitions_from_keywords( + res_entry.library_doc.keywords, lookup + ) + + # From library keywords + for lib_entry in ns._libraries.values(): + if lib_entry.library_doc: + cls._add_argument_definitions_from_keywords( + lib_entry.library_doc.keywords, lookup + ) + cls._add_embedded_argument_definitions_from_keywords( + lib_entry.library_doc.keywords, lookup + ) + + # From the file's own keywords (*** Keywords *** section) + if ns._library_doc: + cls._add_argument_definitions_from_keywords( + ns._library_doc.keywords, lookup + ) + cls._add_embedded_argument_definitions_from_keywords( + ns._library_doc.keywords, lookup + ) + + # Include local variables from assignments (e.g., ${result}= keyword call) + # These need to be recreated from the cache since they aren't in _own_variables + for local_key, _ in cached_local_var_assigns: + if local_key not in lookup: + lookup[local_key] = cls._create_variable_definition_from_key(local_key) + + # Restore references with validation + result: dict[VariableDefinition, set[Location]] = {} + missing = 0 + missing_keys: list[VariableRefKey] = [] + + for key, locations in cached_refs: + if key in lookup: + result[lookup[key]] = set(locations) + else: + # Try adjusted position for arguments (different position encoding paths) + # Some cached references have col_offset with additional offset that needs + # to be adjusted to match the stripped positions in the lookup + if key.var_type == "argument": + adjusted_key = VariableRefKey( + key.source, + key.name, + key.var_type, + key.line_no, + key.col_offset - _VAR_PREFIX_LEN, + ) + if adjusted_key in lookup: + result[lookup[adjusted_key]] = set(locations) + continue + missing += 1 + if len(missing_keys) < 5: + missing_keys.append(key) + + # If >10% missing, cache is likely stale - signal to recompute + if missing > len(cached_refs) * 0.1: + ns._logger.debug( + lambda: f"Variable reference restoration failed: {missing}/{len(cached_refs)} missing " + f"(>{int(len(cached_refs) * 0.1)} threshold). " + f"Sample missing keys: {missing_keys}" + ) + return None + + return result + + @classmethod + def _restore_local_variable_assignments( + cls, + ns: "Namespace", + cached_refs: tuple[tuple[VariableRefKey, tuple[Range, ...]], ...], + ) -> dict[VariableDefinition, set[Range]]: + """Restore _local_variable_assignments from cached stable keys. + + Unlike _restore_variable_references, this method always succeeds because + local variables (like ${result}= from keyword calls) aren't in _own_variables + and must be recreated from the cache keys. Missing variables are expected + and created on-demand rather than treated as cache staleness. 
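+
+        Illustrative example (hypothetical values): if the cache contains a
+        key for "${result}" that matches nothing in _own_variables or the
+        imported resources, a fresh VariableDefinition is created from the
+        key via _create_variable_definition_from_key and its cached ranges
+        are restored as-is.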
+ """ + if not cached_refs: + return {} + + lookup: dict[VariableRefKey, VariableDefinition] = {} + + if ns._own_variables is not None: + for var in ns._own_variables: + lookup[cls._make_variable_ref_key(var)] = var + + # Also check resources for local variables defined there + for res_entry in ns._resources.values(): + for var in res_entry.variables: + lookup[cls._make_variable_ref_key(var)] = var + + # Restore assignments - create VariableDefinition for missing local variables + result: dict[VariableDefinition, set[Range]] = {} + + for key, ranges in cached_refs: + if key in lookup: + result[lookup[key]] = set(ranges) + else: + # Create a VariableDefinition from the cached key for local variables + # These are variables like ${result}= that aren't in _own_variables + local_var = cls._create_variable_definition_from_key(key) + lookup[key] = local_var + result[local_var] = set(ranges) + + return result + + @classmethod + def from_cache_data( + cls, + cache_data: NamespaceCacheData, + imports_manager: "ImportsManager", + model: ast.AST, + source: str, + document: TextDocument | None = None, + document_type: "DocumentType | None" = None, + languages: Languages | None = None, + workspace_languages: Languages | None = None, + ) -> "Namespace | None": + """Create a pre-initialized namespace from cached data.""" + # Create namespace instance without initializing + ns = cls( + imports_manager=imports_manager, + model=model, + source=source, + document=document, + document_type=document_type, + languages=languages, + workspace_languages=workspace_languages, + ) + + # Restore libraries + if not cls._restore_libraries_from_cache( + ns, cache_data.libraries, imports_manager + ): + ns._logger.debug( + lambda: f"Failed to restore libraries from cache for {source}" + ) + return None + + # Restore resources + if not cls._restore_resources_from_cache( + ns, cache_data.resources, imports_manager + ): + ns._logger.debug( + lambda: f"Failed to restore resources from cache for {source}" + ) + return None + + # Restore resources_files mapping + for src, key in cache_data.resources_files: + if key in ns._resources: + ns._resources_files[src] = ns._resources[key] + + # Restore variables + if not cls._restore_variables_from_cache( + ns, cache_data.variables_imports, imports_manager + ): + ns._logger.debug( + lambda: f"Failed to restore variables from cache for {source}" + ) + return None + + # Restore other state + ns._own_variables = list(cache_data.own_variables) + ns._imports = list(cache_data.imports) + ns._library_doc = cache_data.library_doc + + # Rebuild _import_entries mapping from restored imports and entries + # This is needed for the analyzer to find namespace references + cls._rebuild_import_entries(ns) + + # Mark as initialized + ns._initialized = True + + # Restore cached diagnostics if available + if cache_data.analyzed and cache_data.diagnostics: + ns._diagnostics = list(cache_data.diagnostics) + + # Restore cached test case and tag definitions + if cache_data.test_case_definitions: + ns._test_case_definitions = list(cache_data.test_case_definitions) + if cache_data.tag_definitions: + ns._tag_definitions = list(cache_data.tag_definitions) + + # Restore namespace references + if cache_data.namespace_references: + cls._restore_namespace_references(ns, cache_data.namespace_references) + + # Attempt full analysis restoration if available + # This allows skipping the analysis phase entirely on warm start + if cache_data.fully_analyzed: + keyword_refs = cls._restore_keyword_references( + ns, 
cache_data.keyword_references + ) + variable_refs = cls._restore_variable_references( + ns, + cache_data.variable_references, + cache_data.local_variable_assignments, + ) + # Note: _restore_local_variable_assignments always succeeds (creates missing vars from cache) + local_var_assigns = cls._restore_local_variable_assignments( + ns, cache_data.local_variable_assignments + ) + + # Only set _analyzed=True if keyword and variable references were restored successfully + # If any returned None (>10% missing), fall back to recomputing + if keyword_refs is not None and variable_refs is not None: + ns._keyword_references = keyword_refs + ns._variable_references = variable_refs + ns._local_variable_assignments = local_var_assigns + ns._analyzed = True + ns._logger.debug( + lambda: f"Restored full analysis state from cache for {source}" + ) + else: + ns._logger.debug( + lambda: f"Could not restore full analysis from cache for {source}: " + f"keyword_refs={keyword_refs is not None}, " + f"variable_refs={variable_refs is not None}" + ) + else: + ns._logger.debug( + lambda: f"Cache for {source} does not have fully_analyzed=True" + ) + + return ns + class DataEntry(NamedTuple): libraries: Dict[str, LibraryEntry] = OrderedDict() resources: Dict[str, ResourceEntry] = OrderedDict() @@ -981,7 +2271,8 @@ def ensure_initialized(self) -> bool: with self._initialize_lock: if not self._initialized: with self._logger.measure_time( - lambda: f"Initialize Namespace for {self.source}", context_name="import" + lambda: f"Initialize Namespace for {self.source}", + context_name="import", ): succeed = False try: @@ -1043,7 +2334,10 @@ def get_own_variables(self) -> List[VariableDefinition]: @classmethod def get_builtin_variables(cls) -> List[VariableDefinition]: if cls._builtin_variables is None: - cls._builtin_variables = [BuiltInVariableDefinition(0, 0, 0, 0, "", n, None) for n in BUILTIN_VARIABLES] + cls._builtin_variables = [ + BuiltInVariableDefinition(0, 0, 0, 0, "", n, None) + for n in BUILTIN_VARIABLES + ] return cls._builtin_variables @@ -1102,11 +2396,19 @@ def yield_variables( nodes if nodes else [], ) ) - test_or_keyword = test_or_keyword_nodes[0] if test_or_keyword_nodes else None + test_or_keyword = ( + test_or_keyword_nodes[0] if test_or_keyword_nodes else None + ) - in_args = isinstance(test_or_keyword_nodes[-1], Arguments) if test_or_keyword_nodes else False + in_args = ( + isinstance(test_or_keyword_nodes[-1], Arguments) + if test_or_keyword_nodes + else False + ) only_args = ( - isinstance(test_or_keyword_nodes[-1], (Arguments, Setup, Timeout)) if test_or_keyword_nodes else False + isinstance(test_or_keyword_nodes[-1], (Arguments, Setup, Timeout)) + if test_or_keyword_nodes + else False ) yield from ( @@ -1115,13 +2417,19 @@ def yield_variables( ( ( (OnlyArgumentsVisitor if only_args else BlockVariableVisitor)( - self, nodes, position, in_args, resolved_variables=self.get_global_resolved_variables() + self, + nodes, + position, + in_args, + resolved_variables=self.get_global_resolved_variables(), ).get(test_or_keyword) ) if test_or_keyword is not None and not skip_local_variables else [] ), - [] if skip_global_variables or skip_commandline_variables else self.get_command_line_variables(), + [] + if skip_global_variables or skip_commandline_variables + else self.get_command_line_variables(), [] if skip_global_variables else self.get_imported_variables(), [] if skip_global_variables else self.get_own_variables(), [] if skip_global_variables else self.get_builtin_variables(), @@ -1143,7 +2451,9 @@ def 
get_imported_variables(self) -> List[VariableDefinition]: def get_suite_variables(self) -> Dict[str, Any]: with self._suite_variables_lock: if self._suite_variables is None: - self._suite_variables = {v.name: v.value for v in reversed(self.get_global_variables())} + self._suite_variables = { + v.name: v.value for v in reversed(self.get_global_variables()) + } return self._suite_variables @@ -1155,7 +2465,9 @@ def get_resolvable_variables( if nodes: return { v.convertable_name: v.value - for k, v in self.yield_variables(nodes, position, skip_commandline_variables=True) + for k, v in self.yield_variables( + nodes, position, skip_commandline_variables=True + ) if v.has_value } @@ -1163,7 +2475,9 @@ def get_resolvable_variables( if self._global_resolvable_variables is None: self._global_resolvable_variables = { v.convertable_name: v.value - for k, v in self.yield_variables(nodes, position, skip_commandline_variables=True) + for k, v in self.yield_variables( + nodes, position, skip_commandline_variables=True + ) if v.has_value } return self._global_resolvable_variables @@ -1282,7 +2596,10 @@ def _import( if ( top_level and result.library_doc.errors is None - and (len(result.library_doc.keywords) == 0 and not bool(result.library_doc.has_listener)) + and ( + len(result.library_doc.keywords) == 0 + and not bool(result.library_doc.has_listener) + ) ): self.append_diagnostics( range=value.range, @@ -1295,18 +2612,24 @@ def _import( if value.name is None: raise NameSpaceError("Resource setting requires value.") - source = self.imports_manager.find_resource(value.name, base_dir, variables=variables) + source = self.imports_manager.find_resource( + value.name, base_dir, variables=variables + ) allread_imported_resource = next( ( v for k, v in self._resources.items() - if v.library_doc.source is not None and same_file(v.library_doc.source, source) + if v.library_doc.source is not None + and same_file(v.library_doc.source, source) ), None, ) if allread_imported_resource is not None: - self._logger.debug(lambda: f"Resource '{value.name}' already imported.", context_name="import") + self._logger.debug( + lambda: f"Resource '{value.name}' already imported.", + context_name="import", + ) if top_level: self.append_diagnostics( range=value.range, @@ -1317,7 +2640,11 @@ def _import( [ DiagnosticRelatedInformation( location=Location( - uri=str(Uri.from_path(allread_imported_resource.import_source)), + uri=str( + Uri.from_path( + allread_imported_resource.import_source + ) + ), range=allread_imported_resource.import_range, ), message="", @@ -1414,7 +2741,10 @@ def _import( if top_level and result is not None: if result.library_doc.source is not None and result.library_doc.errors: - if any(err.source and Path(err.source).is_absolute() for err in result.library_doc.errors): + if any( + err.source and Path(err.source).is_absolute() + for err in result.library_doc.errors + ): self.append_diagnostics( range=value.range, message="Import definition contains errors.", @@ -1499,7 +2829,9 @@ def _import( ( [ DiagnosticRelatedInformation( - location=Location(str(Uri.from_path(parent_source)), value.range), + location=Location( + str(Uri.from_path(parent_source)), value.range + ), message=str(e), ), ] @@ -1543,9 +2875,14 @@ def _import_imports( if entry is not None: if isinstance(entry, ResourceEntry): assert entry.library_doc.source is not None - allread_imported_resource = self._resources_files.get(entry.library_doc.source, None) + allread_imported_resource = self._resources_files.get( + entry.library_doc.source, None + ) 
- if allread_imported_resource is None and entry.library_doc.source != self.source: + if ( + allread_imported_resource is None + and entry.library_doc.source != self.source + ): self._resources[entry.library_doc.source] = entry self._resources_files[entry.library_doc.source] = entry if entry.variables: @@ -1559,7 +2896,9 @@ def _import_imports( variables=variables, source=entry.library_doc.source, parent_import=imp if top_level else parent_import, - parent_source=parent_source if top_level else source, + parent_source=parent_source + if top_level + else source, ) except (SystemExit, KeyboardInterrupt): raise @@ -1581,7 +2920,10 @@ def _import_imports( source=DIAGNOSTICS_SOURCE_NAME, code=Error.RECURSIVE_IMPORT, ) - elif allread_imported_resource is not None and allread_imported_resource.library_doc.source: + elif ( + allread_imported_resource is not None + and allread_imported_resource.library_doc.source + ): self.append_diagnostics( range=entry.import_range, message=f"Resource {entry} already imported.", @@ -1591,7 +2933,11 @@ def _import_imports( [ DiagnosticRelatedInformation( location=Location( - uri=str(Uri.from_path(allread_imported_resource.import_source)), + uri=str( + Uri.from_path( + allread_imported_resource.import_source + ) + ), range=allread_imported_resource.import_range, ), message="", @@ -1612,9 +2958,15 @@ def _import_imports( ( e.library_doc.source is not None and entry.library_doc.source is not None - and same_file(e.library_doc.source, entry.library_doc.source) + and same_file( + e.library_doc.source, + entry.library_doc.source, + ) + ) + or ( + e.library_doc.source is None + and entry.library_doc.source is None ) - or (e.library_doc.source is None and entry.library_doc.source is None) ) and e.alias == entry.alias and e.args == entry.args @@ -1626,10 +2978,16 @@ def _import_imports( and entry.library_doc is not None and entry.library_doc.source_or_origin ): - self._variables_imports[entry.library_doc.source_or_origin] = entry + self._variables_imports[ + entry.library_doc.source_or_origin + ] = entry if entry.variables: variables = self.get_suite_variables() - elif top_level and already_imported_variables and already_imported_variables.library_doc.source: + elif ( + top_level + and already_imported_variables + and already_imported_variables.library_doc.source + ): self.append_diagnostics( range=entry.import_range, message=f'Variables "{entry}" already imported.', @@ -1639,7 +2997,11 @@ def _import_imports( [ DiagnosticRelatedInformation( location=Location( - uri=str(Uri.from_path(already_imported_variables.import_source)), + uri=str( + Uri.from_path( + already_imported_variables.import_source + ) + ), range=already_imported_variables.import_range, ), message="", @@ -1652,7 +3014,11 @@ def _import_imports( ) elif isinstance(entry, LibraryEntry): - if top_level and entry.name == BUILTIN_LIBRARY_NAME and entry.alias is None: + if ( + top_level + and entry.name == BUILTIN_LIBRARY_NAME + and entry.alias is None + ): self.append_diagnostics( range=entry.import_range, message=f'Library "{entry}" is not imported,' @@ -1663,7 +3029,9 @@ def _import_imports( [ DiagnosticRelatedInformation( location=Location( - uri=str(Uri.from_path(entry.import_source)), + uri=str( + Uri.from_path(entry.import_source) + ), range=entry.import_range, ), message="", @@ -1684,11 +3052,18 @@ def _import_imports( ( e.library_doc.source is not None and entry.library_doc.source is not None - and same_file(e.library_doc.source, entry.library_doc.source) + and same_file( + e.library_doc.source, + 
entry.library_doc.source, + ) + ) + or ( + e.library_doc.source is None + and entry.library_doc.source is None ) - or (e.library_doc.source is None and entry.library_doc.source is None) ) - and e.library_doc.member_name == entry.library_doc.member_name + and e.library_doc.member_name + == entry.library_doc.member_name and e.alias == entry.alias and e.args == entry.args ), @@ -1696,10 +3071,17 @@ def _import_imports( ) if ( already_imported_library is None - and (entry.alias or entry.name or entry.import_name) not in self._libraries + and (entry.alias or entry.name or entry.import_name) + not in self._libraries + ): + self._libraries[ + entry.alias or entry.name or entry.import_name + ] = entry + elif ( + top_level + and already_imported_library + and already_imported_library.library_doc.source ): - self._libraries[entry.alias or entry.name or entry.import_name] = entry - elif top_level and already_imported_library and already_imported_library.library_doc.source: self.append_diagnostics( range=entry.import_range, message=f'Library "{entry}" already imported.', @@ -1709,7 +3091,11 @@ def _import_imports( [ DiagnosticRelatedInformation( location=Location( - uri=str(Uri.from_path(already_imported_library.import_source)), + uri=str( + Uri.from_path( + already_imported_library.import_source + ) + ), range=already_imported_library.import_range, ), message="", @@ -1723,7 +3109,9 @@ def _import_imports( return variables - def _import_lib(self, library: str, variables: Optional[Dict[str, Any]] = None) -> Optional[LibraryEntry]: + def _import_lib( + self, library: str, variables: Optional[Dict[str, Any]] = None + ) -> Optional[LibraryEntry]: try: return self._get_library_entry( library, @@ -1745,8 +3133,13 @@ def _import_lib(self, library: str, variables: Optional[Dict[str, Any]] = None) ) return None - def _import_default_libraries(self, variables: Optional[Dict[str, Any]] = None) -> None: - with self._logger.measure_time(lambda: f"importing default libraries for {self.source}", context_name="import"): + def _import_default_libraries( + self, variables: Optional[Dict[str, Any]] = None + ) -> None: + with self._logger.measure_time( + lambda: f"importing default libraries for {self.source}", + context_name="import", + ): if variables is None: variables = self.get_suite_variables() @@ -1796,7 +3189,10 @@ def get_imported_library_libdoc( ( v.library_doc for e, v in self._import_entries.items() - if isinstance(e, LibraryImport) and v.import_name == name and v.args == args and v.alias == alias + if isinstance(e, LibraryImport) + and v.import_name == name + and v.args == args + and v.alias == alias ), None, ) @@ -1812,7 +3208,10 @@ def _get_resource_entry( if variables is None: variables = self.get_suite_variables() - (namespace, library_doc) = self.imports_manager.get_namespace_and_libdoc_for_resource_import( + ( + namespace, + library_doc, + ) = self.imports_manager.get_namespace_and_libdoc_for_resource_import( name, base_dir, sentinel=self, variables=variables ) @@ -1866,14 +3265,18 @@ def _get_variables_entry( ) @_logger.call - def get_variables_import_libdoc(self, name: str, args: Tuple[str, ...] = ()) -> Optional[LibraryDoc]: + def get_variables_import_libdoc( + self, name: str, args: Tuple[str, ...] 
= () + ) -> Optional[LibraryDoc]: self.ensure_initialized() return next( ( v.library_doc for e, v in self._import_entries.items() - if isinstance(e, VariablesImport) and v.import_name == name and v.args == args + if isinstance(e, VariablesImport) + and v.import_name == name + and v.args == args ), None, ) @@ -1960,7 +3363,9 @@ def analyze(self) -> None: self.ensure_initialized() - with self._logger.measure_time(lambda: f"analyzing document {self.source}", context_name="analyze"): + with self._logger.measure_time( + lambda: f"analyzing document {self.source}", context_name="analyze" + ): analyzer = NamespaceAnalyzer(self.model, self, self.create_finder()) try: @@ -1971,9 +3376,15 @@ def analyze(self) -> None: self._diagnostics += analyzer_result.diagnostics self._keyword_references = analyzer_result.keyword_references self._variable_references = analyzer_result.variable_references - self._local_variable_assignments = analyzer_result.local_variable_assignments - self._namespace_references = analyzer_result.namespace_references - self._test_case_definitions = analyzer_result.test_case_definitions + self._local_variable_assignments = ( + analyzer_result.local_variable_assignments + ) + self._namespace_references = ( + analyzer_result.namespace_references + ) + self._test_case_definitions = ( + analyzer_result.test_case_definitions + ) self._tag_definitions = analyzer_result.tag_definitions lib_doc = self.get_library_doc() @@ -1983,11 +3394,19 @@ def analyze(self) -> None: self.append_diagnostics( range=Range( start=Position( - line=((err.line_no - 1) if err.line_no is not None else 0), + line=( + (err.line_no - 1) + if err.line_no is not None + else 0 + ), character=0, ), end=Position( - line=((err.line_no - 1) if err.line_no is not None else 0), + line=( + (err.line_no - 1) + if err.line_no is not None + else 0 + ), character=0, ), ), @@ -2015,7 +3434,10 @@ def create_finder(self) -> "KeywordFinder": self.ensure_initialized() return KeywordFinder(self) - @_logger.call(condition=lambda self, name, **kwargs: self._finder is not None and name not in self._finder._cache) + @_logger.call( + condition=lambda self, name, **kwargs: self._finder is not None + and name not in self._finder._cache + ) def find_keyword( self, name: Optional[str], diff --git a/tests/robotcode/language_server/robotframework/parts/test_namespace_caching.py b/tests/robotcode/language_server/robotframework/parts/test_namespace_caching.py new file mode 100644 index 000000000..99ac87981 --- /dev/null +++ b/tests/robotcode/language_server/robotframework/parts/test_namespace_caching.py @@ -0,0 +1,247 @@ +"""Integration tests for namespace caching functionality.""" + +import pickle +from pathlib import Path + +import pytest + +from robotcode.language_server.robotframework.protocol import ( + RobotLanguageServerProtocol, +) + +# Cache directory relative to the test data root +DATA_ROOT = Path(__file__).parent / "data" +CACHE_DIR = DATA_ROOT / ".robotcode_cache" + + +class TestNamespaceCaching: + """Integration tests for namespace cache behavior.""" + + def test_cache_directory_created_after_analysis( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Cache directory is created after workspace analysis.""" + # Trigger analysis by accessing a document and its namespace + test_file = DATA_ROOT / "tests" / "hover.robot" + if not test_file.exists(): + pytest.skip("Test file not found") + + doc = protocol.documents.get_or_open_document(test_file, "robotframework") + ns = protocol.documents_cache.get_namespace(doc) + assert 
ns is not None, "Should have namespace" + + # After analysis, cache directory should be created + assert CACHE_DIR.exists(), "Cache directory should be created" + + def test_namespace_cache_files_created( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Namespace cache files are created for analyzed robot files.""" + # Trigger analysis first + test_file = DATA_ROOT / "tests" / "hover.robot" + if not test_file.exists(): + pytest.skip("Test file not found") + + doc = protocol.documents.get_or_open_document(test_file, "robotframework") + ns = protocol.documents_cache.get_namespace(doc) + assert ns is not None, "Should have namespace" + + # Look for namespace cache files + ns_cache_dirs = list(CACHE_DIR.glob("*/*/namespace")) + + assert len(ns_cache_dirs) > 0, "Should have namespace cache directories" + + # Check for cache files (either .cache.pkl single-file or legacy .meta.pkl/.spec.pkl) + cache_files: list[Path] = [] + for ns_dir in ns_cache_dirs: + cache_files.extend(ns_dir.glob("*.cache.pkl")) + cache_files.extend(ns_dir.glob("*.meta.pkl")) + + assert len(cache_files) > 0, "Should have namespace cache files" + + def test_cache_file_contains_valid_data( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Cache files contain valid pickled metadata and spec data.""" + ns_cache_dirs = list(CACHE_DIR.glob("*/*/namespace")) + if not ns_cache_dirs: + pytest.skip("No namespace cache directory found") + + # Find a cache file + cache_files = list(ns_cache_dirs[0].glob("*.cache.pkl")) + if not cache_files: + pytest.skip("No cache files found") + + # Verify it's valid pickle with expected structure + with open(cache_files[0], "rb") as f: + data = pickle.load(f) + + # Single-file format stores (meta, spec) tuple + assert isinstance(data, tuple), "Cache should be a tuple" + assert len(data) == 2, "Cache should have (meta, spec)" + + meta, _spec = data + # Verify metadata has required fields + assert hasattr(meta, "source"), "Meta should have source" + assert hasattr(meta, "mtime"), "Meta should have mtime" + assert hasattr(meta, "content_hash"), "Meta should have content_hash" + + def test_cache_metadata_tracks_environment( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Cache metadata includes Python environment tracking fields.""" + ns_cache_dirs = list(CACHE_DIR.glob("*/*/namespace")) + if not ns_cache_dirs: + pytest.skip("No namespace cache directory found") + + cache_files = list(ns_cache_dirs[0].glob("*.cache.pkl")) + if not cache_files: + pytest.skip("No cache files found") + + with open(cache_files[0], "rb") as f: + meta, _spec = pickle.load(f) + + # Environment tracking fields (for detecting venv changes) + assert hasattr(meta, "python_executable"), "Should track python_executable" + assert hasattr(meta, "sys_path_hash"), "Should track sys_path_hash" + assert hasattr(meta, "robot_version"), "Should track robot_version" + + def test_corrupt_cache_does_not_crash( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Corrupted cache files are handled gracefully without crashing.""" + ns_cache_dirs = list(CACHE_DIR.glob("*/*/namespace")) + if not ns_cache_dirs: + pytest.skip("No namespace cache directory found") + + # Create a corrupt cache file + corrupt_file = ns_cache_dirs[0] / "corrupt_test.cache.pkl" + corrupt_file.write_bytes(b"NOT VALID PICKLE DATA") + + try: + # Access a document - should not crash despite corrupt cache + test_file = DATA_ROOT / "tests" / "hover.robot" + if test_file.exists(): + doc = 
protocol.documents.get_or_open_document(test_file, "robotframework") + # Try to get namespace (triggers cache lookup) + ns = protocol.documents_cache.get_namespace(doc) + assert ns is not None, "Should get namespace despite corrupt sibling cache" + finally: + # Cleanup + if corrupt_file.exists(): + corrupt_file.unlink() + + def test_different_files_have_different_cache_keys( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Files in different directories have unique cache keys (no collisions).""" + ns_cache_dirs = list(CACHE_DIR.glob("*/*/namespace")) + if not ns_cache_dirs: + pytest.skip("No namespace cache directory found") + + # Check uniqueness within each RF version's namespace directory + # (different RF versions may have the same file names, which is expected) + for ns_dir in ns_cache_dirs: + cache_files = list(ns_dir.glob("*.cache.pkl")) + if len(cache_files) < 2: + continue + + # Within a single namespace directory, all cache file names should be unique + names = [f.name for f in cache_files] + assert len(names) == len(set(names)), f"Cache file names should be unique within {ns_dir}" + + +class TestCacheInvalidation: + """Tests for cache invalidation behavior.""" + + def test_namespace_available_for_document( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Namespace is available for documents after analysis.""" + test_file = DATA_ROOT / "tests" / "hover.robot" + if not test_file.exists(): + pytest.skip("Test file not found") + + doc = protocol.documents.get_or_open_document(test_file, "robotframework") + ns = protocol.documents_cache.get_namespace(doc) + + assert ns is not None, "Should have namespace for document" + assert ns.source is not None, "Namespace should have source" + + def test_namespace_has_source_and_imports( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Namespace contains source path and import information.""" + test_file = DATA_ROOT / "tests" / "hover.robot" + if not test_file.exists(): + pytest.skip("Test file not found") + + doc = protocol.documents.get_or_open_document(test_file, "robotframework") + ns = protocol.documents_cache.get_namespace(doc) + + assert ns is not None + assert ns.source is not None, "Namespace should have source path" + # Namespace should have libraries (at least BuiltIn is implicit) + assert hasattr(ns, "get_libraries"), "Namespace should support get_libraries" + + +class TestLibraryDocCaching: + """Tests for library documentation caching.""" + + def test_libdoc_cache_directory_exists( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Library documentation cache directory is created.""" + # Trigger analysis first by accessing a document that imports libraries + test_file = DATA_ROOT / "tests" / "hover.robot" + if not test_file.exists(): + pytest.skip("Test file not found") + + doc = protocol.documents.get_or_open_document(test_file, "robotframework") + ns = protocol.documents_cache.get_namespace(doc) + assert ns is not None, "Should have namespace" + + libdoc_dirs = list(CACHE_DIR.glob("*/*/libdoc")) + + # After analyzing files that import libraries, should have libdoc cache + assert len(libdoc_dirs) > 0, "Should have libdoc cache directories" + + def test_libdoc_cache_files_exist( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """Library documentation cache contains pickle files.""" + libdoc_dirs = list(CACHE_DIR.glob("*/*/libdoc")) + if not libdoc_dirs: + pytest.skip("No libdoc cache directory found") + + cache_files: list[Path] = [] + for 
libdoc_dir in libdoc_dirs: + cache_files.extend(libdoc_dir.glob("*.pkl")) + + assert len(cache_files) > 0, "Should have libdoc cache files" + + def test_builtin_library_is_cached( + self, + protocol: RobotLanguageServerProtocol, + ) -> None: + """BuiltIn library documentation is cached.""" + libdoc_dirs = list(CACHE_DIR.glob("*/*/libdoc")) + if not libdoc_dirs: + pytest.skip("No libdoc cache directory found") + + # Look for BuiltIn library cache (may be in subdirectory like robot/libraries/) + builtin_files: list[Path] = [] + for libdoc_dir in libdoc_dirs: + builtin_files.extend(libdoc_dir.glob("**/*BuiltIn*")) + + assert len(builtin_files) > 0, "BuiltIn library should be cached" diff --git a/tests/robotcode/robot/diagnostics/test_data_cache.py b/tests/robotcode/robot/diagnostics/test_data_cache.py new file mode 100644 index 000000000..827ab3653 --- /dev/null +++ b/tests/robotcode/robot/diagnostics/test_data_cache.py @@ -0,0 +1,257 @@ +"""Unit tests for data_cache.py - cache implementations.""" + +import pickle +from dataclasses import dataclass +from pathlib import Path + +import pytest + +from robotcode.robot.diagnostics.data_cache import ( + CacheSection, + JsonDataCache, + PickleDataCache, +) + + +@dataclass +class SampleData: + """Sample dataclass for testing serialization.""" + + name: str + value: int + + +class TestCacheSection: + """Tests for CacheSection enum.""" + + def test_cache_section_values(self) -> None: + """Verify CacheSection enum has expected values.""" + assert CacheSection.LIBRARY.value == "libdoc" + assert CacheSection.VARIABLES.value == "variables" + assert CacheSection.RESOURCE.value == "resource" + assert CacheSection.NAMESPACE.value == "namespace" + + +class TestPickleDataCache: + """Tests for PickleDataCache implementation.""" + + def test_init_creates_cache_directory(self, tmp_path: Path) -> None: + """Cache directory is created on initialization.""" + cache_dir = tmp_path / "cache" + assert not cache_dir.exists() + + PickleDataCache(cache_dir) + + assert cache_dir.exists() + assert (cache_dir / ".gitignore").exists() + + def test_init_with_existing_directory(self, tmp_path: Path) -> None: + """Initialization works with existing directory.""" + cache_dir = tmp_path / "cache" + cache_dir.mkdir(parents=True) + + cache = PickleDataCache(cache_dir) + + assert cache.cache_dir == cache_dir + + def test_build_cache_data_filename(self, tmp_path: Path) -> None: + """Filename is built correctly with section and entry name.""" + cache = PickleDataCache(tmp_path) + + path = cache.build_cache_data_filename(CacheSection.LIBRARY, "test_entry") + + assert path == tmp_path / "libdoc" / "test_entry.pkl" + + def test_cache_data_exists_returns_false_for_missing(self, tmp_path: Path) -> None: + """cache_data_exists returns False when file doesn't exist.""" + cache = PickleDataCache(tmp_path) + + assert cache.cache_data_exists(CacheSection.LIBRARY, "nonexistent") is False + + def test_cache_data_exists_returns_true_for_existing(self, tmp_path: Path) -> None: + """cache_data_exists returns True when file exists.""" + cache = PickleDataCache(tmp_path) + cache.save_cache_data(CacheSection.LIBRARY, "test", {"key": "value"}) + + assert cache.cache_data_exists(CacheSection.LIBRARY, "test") is True + + def test_save_and_read_cache_data_dict(self, tmp_path: Path) -> None: + """Save and read dictionary data correctly.""" + cache = PickleDataCache(tmp_path) + data = {"name": "test", "values": [1, 2, 3]} + + cache.save_cache_data(CacheSection.LIBRARY, "test", data) + result = 
cache.read_cache_data(CacheSection.LIBRARY, "test", dict) + + assert result == data + + def test_save_and_read_cache_data_dataclass(self, tmp_path: Path) -> None: + """Save and read dataclass correctly.""" + cache = PickleDataCache(tmp_path) + data = SampleData(name="test", value=42) + + cache.save_cache_data(CacheSection.NAMESPACE, "sample", data) + result = cache.read_cache_data(CacheSection.NAMESPACE, "sample", SampleData) + + assert result == data + + def test_read_cache_data_type_mismatch_raises_typeerror(self, tmp_path: Path) -> None: + """TypeError is raised when cached data doesn't match expected type.""" + cache = PickleDataCache(tmp_path) + cache.save_cache_data(CacheSection.LIBRARY, "test", {"key": "value"}) + + with pytest.raises(TypeError, match=r"Expected.*str.*got.*dict"): + cache.read_cache_data(CacheSection.LIBRARY, "test", str) + + def test_read_cache_data_accepts_tuple_of_types(self, tmp_path: Path) -> None: + """read_cache_data accepts a tuple of types for validation.""" + cache = PickleDataCache(tmp_path) + cache.save_cache_data(CacheSection.LIBRARY, "test", {"key": "value"}) + + result = cache.read_cache_data(CacheSection.LIBRARY, "test", (dict, list)) + + assert result == {"key": "value"} + + def test_read_cache_data_missing_file_raises_error(self, tmp_path: Path) -> None: + """FileNotFoundError is raised when cache file doesn't exist.""" + cache = PickleDataCache(tmp_path) + + with pytest.raises(FileNotFoundError): + cache.read_cache_data(CacheSection.LIBRARY, "nonexistent", dict) + + def test_save_creates_section_directory(self, tmp_path: Path) -> None: + """Section subdirectory is created when saving.""" + cache = PickleDataCache(tmp_path) + + cache.save_cache_data(CacheSection.VARIABLES, "test", {"data": 1}) + + assert (tmp_path / "variables").is_dir() + + def test_save_overwrites_existing_file(self, tmp_path: Path) -> None: + """Existing cache file is overwritten on save.""" + cache = PickleDataCache(tmp_path) + cache.save_cache_data(CacheSection.LIBRARY, "test", {"version": 1}) + cache.save_cache_data(CacheSection.LIBRARY, "test", {"version": 2}) + + result = cache.read_cache_data(CacheSection.LIBRARY, "test", dict) + + assert result == {"version": 2} + + def test_atomic_write_no_temp_files_left(self, tmp_path: Path) -> None: + """No temporary files are left after successful save.""" + cache = PickleDataCache(tmp_path) + cache.save_cache_data(CacheSection.LIBRARY, "test", {"data": 1}) + + section_dir = tmp_path / "libdoc" + files = list(section_dir.iterdir()) + + assert len(files) == 1 + assert files[0].suffix == ".pkl" + + def test_read_corrupt_pickle_raises_error(self, tmp_path: Path) -> None: + """UnpicklingError is raised when pickle data is corrupt.""" + cache = PickleDataCache(tmp_path) + cache_file = cache.build_cache_data_filename(CacheSection.LIBRARY, "corrupt") + cache_file.parent.mkdir(parents=True, exist_ok=True) + cache_file.write_bytes(b"not valid pickle data") + + with pytest.raises((pickle.UnpicklingError, EOFError)): + cache.read_cache_data(CacheSection.LIBRARY, "corrupt", dict) + + def test_different_sections_are_isolated(self, tmp_path: Path) -> None: + """Data in different sections doesn't interfere.""" + cache = PickleDataCache(tmp_path) + cache.save_cache_data(CacheSection.LIBRARY, "same_name", {"section": "library"}) + cache.save_cache_data(CacheSection.RESOURCE, "same_name", {"section": "resource"}) + + lib_data = cache.read_cache_data(CacheSection.LIBRARY, "same_name", dict) + res_data = cache.read_cache_data(CacheSection.RESOURCE, 
"same_name", dict) + + assert lib_data["section"] == "library" + assert res_data["section"] == "resource" + + +class TestJsonDataCache: + """Tests for JsonDataCache implementation.""" + + def test_build_cache_data_filename(self, tmp_path: Path) -> None: + """Filename uses .json extension.""" + cache = JsonDataCache(tmp_path) + + path = cache.build_cache_data_filename(CacheSection.LIBRARY, "test_entry") + + assert path == tmp_path / "libdoc" / "test_entry.json" + + def test_cache_data_exists(self, tmp_path: Path) -> None: + """cache_data_exists works for JSON cache.""" + cache = JsonDataCache(tmp_path) + + assert cache.cache_data_exists(CacheSection.LIBRARY, "test") is False + + cache.save_cache_data(CacheSection.LIBRARY, "test", {"key": "value"}) + + assert cache.cache_data_exists(CacheSection.LIBRARY, "test") is True + + def test_save_and_read_cache_data(self, tmp_path: Path) -> None: + """Save and read JSON data correctly.""" + cache = JsonDataCache(tmp_path) + data = {"name": "test", "values": [1, 2, 3]} + + cache.save_cache_data(CacheSection.LIBRARY, "test", data) + result = cache.read_cache_data(CacheSection.LIBRARY, "test", dict) + + assert result == data + + +class TestCacheEdgeCases: + """Edge case tests for cache implementations.""" + + @pytest.mark.parametrize( + "entry_name", + [ + "simple", + "with_underscore", + "with-dash", + "with.dots", + "nested/path/entry", + "unicode_日本語", + ], + ) + def test_various_entry_names(self, tmp_path: Path, entry_name: str) -> None: + """Cache handles various entry name formats.""" + cache = PickleDataCache(tmp_path) + data = {"entry": entry_name} + + cache.save_cache_data(CacheSection.LIBRARY, entry_name, data) + result = cache.read_cache_data(CacheSection.LIBRARY, entry_name, dict) + + assert result == data + + def test_large_data(self, tmp_path: Path) -> None: + """Cache handles large data objects.""" + cache = PickleDataCache(tmp_path) + # Create ~1MB of data + data = {"items": list(range(100000)), "text": "x" * 500000} + + cache.save_cache_data(CacheSection.NAMESPACE, "large", data) + result = cache.read_cache_data(CacheSection.NAMESPACE, "large", dict) + + assert result == data + + def test_none_value(self, tmp_path: Path) -> None: + """Cache handles None values.""" + cache = PickleDataCache(tmp_path) + + cache.save_cache_data(CacheSection.LIBRARY, "none_test", None) + result = cache.read_cache_data(CacheSection.LIBRARY, "none_test", type(None)) + + assert result is None + + def test_empty_dict(self, tmp_path: Path) -> None: + """Cache handles empty dictionaries.""" + cache = PickleDataCache(tmp_path) + + cache.save_cache_data(CacheSection.LIBRARY, "empty", {}) + result = cache.read_cache_data(CacheSection.LIBRARY, "empty", dict) + + assert result == {} diff --git a/tests/robotcode/robot/diagnostics/test_imports_manager_cache.py b/tests/robotcode/robot/diagnostics/test_imports_manager_cache.py new file mode 100644 index 000000000..8e2377117 --- /dev/null +++ b/tests/robotcode/robot/diagnostics/test_imports_manager_cache.py @@ -0,0 +1,182 @@ +"""Unit tests for imports_manager cache functionality.""" + +import zlib +from pathlib import Path + +import pytest + +from robotcode.robot.diagnostics.imports_manager import ( + RESOURCE_META_VERSION, + ResourceMetaData, +) + + +class TestResourceMetaData: + """Tests for ResourceMetaData dataclass.""" + + def test_create_metadata(self) -> None: + """ResourceMetaData can be created with all required fields.""" + meta = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + 
source="/path/to/resource.resource", + mtime=1234567890123456789, + ) + + assert meta.meta_version == RESOURCE_META_VERSION + assert meta.source == "/path/to/resource.resource" + assert meta.mtime == 1234567890123456789 + + def test_filepath_base_property(self) -> None: + """filepath_base computes correct cache filename base.""" + source = "/home/user/project/resources/common.resource" + meta = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source=source, + mtime=0, + ) + + # Should be "adler32hash_stem" format + parent_path = str(Path(source).parent) + expected_hash = f"{zlib.adler32(parent_path.encode('utf-8')):08x}" + assert meta.filepath_base == f"{expected_hash}_common" + + def test_filepath_base_different_paths(self) -> None: + """filepath_base generates unique hashes for different parent directories.""" + meta1 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/a/resource.resource", + mtime=0, + ) + meta2 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/b/resource.resource", + mtime=0, + ) + + # Different parent dirs should produce different hashes + assert meta1.filepath_base != meta2.filepath_base + # But both end with the same stem + assert meta1.filepath_base.endswith("_resource") + assert meta2.filepath_base.endswith("_resource") + + def test_filepath_base_same_name_different_dirs(self) -> None: + """Same filename in different directories produces different cache keys.""" + meta1 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/project/tests/keywords.resource", + mtime=0, + ) + meta2 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/project/lib/keywords.resource", + mtime=0, + ) + + assert meta1.filepath_base != meta2.filepath_base + + def test_metadata_equality(self) -> None: + """ResourceMetaData instances are equal when all fields match.""" + meta1 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/to/resource.resource", + mtime=12345, + ) + meta2 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/to/resource.resource", + mtime=12345, + ) + + assert meta1 == meta2 + + def test_metadata_inequality_different_mtime(self) -> None: + """ResourceMetaData instances differ when mtime differs.""" + meta1 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/to/resource.resource", + mtime=12345, + ) + meta2 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/to/resource.resource", + mtime=67890, + ) + + assert meta1 != meta2 + + def test_metadata_inequality_different_source(self) -> None: + """ResourceMetaData instances differ when source differs.""" + meta1 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/a/resource.resource", + mtime=12345, + ) + meta2 = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/path/b/resource.resource", + mtime=12345, + ) + + assert meta1 != meta2 + + +class TestResourceMetaVersion: + """Tests for RESOURCE_META_VERSION constant.""" + + def test_meta_version_is_string(self) -> None: + """Meta version is a string.""" + assert isinstance(RESOURCE_META_VERSION, str) + assert len(RESOURCE_META_VERSION) > 0 + + def test_meta_version_value(self) -> None: + """Meta version has expected value.""" + assert RESOURCE_META_VERSION == "1" + + +class TestCacheKeyGeneration: + """Tests for cache key generation patterns.""" + + @pytest.mark.parametrize( + ("source", "expected_stem"), + [ + ("/path/to/test.resource", "_test"), + 
("/path/to/common_keywords.resource", "_common_keywords"), + ("/path/to/My-Library.resource", "_My-Library"), + ("/path/日本語/テスト.resource", "_テスト"), + ], + ) + def test_cache_key_stem_extraction(self, source: str, expected_stem: str) -> None: + """Cache key correctly extracts filename stem.""" + meta = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source=source, + mtime=0, + ) + + assert meta.filepath_base.endswith(expected_stem) + + def test_cache_key_uses_adler32(self) -> None: + """Cache key uses zlib.adler32 for parent directory hash.""" + source = "/specific/path/to/resource.resource" + meta = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source=source, + mtime=0, + ) + + parent_path = str(Path(source).parent) + expected_hash = f"{zlib.adler32(parent_path.encode('utf-8')):08x}" + + assert meta.filepath_base.startswith(expected_hash) + + def test_cache_key_hash_length(self) -> None: + """Cache key hash portion is 8 hex characters (adler32).""" + meta = ResourceMetaData( + meta_version=RESOURCE_META_VERSION, + source="/any/path/file.resource", + mtime=0, + ) + + hash_part = meta.filepath_base.split("_")[0] + assert len(hash_part) == 8 + assert all(c in "0123456789abcdef" for c in hash_part) diff --git a/tests/robotcode/robot/diagnostics/test_namespace_cache.py b/tests/robotcode/robot/diagnostics/test_namespace_cache.py new file mode 100644 index 000000000..367cad94f --- /dev/null +++ b/tests/robotcode/robot/diagnostics/test_namespace_cache.py @@ -0,0 +1,372 @@ +"""Unit tests for namespace caching data classes and serialization.""" + +import hashlib +import zlib +from pathlib import Path + +import pytest + +from robotcode.core.lsp.types import Position, Range +from robotcode.robot.diagnostics.namespace import ( + NAMESPACE_META_VERSION, + CachedLibraryEntry, + CachedResourceEntry, + CachedVariablesEntry, + Namespace, + NamespaceCacheData, + NamespaceMetaData, +) + + +class TestNamespaceMetaData: + """Tests for NamespaceMetaData dataclass.""" + + def test_create_metadata(self) -> None: + """NamespaceMetaData can be created with all required fields.""" + meta = NamespaceMetaData( + meta_version=NAMESPACE_META_VERSION, + source="/path/to/test.robot", + mtime=1234567890123456789, + file_size=1024, + content_hash="abc123", + library_sources_mtimes=(("/path/lib.py", 111),), + resource_sources_mtimes=(("/path/res.resource", 222),), + variables_sources_mtimes=(("/path/vars.py", 333),), + robot_version="7.0", + python_executable="/usr/bin/python3", + sys_path_hash="def456", + ) + + assert meta.meta_version == NAMESPACE_META_VERSION + assert meta.source == "/path/to/test.robot" + assert meta.mtime == 1234567890123456789 + assert meta.file_size == 1024 + assert meta.content_hash == "abc123" + + def test_metadata_is_frozen(self) -> None: + """NamespaceMetaData is immutable.""" + meta = NamespaceMetaData( + meta_version=NAMESPACE_META_VERSION, + source="/path/to/test.robot", + mtime=1234567890, + file_size=100, + content_hash="abc", + library_sources_mtimes=(), + resource_sources_mtimes=(), + variables_sources_mtimes=(), + robot_version="7.0", + python_executable="/usr/bin/python3", + sys_path_hash="def", + ) + + with pytest.raises(AttributeError): + meta.source = "/other/path" # type: ignore[misc] + + def test_filepath_base_property(self) -> None: + """filepath_base computes correct cache filename base.""" + source = "/home/user/project/tests/test_example.robot" + meta = NamespaceMetaData( + meta_version=NAMESPACE_META_VERSION, + source=source, + mtime=1234567890, + 
file_size=100, + content_hash="abc", + library_sources_mtimes=(), + resource_sources_mtimes=(), + variables_sources_mtimes=(), + robot_version="7.0", + python_executable="/usr/bin/python3", + sys_path_hash="def", + ) + + # Should be "adler32hash_stem" format + parent_path = str(Path(source).parent) + expected_hash = f"{zlib.adler32(parent_path.encode('utf-8')):08x}" + assert meta.filepath_base == f"{expected_hash}_test_example" + + def test_filepath_base_with_different_paths(self) -> None: + """filepath_base generates unique hashes for different parent directories.""" + meta1 = NamespaceMetaData( + meta_version=NAMESPACE_META_VERSION, + source="/path/a/test.robot", + mtime=0, + file_size=0, + content_hash="", + library_sources_mtimes=(), + resource_sources_mtimes=(), + variables_sources_mtimes=(), + robot_version="7.0", + python_executable="", + sys_path_hash="", + ) + meta2 = NamespaceMetaData( + meta_version=NAMESPACE_META_VERSION, + source="/path/b/test.robot", + mtime=0, + file_size=0, + content_hash="", + library_sources_mtimes=(), + resource_sources_mtimes=(), + variables_sources_mtimes=(), + robot_version="7.0", + python_executable="", + sys_path_hash="", + ) + + # Different parent dirs should produce different hashes + assert meta1.filepath_base != meta2.filepath_base + # But both end with the same stem + assert meta1.filepath_base.endswith("_test") + assert meta2.filepath_base.endswith("_test") + + +class TestCachedEntryClasses: + """Tests for cached entry dataclasses.""" + + def test_cached_library_entry(self) -> None: + """CachedLibraryEntry can be created with all fields.""" + entry = CachedLibraryEntry( + name="Collections", + import_name="Collections", + library_doc_source="/path/to/collections.py", + args=(), + alias=None, + import_range=Range(start=Position(line=0, character=0), end=Position(line=0, character=11)), + import_source="/test.robot", + alias_range=Range.zero(), + ) + + assert entry.name == "Collections" + assert entry.import_name == "Collections" + assert entry.library_doc_source == "/path/to/collections.py" + + def test_cached_library_entry_with_alias(self) -> None: + """CachedLibraryEntry supports alias.""" + entry = CachedLibraryEntry( + name="MyAlias", + import_name="SomeLibrary", + library_doc_source="/path/to/lib.py", + args=("arg1", "arg2"), + alias="MyAlias", + import_range=Range.zero(), + import_source="/test.robot", + alias_range=Range(start=Position(line=0, character=20), end=Position(line=0, character=27)), + ) + + assert entry.alias == "MyAlias" + assert entry.args == ("arg1", "arg2") + + def test_cached_resource_entry(self) -> None: + """CachedResourceEntry includes imports and variables.""" + entry = CachedResourceEntry( + name="common", + import_name="resources/common.resource", + library_doc_source="/project/resources/common.resource", + args=(), + alias=None, + import_range=Range.zero(), + import_source="/test.robot", + alias_range=Range.zero(), + imports=(), + variables=(), + ) + + assert entry.name == "common" + assert entry.imports == () + assert entry.variables == () + + def test_cached_variables_entry(self) -> None: + """CachedVariablesEntry includes variables.""" + entry = CachedVariablesEntry( + name="vars", + import_name="variables.py", + library_doc_source="/project/variables.py", + args=(), + alias=None, + import_range=Range.zero(), + import_source="/test.robot", + alias_range=Range.zero(), + variables=(), + ) + + assert entry.name == "vars" + assert entry.variables == () + + def test_cached_entries_are_frozen(self) -> None: + """All 
cached entry types are immutable.""" + lib_entry = CachedLibraryEntry( + name="Test", + import_name="Test", + library_doc_source=None, + args=(), + alias=None, + import_range=Range.zero(), + import_source=None, + alias_range=Range.zero(), + ) + + with pytest.raises(AttributeError): + lib_entry.name = "Modified" # type: ignore[misc] + + +class TestNamespaceCacheData: + """Tests for NamespaceCacheData dataclass.""" + + def test_create_minimal_cache_data(self) -> None: + """NamespaceCacheData can be created with minimal data.""" + cache_data = NamespaceCacheData( + libraries=(), + resources=(), + resources_files=(), + variables_imports=(), + own_variables=(), + imports=(), + library_doc=None, + ) + + assert cache_data.libraries == () + assert cache_data.analyzed is False + assert cache_data.diagnostics == () + + def test_cache_data_with_analysis_results(self) -> None: + """NamespaceCacheData includes analysis data when analyzed=True.""" + cache_data = NamespaceCacheData( + libraries=(), + resources=(), + resources_files=(), + variables_imports=(), + own_variables=(), + imports=(), + library_doc=None, + analyzed=True, + diagnostics=(), + test_case_definitions=(), + tag_definitions=(), + namespace_references=(), + ) + + assert cache_data.analyzed is True + + def test_cache_data_is_frozen(self) -> None: + """NamespaceCacheData is immutable.""" + cache_data = NamespaceCacheData( + libraries=(), + resources=(), + resources_files=(), + variables_imports=(), + own_variables=(), + imports=(), + library_doc=None, + ) + + with pytest.raises(AttributeError): + cache_data.analyzed = True # type: ignore[misc] + + +class TestComputeContentHash: + """Tests for Namespace._compute_content_hash static method.""" + + def test_compute_hash_small_file(self, tmp_path: Path) -> None: + """Content hash is computed for small files (< 64KB).""" + test_file = tmp_path / "small.robot" + content = b"*** Test Cases ***\nTest\n Log Hello" + test_file.write_bytes(content) + + file_size, content_hash = Namespace._compute_content_hash(test_file) + + assert file_size == len(content) + assert len(content_hash) == 64 # SHA256 hex digest length + assert content_hash == hashlib.sha256(f"{len(content)}:".encode() + content).hexdigest() + + def test_compute_hash_large_file(self, tmp_path: Path) -> None: + """Content hash includes first and last 64KB for large files.""" + test_file = tmp_path / "large.robot" + # Create file > 64KB: 100KB of content + first_part = b"A" * 65536 + middle_part = b"B" * 20000 + last_part = b"C" * 65536 + content = first_part + middle_part + last_part + test_file.write_bytes(content) + + file_size, content_hash = Namespace._compute_content_hash(test_file) + + assert file_size == len(content) + # Verify hash includes size + first 64KB + last 64KB + expected_hasher = hashlib.sha256() + expected_hasher.update(f"{len(content)}:".encode()) + expected_hasher.update(first_part) + expected_hasher.update(content[-65536:]) # Last 64KB + assert content_hash == expected_hasher.hexdigest() + + def test_different_content_different_hash(self, tmp_path: Path) -> None: + """Different file content produces different hashes.""" + file1 = tmp_path / "file1.robot" + file2 = tmp_path / "file2.robot" + file1.write_bytes(b"Content A") + file2.write_bytes(b"Content B") + + _, hash1 = Namespace._compute_content_hash(file1) + _, hash2 = Namespace._compute_content_hash(file2) + + assert hash1 != hash2 + + def test_same_content_same_hash(self, tmp_path: Path) -> None: + """Same file content produces same hash.""" + file1 = tmp_path / 
"file1.robot" + file2 = tmp_path / "file2.robot" + content = b"Same content in both files" + file1.write_bytes(content) + file2.write_bytes(content) + + _, hash1 = Namespace._compute_content_hash(file1) + _, hash2 = Namespace._compute_content_hash(file2) + + assert hash1 == hash2 + + def test_append_detection(self, tmp_path: Path) -> None: + """Hash detects appended content (size change).""" + test_file = tmp_path / "test.robot" + original = b"Original content" + test_file.write_bytes(original) + size1, hash1 = Namespace._compute_content_hash(test_file) + + # Append content + test_file.write_bytes(original + b"\nAppended line") + size2, hash2 = Namespace._compute_content_hash(test_file) + + assert size2 > size1 + assert hash1 != hash2 + + def test_modification_detection(self, tmp_path: Path) -> None: + """Hash detects in-place modification (same size, different content).""" + test_file = tmp_path / "test.robot" + test_file.write_bytes(b"Original content here") + _, hash1 = Namespace._compute_content_hash(test_file) + + # Modify without changing size + test_file.write_bytes(b"Modified content here") + _, hash2 = Namespace._compute_content_hash(test_file) + + assert hash1 != hash2 + + def test_empty_file(self, tmp_path: Path) -> None: + """Hash handles empty files.""" + test_file = tmp_path / "empty.robot" + test_file.write_bytes(b"") + + file_size, content_hash = Namespace._compute_content_hash(test_file) + + assert file_size == 0 + assert len(content_hash) == 64 + + +class TestMetaVersion: + """Tests for namespace meta version constant.""" + + def test_meta_version_format(self) -> None: + """Meta version is a valid version string.""" + assert NAMESPACE_META_VERSION == "1.0" + # Verify it can be parsed as a version + parts = NAMESPACE_META_VERSION.split(".") + assert len(parts) == 2 + assert all(part.isdigit() for part in parts)