|
21 | 21 | import logging |
22 | 22 | import os |
23 | 23 | import sdclientapi |
24 | | -import shutil |
25 | 24 | import traceback |
26 | 25 | import uuid |
27 | 26 |
|
28 | 27 | from gettext import gettext as _ |
29 | | -from PyQt5.QtCore import QObject, QThread, pyqtSignal, QTimer, QProcess |
| 28 | +from PyQt5.QtCore import QObject, QThread, pyqtSignal, QTimer, QProcess, Qt |
30 | 29 | from sdclientapi import RequestTimeoutError |
31 | | -from typing import Dict, Tuple # noqa: F401 |
32 | 30 | from sqlalchemy.orm.session import sessionmaker |
| 31 | +from typing import Dict, Tuple, Union, Any, Type # noqa: F401 |
33 | 32 |
|
34 | 33 | from securedrop_client import storage |
35 | 34 | from securedrop_client import db |
36 | | -from securedrop_client.utils import check_dir_permissions |
37 | 35 | from securedrop_client.crypto import GpgHelper, CryptoError |
38 | 36 | from securedrop_client.message_sync import MessageSync, ReplySync |
| 37 | +from securedrop_client.queue import ApiJobQueue, DownloadSubmissionJob |
| 38 | +from securedrop_client.utils import check_dir_permissions |
39 | 39 |
|
40 | 40 | logger = logging.getLogger(__name__) |
41 | 41 |
|
@@ -150,6 +150,9 @@ def __init__(self, hostname: str, gui, session_maker: sessionmaker, |
150 | 150 | self.session_maker = session_maker |
151 | 151 | self.session = session_maker() |
152 | 152 |
|
| 153 | + # Queue that handles running API job |
| 154 | + self.api_job_queue = ApiJobQueue(self.api, self.session_maker) |
| 155 | + |
153 | 156 | # Contains active threads calling the API. |
154 | 157 | self.api_threads = {} # type: Dict[str, Dict] |
155 | 158 |
|
@@ -311,9 +314,12 @@ def on_authenticate_success(self, result): |
311 | 314 | self.gui.hide_login() |
312 | 315 | self.sync_api() |
313 | 316 | self.gui.show_main_window(self.api.username) |
| 317 | + |
314 | 318 | self.start_message_thread() |
315 | 319 | self.start_reply_thread() |
316 | 320 |
|
| 321 | + self.api_job_queue.start_queues(self.api) |
| 322 | + |
317 | 323 | # Clear the sidebar error status bar if a message was shown |
318 | 324 | # to the user indicating they should log in. |
319 | 325 | self.gui.clear_error_status() |
@@ -512,79 +518,57 @@ def on_file_open(self, file_db_object): |
512 | 518 | # Non Qubes OS. Just log the event for now. |
513 | 519 | logger.info('Opening file "{}".'.format(submission_filepath)) |
514 | 520 |
|
515 | | - def on_file_download(self, source_db_object, message): |
| 521 | + def on_reply_download(self, source_db_object: db.Source, reply: db.Reply) -> None: |
516 | 522 | """ |
517 | | - Download the file associated with the associated message (which may |
518 | | - be a Submission or Reply). |
| 523 | + Download the file associated with the Reply. |
519 | 524 | """ |
520 | 525 | if not self.api: # Then we should tell the user they need to login. |
521 | 526 | self.on_action_requiring_login() |
522 | 527 | return |
523 | 528 |
|
524 | | - if isinstance(message, db.File) or isinstance(message, db.Message): |
525 | | - # Handle submissions. |
526 | | - func = self.api.download_submission |
527 | | - sdk_object = sdclientapi.Submission(uuid=message.uuid) |
528 | | - sdk_object.filename = message.filename |
529 | | - sdk_object.source_uuid = source_db_object.uuid |
530 | | - elif isinstance(message, db.Reply): |
531 | | - # Handle journalist's replies. |
532 | | - func = self.api.download_reply |
533 | | - sdk_object = sdclientapi.Reply(uuid=message.uuid) |
534 | | - sdk_object.filename = message.filename |
535 | | - sdk_object.source_uuid = source_db_object.uuid |
| 529 | + sdk_object = sdclientapi.Reply(uuid=reply.uuid) |
| 530 | + sdk_object.filename = reply.filename |
| 531 | + sdk_object.source_uuid = source_db_object.uuid |
536 | 532 |
|
537 | 533 | self.set_status(_('Downloading {}'.format(sdk_object.filename))) |
538 | | - self.call_api(func, |
| 534 | + |
| 535 | + self.call_api(self.api.download_reply, |
539 | 536 | self.on_file_download_success, |
540 | 537 | self.on_file_download_failure, |
541 | 538 | sdk_object, |
542 | 539 | self.data_dir, |
543 | | - current_object=message) |
| 540 | + current_object=reply) |
544 | 541 |
|
545 | | - def on_file_download_success(self, result, current_object): |
| 542 | + def on_submission_download( |
| 543 | + self, |
| 544 | + submission_type: Union[Type[db.File], Type[db.Message]], |
| 545 | + submission_uuid: str, |
| 546 | + ) -> None: |
546 | 547 | """ |
547 | | - Called when a file has downloaded. Cause a refresh to the conversation view to display the |
548 | | - contents of the new file. |
| 548 | + Download the file associated with the Submission (which may be a File or Message). |
549 | 549 | """ |
550 | | - file_uuid = current_object.uuid |
551 | | - server_filename = current_object.filename |
552 | | - _, filename = result |
553 | | - # The filename contains the location where the file has been |
554 | | - # stored. On non-Qubes OSes, this will be the data directory. |
555 | | - # On Qubes OS, this will a ~/QubesIncoming directory. In case |
556 | | - # we are on Qubes, we should move the file to the data directory |
557 | | - # and name it the same as the server (e.g. spotless-tater-msg.gpg). |
558 | | - filepath_in_datadir = os.path.join(self.data_dir, server_filename) |
559 | | - shutil.move(filename, filepath_in_datadir) |
560 | | - storage.mark_file_as_downloaded(file_uuid, self.session) |
| 550 | + job = DownloadSubmissionJob( |
| 551 | + submission_type, |
| 552 | + submission_uuid, |
| 553 | + self.data_dir, |
| 554 | + self.gpg, |
| 555 | + ) |
| 556 | + job.success_signal.connect(self.on_file_download_success, type=Qt.QueuedConnection) |
| 557 | + job.failure_signal.connect(self.on_file_download_failure, type=Qt.QueuedConnection) |
561 | 558 |
|
562 | | - try: |
563 | | - # Attempt to decrypt the file. |
564 | | - self.gpg.decrypt_submission_or_reply( |
565 | | - filepath_in_datadir, server_filename, is_doc=True) |
566 | | - storage.set_object_decryption_status_with_content( |
567 | | - current_object, self.session, True) |
568 | | - except CryptoError as e: |
569 | | - logger.debug('Failed to decrypt file {}: {}'.format(server_filename, e)) |
570 | | - storage.set_object_decryption_status_with_content( |
571 | | - current_object, self.session, False) |
572 | | - self.set_status("Failed to decrypt file, " |
573 | | - "please try again or talk to your administrator.") |
574 | | - # TODO: We should save the downloaded content, and just |
575 | | - # try to decrypt again if there was a failure. |
576 | | - return # If we failed we should stop here. |
577 | | - |
578 | | - self.set_status('Finished downloading {}'.format(current_object.filename)) |
579 | | - self.file_ready.emit(file_uuid) |
580 | | - |
581 | | - def on_file_download_failure(self, result, current_object): |
| 559 | + self.api_job_queue.enqueue(job) |
| 560 | + self.set_status(_('Downloading file')) |
| 561 | + |
| 562 | + def on_file_download_success(self, result: Any) -> None: |
| 563 | + """ |
| 564 | + Called when a file has downloaded. |
| 565 | + """ |
| 566 | + self.file_ready.emit(result) |
| 567 | + |
| 568 | + def on_file_download_failure(self, exception: Exception) -> None: |
582 | 569 | """ |
583 | 570 | Called when a file fails to download. |
584 | 571 | """ |
585 | | - server_filename = current_object.filename |
586 | | - logger.debug('Failed to download file {}'.format(server_filename)) |
587 | | - # Update the UI in some way to indicate a failure state. |
588 | 572 | self.set_status("The file download failed. Please try again.") |
589 | 573 |
|
590 | 574 | def on_delete_source_success(self, result) -> None: |
|
0 commit comments