Skip to content

Commit 677b9fd

Browse files
Merge pull request #144 from wfcommons/streamflow-ro-crate
Enhancements to the Streamflow/RO-Crate Tracer
2 parents 8e693c0 + 3c3ce86 commit 677b9fd

3 files changed

Lines changed: 37 additions & 31 deletions

File tree

tests/translators_loggers/Dockerfile.streamflow

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ RUN apt-get -y install gcc-multilib
2626
RUN apt-get -y install graphviz libgraphviz-dev
2727
RUN apt-get -y install zip
2828

29-
30-
3129
# Python stuff
3230
RUN apt-get -y install python3 python3-pip
3331
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
@@ -42,6 +40,8 @@ RUN apt-get -y install stress-ng
4240
# Streamflow
4341
RUN apt-get -y install nodejs
4442
RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14
43+
# For now, the above hasn't been released yet, so we get a commit tag
44+
#RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 2295eda80ad37214d6f607f75f542b9eca03e121 && python3 -m pip install --break-system-packages .
4545

4646
# Add wfcommons user
4747
RUN useradd -ms /bin/bash wfcommons

tests/translators_loggers/test_translators_loggers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,8 @@ def test_translator(self, backend) -> None:
337337
elif backend == "streamflow":
338338
parser = ROCrateLogsParser(dirpath / "RO-Crate",
339339
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
340-
file_extensions_to_ignore=[".out", ".err"])
340+
file_extensions_to_ignore=[".out", ".err"],
341+
instruments_to_ignore=["shell.cwl"])
341342

342343
if parser is not None:
343344
sys.stderr.write(f"[{backend}] Parsing the logs...\n")

wfcommons/wfinstances/logs/ro_crate.py

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,27 @@ class ROCrateLogsParser(LogsParser):
3939
:type description: Optional[str]
4040
:param logger: The logger where to log information/warning or errors (optional).
4141
:type logger: Optional[Logger]
42+
:param steps_to_ignore: Names of CWL steps that should be ignored in the translation
43+
:type steps_to_ignore: Optional[list[str]]
44+
:param file_extensions_to_ignore: File extensions that should be ignored in the translation
45+
:type file_extensions_to_ignore: Optional[list[str]]
46+
:param instruments_to_ignore: Names of instruments that should be ignored in the translation
47+
:type instruments_to_ignore: Optional[list[str]]
4248
"""
4349

4450
def __init__(self,
4551
crate_dir: pathlib.Path,
4652
description: Optional[str] = None,
4753
logger: Optional[Logger] = None,
48-
steps_to_ignore: Optional[list[str]]=[],
49-
file_extensions_to_ignore: Optional[list[str]]=[],
54+
steps_to_ignore: Optional[list[str]] = None,
55+
file_extensions_to_ignore: Optional[list[str]] = None,
56+
instruments_to_ignore: Optional[list[str]] = None,
5057
) -> None:
5158
"""Create an object of the RO crate parser."""
5259

53-
# TODO: Decide if these should be RO crate or Streamflow or whatev
5460
super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger)
5561

5662
# Sanity check
57-
if steps_to_ignore is None:
58-
steps_to_ignore = []
5963
if not crate_dir.is_dir():
6064
raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')
6165

@@ -71,8 +75,9 @@ def __init__(self,
7175
self.task_id_name_map: dict[str, str] = {}
7276
self.data_file_id_name_map: dict[str, str] = {}
7377

74-
self.steps_to_ignore = steps_to_ignore
75-
self.file_extensions_to_ignore = file_extensions_to_ignore
78+
self.steps_to_ignore : list[str] = steps_to_ignore or []
79+
self.file_extensions_to_ignore : list[str] = file_extensions_to_ignore or []
80+
self.instruments_to_ignore : list[str] = instruments_to_ignore or []
7681

7782

7883
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
@@ -204,27 +209,27 @@ def _add_dependencies(self, files, instruments):
204209
for child in file.get('in', []):
205210
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
206211

207-
# THIS IS COMMENTED OUT AS IT SEEMS TO ADD TONS OF NON-EXISTENT DEPENDENCIES ON WORKFLOW BENCHMARKS
208-
# (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files)
209-
# parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
210-
# for parameter_connection in parameter_connections:
211-
# # parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
212-
# # which is bad design but whatever
213-
# source_parameters = parameter_connection["sourceParameter"]
214-
# if not isinstance(source_parameters, list):
215-
# source_parameters = [source_parameters]
216-
# for item in source_parameters:
217-
# source = item["@id"]
218-
# source = source.rsplit("#", 1)[0] # Trim to get instrument
219-
#
220-
# target = parameter_connection["targetParameter"]["@id"]
221-
# target = target.rsplit("#", 1)[0] # Trim to get instrument
222-
# print("source", source, "----> target", target)
223-
#
224-
# for parent in instruments.get(source, []):
225-
# for child in instruments.get(target, []):
226-
# self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
227-
212+
parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
213+
for parameter_connection in parameter_connections:
214+
# parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
215+
# which is awkward design, so it is normalized to a list below
216+
source_parameters = parameter_connection["sourceParameter"]
217+
if not isinstance(source_parameters, list):
218+
source_parameters = [source_parameters]
219+
for item in source_parameters:
220+
source = item["@id"]
221+
source = source.rsplit("#", 1)[0] # Trim to get instrument
222+
223+
target = parameter_connection["targetParameter"]["@id"]
224+
target = target.rsplit("#", 1)[0] # Trim to get instrument
225+
226+
if source in self.instruments_to_ignore or target in self.instruments_to_ignore:
227+
continue
228+
# print("source", source, "----> target", target)
229+
230+
for parent in instruments.get(source, []):
231+
for child in instruments.get(target, []):
232+
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])
228233

229234
def _time_diff(self, start_time, end_time):
230235
diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)

0 commit comments

Comments
 (0)