Skip to content

Commit 66e9ad6

Browse files
committed
WIP
1 parent 9f84cf0 commit 66e9ad6

File tree

3 files changed

+78
-27
lines changed

3 files changed

+78
-27
lines changed

securedrop_client/gui/widgets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1339,7 +1339,7 @@ def mouseReleaseEvent(self, e):
13391339
self.controller.on_file_open(self.submission)
13401340
else:
13411341
# Download the file.
1342-
self.controller.on_file_download(self.source, self.submission)
1342+
self.controller.on_submission_download(self.source, self.submission)
13431343

13441344

13451345
class ConversationView(QWidget):

securedrop_client/logic.py

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@
2828
from gettext import gettext as _
2929
from PyQt5.QtCore import QObject, QThread, pyqtSignal, QTimer, QProcess
3030
from sdclientapi import RequestTimeoutError
31-
from typing import Dict, Tuple # noqa: F401
31+
from typing import Dict, Tuple, Union # noqa: F401
3232

3333
from securedrop_client import storage
3434
from securedrop_client import db
35-
from securedrop_client.utils import check_dir_permissions
3635
from securedrop_client.crypto import GpgHelper, CryptoError
3736
from securedrop_client.message_sync import MessageSync, ReplySync
37+
from securedrop_client.queue import ApiJobQueue, DownloadSubmissionJob
38+
from securedrop_client.utils import check_dir_permissions
3839

3940
logger = logging.getLogger(__name__)
4041

