Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
EnvironmentVariableDefinition,
GlobalVariableDefinition,
LibraryArgumentDefinition,
VariableDefinition,
)
from robotcode.robot.diagnostics.library_doc import LibraryDoc
from robotcode.robot.diagnostics.library_doc import KeywordDoc, LibraryDoc
from robotcode.robot.diagnostics.namespace import Namespace

from ...common.parts.diagnostics import DiagnosticsCollectType, DiagnosticsResult
Expand Down Expand Up @@ -54,20 +55,18 @@ def _on_initialized(self, sender: Any) -> None:
self.parent.documents_cache.variables_changed.add(self._on_variables_changed)

def _on_libraries_changed(self, sender: Any, libraries: List[LibraryDoc]) -> None:
for doc in self.parent.documents.documents:
namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if namespace is not None:
lib_docs = (e.library_doc for e in namespace.get_libraries().values())
if any(lib_doc in lib_docs for lib_doc in libraries):
self.parent.diagnostics.force_refresh_document(doc)
docs_to_refresh: set[TextDocument] = set()
for lib_doc in libraries:
docs_to_refresh.update(self.parent.documents_cache.get_library_users(lib_doc))
for doc in docs_to_refresh:
self.parent.diagnostics.force_refresh_document(doc)

def _on_variables_changed(self, sender: Any, variables: List[LibraryDoc]) -> None:
for doc in self.parent.documents.documents:
namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if namespace is not None:
lib_docs = (e.library_doc for e in namespace.get_variables_imports().values())
if any(lib_doc in lib_docs for lib_doc in variables):
self.parent.diagnostics.force_refresh_document(doc)
docs_to_refresh: set[TextDocument] = set()
for var_doc in variables:
docs_to_refresh.update(self.parent.documents_cache.get_variables_users(var_doc))
for doc in docs_to_refresh:
self.parent.diagnostics.force_refresh_document(doc)

@language_id("robotframework")
def analyze(self, sender: Any, document: TextDocument) -> None:
Expand All @@ -83,37 +82,25 @@ def _on_get_related_documents(self, sender: Any, document: TextDocument) -> Opti
namespace = self.parent.documents_cache.get_only_initialized_namespace(document)
if namespace is None:
return None

result = []

lib_doc = namespace.get_library_doc()
for doc in self.parent.documents.documents:
if doc.language_id != "robotframework":
continue

doc_namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if doc_namespace is None:
continue

if doc_namespace.is_analyzed():
for ref in doc_namespace.get_namespace_references():
if ref.library_doc == lib_doc:
result.append(doc)

return result
source = str(document.uri.to_path())
return self.parent.documents_cache.get_importers(source)

def modify_diagnostics(self, document: TextDocument, diagnostics: List[Diagnostic]) -> List[Diagnostic]:
return self.parent.documents_cache.get_diagnostic_modifier(document).modify_diagnostics(diagnostics)

@language_id("robotframework")
def collect_namespace_diagnostics(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
try:
namespace = self.parent.documents_cache.get_namespace(document)

return DiagnosticsResult(
self.collect_namespace_diagnostics, self.modify_diagnostics(document, namespace.get_diagnostics())
self.collect_namespace_diagnostics,
self.modify_diagnostics(document, namespace.get_diagnostics()),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
Expand All @@ -138,10 +125,47 @@ def collect_namespace_diagnostics(
],
)

def _is_keyword_used_anywhere(
self,
document: TextDocument,
kw: KeywordDoc,
namespace: Namespace,
) -> bool:
"""Check if keyword is used anywhere, using index with safe fallback."""
if self.parent.documents_cache.get_keyword_ref_users(kw):
return True

if namespace.get_keyword_references().get(kw):
return True

# Safe fallback: workspace scan if index might be incomplete
refs = self.parent.robot_references.find_keyword_references(document, kw, False, True)
return bool(refs)

def _is_variable_used_anywhere(
self,
document: TextDocument,
var: VariableDefinition,
namespace: Namespace,
) -> bool:
"""Check if variable is used anywhere, using index with safe fallback."""
if self.parent.documents_cache.get_variable_ref_users(var):
return True

if namespace.get_variable_references().get(var):
return True

# Safe fallback: workspace scan if index might be incomplete
refs = self.parent.robot_references.find_variable_references(document, var, False, True)
return bool(refs)

@language_id("robotframework")
@_logger.call
def collect_unused_keyword_references(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri)

Expand All @@ -161,8 +185,7 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
for kw in (namespace.get_library_doc()).keywords.values():
check_current_task_canceled()

references = self.parent.robot_references.find_keyword_references(document, kw, False, True)
if not references:
if not self._is_keyword_used_anywhere(document, kw, namespace):
result.append(
Diagnostic(
range=kw.name_range,
Expand All @@ -174,7 +197,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
)
)

return DiagnosticsResult(self.collect_unused_keyword_references, self.modify_diagnostics(document, result))
return DiagnosticsResult(
self.collect_unused_keyword_references,
self.modify_diagnostics(document, result),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
Expand All @@ -200,7 +226,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
@language_id("robotframework")
@_logger.call
def collect_unused_variable_references(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri)

Expand All @@ -222,15 +251,19 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos
check_current_task_canceled()

if isinstance(
var, (LibraryArgumentDefinition, EnvironmentVariableDefinition, GlobalVariableDefinition)
var,
(
LibraryArgumentDefinition,
EnvironmentVariableDefinition,
GlobalVariableDefinition,
),
):
continue

if var.name_token is not None and var.name_token.value and var.name_token.value.startswith("_"):
continue

references = self.parent.robot_references.find_variable_references(document, var, False, True)
if not references:
if not self._is_variable_used_anywhere(document, var, namespace):
result.append(
Diagnostic(
range=var.name_range,
Expand All @@ -243,7 +276,10 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos
)
)

return DiagnosticsResult(self.collect_unused_variable_references, self.modify_diagnostics(document, result))
return DiagnosticsResult(
self.collect_unused_variable_references,
self.modify_diagnostics(document, result),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,30 @@ def _find_variable_references(
include_declaration: bool = True,
stop_at_first: bool = False,
) -> List[Location]:
result = []
result: List[Location] = []

if include_declaration and variable.source:
result.append(Location(str(Uri.from_path(variable.source)), variable.name_range))

if variable.type == VariableDefinitionType.LOCAL_VARIABLE:
result.extend(self.find_variable_references_in_file(document, variable, False))
else:
result.extend(
self._find_references_in_workspace(
document,
stop_at_first,
self.find_variable_references_in_file,
variable,
False,
# Use reverse index for lookup instead of workspace scan
docs_to_search = self.parent.documents_cache.get_variable_ref_users(variable)
if docs_to_search:
for doc in docs_to_search:
check_current_task_canceled()
result.extend(self.find_variable_references_in_file(doc, variable, False))
if result and stop_at_first:
break
else:
# Fallback to workspace scan if index is empty
result.extend(
self._find_references_in_workspace(
document, stop_at_first, self.find_variable_references_in_file, variable, False
)
)
)

return result

@_logger.call
Expand Down Expand Up @@ -317,20 +324,26 @@ def _find_keyword_references(
include_declaration: bool = True,
stop_at_first: bool = False,
) -> List[Location]:
result = []
result: List[Location] = []

if include_declaration and kw_doc.source:
result.append(Location(str(Uri.from_path(kw_doc.source)), kw_doc.range))

result.extend(
self._find_references_in_workspace(
document,
stop_at_first,
self.find_keyword_references_in_file,
kw_doc,
False,
# Use reverse index for lookup instead of workspace scan
docs_to_search = self.parent.documents_cache.get_keyword_ref_users(kw_doc)
if docs_to_search:
for doc in docs_to_search:
check_current_task_canceled()
result.extend(self.find_keyword_references_in_file(doc, kw_doc, False))
if result and stop_at_first:
break
else:
# Fallback to workspace scan if index is empty
result.extend(
self._find_references_in_workspace(
document, stop_at_first, self.find_keyword_references_in_file, kw_doc, False
)
)
)

return result

Expand Down
26 changes: 24 additions & 2 deletions packages/robot/src/robotcode/robot/diagnostics/data_cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import pickle
import tempfile
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
Expand All @@ -12,6 +14,8 @@
class CacheSection(Enum):
LIBRARY = "libdoc"
VARIABLES = "variables"
RESOURCE = "resource"
NAMESPACE = "namespace"


class DataCache(ABC):
Expand Down Expand Up @@ -85,5 +89,23 @@ def save_cache_data(self, section: CacheSection, entry_name: str, data: Any) ->
cached_file = self.build_cache_data_filename(section, entry_name)

cached_file.parent.mkdir(parents=True, exist_ok=True)
with cached_file.open("wb") as f:
pickle.dump(data, f)

# Atomic write: write to temp file, then rename
# This ensures readers never see partial/corrupt data
temp_fd, temp_path = tempfile.mkstemp(
dir=cached_file.parent,
prefix=cached_file.stem + "_",
suffix=".tmp",
)
try:
with os.fdopen(temp_fd, "wb") as f:
pickle.dump(data, f)
# Atomic rename (POSIX guarantees atomicity; Windows may fail if target exists)
Path(temp_path).replace(cached_file)
except Exception:
# Clean up temp file on failure (temp file may be left behind on SystemExit/KeyboardInterrupt)
try:
os.unlink(temp_path)
except OSError:
pass
raise
Loading