@@ -144,6 +145,10 @@ def __init__(self, hostname, gui, session,
144145

145146
# Reference to the API for secure drop proxy.
146147
self.api = None # type: sdclientapi.API
148+
149+
# Queue that handles running API job
150+
self.api_job_queue = ApiJobQueue(self.api, self)
151+
147152
# Contains active threads calling the API.
148153
self.api_threads = {} # type: Dict[str, Dict]
149154

@@ -310,6 +315,7 @@ def on_authenticate_success(self, result):
310315
self.gui.show_main_window(self.api.username)
311316
self.start_message_thread()
312317
self.start_reply_thread()
318+
self.api_job_queue.start_queues() # TODO <------------------- this is wrong somehow?
313319

314320
# Clear the sidebar error status bar if a message was shown
315321
# to the user indicating they should log in.
@@ -508,35 +514,47 @@ def on_file_open(self, file_db_object):
508514
# Non Qubes OS. Just log the event for now.
509515
logger.info('Opening file "{}".'.format(submission_filepath))
510516

511-
def on_file_download(self, source_db_object, message):
517+
def on_reply_download(self, source_db_object: db.Source, reply: db.Reply) -> None:
512518
"""
513-
Download the file associated with the associated message (which may
514-
be a Submission or Reply).
519+
Download the file associated with the Reply.
515520
"""
516521
if not self.api: # Then we should tell the user they need to login.
517522
self.on_action_requiring_login()
518523
return
519524

520-
if isinstance(message, db.File) or isinstance(message, db.Message):
521-
# Handle submissions.
522-
func = self.api.download_submission
523-
sdk_object = sdclientapi.Submission(uuid=message.uuid)
524-
sdk_object.filename = message.filename
525-
sdk_object.source_uuid = source_db_object.uuid
526-
elif isinstance(message, db.Reply):
527-
# Handle journalist's replies.
528-
func = self.api.download_reply
529-
sdk_object = sdclientapi.Reply(uuid=message.uuid)
530-
sdk_object.filename = message.filename
531-
sdk_object.source_uuid = source_db_object.uuid
525+
sdk_object = sdclientapi.Reply(uuid=reply.uuid)
526+
sdk_object.filename = reply.filename
527+
sdk_object.source_uuid = source_db_object.uuid
532528

533529
self.set_status(_('Downloading {}'.format(sdk_object.filename)))
534-
self.call_api(func,
530+
531+
self.call_api(self.api.download_reply,
535532
self.on_file_download_success,
536533
self.on_file_download_failure,
537534
sdk_object,
538535
self.data_dir,
539-
current_object=message)
536+
current_object=reply)
537+
538+
def on_submission_download(
539+
self,
540+
source_db_object: db.Source,
541+
submission: Union[db.File, db.Message],
542+
) -> None:
543+
"""
544+
Download the file associated with the Submission (which may be a File or Message).
545+
"""
546+
print('on sub')
547+
if not self.api: # Then we should tell the user they need to login.
548+
self.on_action_requiring_login()
549+
return
550+
551+
sdk_object = sdclientapi.Submission(uuid=submission.uuid)
552+
sdk_object.filename = submission.filename
553+
sdk_object.source_uuid = source_db_object.uuid
554+
555+
job = DownloadSubmissionJob(sdk_object, self.data_dir, submission)
556+
self.api_job_queue.enqueue(job)
557+
self.set_status(_('Downloading {}'.format(sdk_object.filename)))
540558

541559
def on_file_download_success(self, result, current_object):
542560
"""

securedrop_client/queue.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1+
import logging
2+
import sdclientapi
3+
14
from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal
25
from queue import Queue
36
from sdclientapi import API, RequestTimeoutError
4-
from typing import Any, Optional
7+
from typing import Any, Optional, Union
8+
9+
from securedrop_client.db import File, Message
10+
11+
12+
logger = logging.getLogger(__name__)
513

614

715
class ApiJob:
@@ -14,8 +22,10 @@ def _do_call_api(self, api_client: API) -> None:
1422
try:
1523
result = self.call_api(api_client, self.nargs, self.kwargs)
1624
except RequestTimeoutError:
25+
logger.debug('Job {} timed out'.format(self))
1726
raise
1827
except Exception as e:
28+
logger.error('Job {} raised an exception: {}'.format(self, e))
1929
self.handle_failure(e)
2030
else:
2131
self.handle_success(result)
@@ -30,11 +40,26 @@ def handle_failure(self, exception: Exception) -> None:
3040
raise NotImplementedError
3141

3242

33-
class DownloadFileJob(ApiJob):
43+
class DownloadSubmissionJob(ApiJob):
44+
45+
def __init__(
46+
self,
47+
submission: sdclientapi.Submission,
48+
data_dir: str,
49+
db_object: Union[File, Message],
50+
) -> None:
51+
super().__init__([submission, data_dir], {})
52+
self.__db_object = db_object
3453

3554
def call_api(self, api_client: API, nargs: list, kwargs: dict) -> Any:
3655
return api_client.download_submission(*nargs, **kwargs)
3756

57+
def handle_success(self, result: Any) -> None:
58+
print('success', result)
59+
60+
def handle_failure(self, exception: Exception) -> None:
61+
print('fail', exception)
62+
3863

3964
class RunnableQueue(QObject):
4065

@@ -43,6 +68,7 @@ def __init__(self, api_client: API, halt_signal: pyqtBoundSignal) -> None:
4368
self.run = True
4469
self.api_client = api_client
4570
self.queue = Queue() # type: Queue[ApiJob]
71+
self.last_job = None # type: Optional[ApiJob]
4672

4773
self.halt_signal = halt_signal
4874
halt_signal.connect(self.stop)
@@ -51,14 +77,18 @@ def stop(self) -> None:
5177
self.run = False
5278

5379
def __call__(self, loop: bool = True) -> None:
80+
print('running')
5481
while self.run:
55-
job = self.queue.get(block=True) # type: ApiJob
82+
# retry the "cached" job if it exists, otherwise get the next job
83+
job = self.last_job or self.queue.get(block=True)
84+
self.last_job = None
5685

5786
try:
5887
job._do_call_api(self.api_client)
5988
except RequestTimeoutError:
6089
self.run = False
6190
self.halt_signal.emit() # notify other threads of failure
91+
self.last_job = job # "cache" the last job since we can't re-queue it
6292
return
6393

6494
if not loop:
@@ -69,7 +99,7 @@ class ApiJobQueue(QObject):
6999

70100
'''
71101
Signal used to notify different job threads that they should halt. This is pub/sub like signal
72-
in that any threat may trigger it, and all threads listen to it.
102+
in that any job queues may trigger it, and all job queues listen to it.
73103
'''
74104
halt_signal = pyqtSignal()
75105

@@ -90,11 +120,14 @@ def start_queues(self) -> None:
90120
self.main_queue.moveToThread(main_thread)
91121
self.download_queue.moveToThread(download_thread)
92122

93-
main_thread.run()
94-
download_thread.run()
123+
main_thread.started.connect(self.main_queue)
124+
download_thread.started.connect(self.download_queue)
125+
126+
main_thread.start()
127+
download_thread.start()
95128

96129
def enqueue(self, job: ApiJob) -> None:
97-
if isinstance(job, DownloadFileJob):
130+
if isinstance(job, DownloadSubmissionJob):
98131
self.download_queue.queue.put_nowait(job)
99132
else:
100133
self.main_queue.queue.put_nowait(job)

0 commit comments

Comments
 (0)