Skip to content

Commit 0582b91

Browse files
committed
Add per channel and per user max server_rev tracking.
1 parent 178815e commit 0582b91

5 files changed

Lines changed: 76 additions & 33 deletions

File tree

contentcuration/contentcuration/frontend/shared/data/constants.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,5 @@ export const CURRENT_USER = 'CURRENT_USER';
6262
export const ACTIVE_CHANNELS = 'ACTIVE_CHANNELS';
6363

6464
export const CHANNEL_SYNC_KEEP_ALIVE_INTERVAL = 300 * 1000;
65+
66+
export const MAX_REV_KEY = 'max_rev';

contentcuration/contentcuration/frontend/shared/data/resources.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
CURRENT_USER,
2727
ACTIVE_CHANNELS,
2828
CHANNEL_SYNC_KEEP_ALIVE_INTERVAL,
29+
MAX_REV_KEY,
2930
} from './constants';
3031
import applyChanges, { applyMods, collectChanges } from './applyRemoteChanges';
3132
import mergeAllChanges from './mergeChanges';
@@ -889,10 +890,14 @@ export const Session = new IndexedDBResource({
889890
? this.updateSession({ [`${ACTIVE_CHANNELS}.${channelId}`]: Date.now() })
890891
: null;
891892
},
892-
setChannelScope() {
893+
async setChannelScope() {
893894
const channelId = (window.CHANNEL_EDIT_GLOBAL || {}).channel_id || null;
894895
if (channelId) {
895896
channelScope.id = channelId;
897+
const channelRev = (window.CHANNEL_EDIT_GLOBAL || {}).channel_rev || 0;
898+
await this.updateSession({
899+
[`${MAX_REV_KEY}.${window.CHANNEL_EDIT_GLOBAL.channel_id}`]: channelRev,
900+
});
896901
this.channelSyncKeepAlive(channelId);
897902
setInterval(() => this.channelSyncKeepAlive(channelId), CHANNEL_SYNC_KEEP_ALIVE_INTERVAL);
898903
window.addEventListener('focus', () => this.channelSyncKeepAlive(channelId));

contentcuration/contentcuration/frontend/shared/data/serverSync.js

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import debounce from 'lodash/debounce';
22
import get from 'lodash/get';
33
import pick from 'lodash/pick';
4+
import orderBy from 'lodash/orderBy';
5+
import uniq from 'lodash/uniq';
46
import applyChanges from './applyRemoteChanges';
57
import { hasActiveLocks, cleanupLocks } from './changes';
68
import {
@@ -10,6 +12,7 @@ import {
1012
IGNORED_SOURCE,
1113
CHANNEL_SYNC_KEEP_ALIVE_INTERVAL,
1214
ACTIVE_CHANNELS,
15+
MAX_REV_KEY,
1316
} from './constants';
1417
import db from './db';
1518
import mergeAllChanges from './mergeChanges';
@@ -22,6 +25,10 @@ import urls from 'shared/urls';
2225
// change being registered, sync changes!
2326
const SYNC_IF_NO_CHANGES_FOR = 2;
2427

28+
// When this many seconds pass, repoll the backend to
29+
// check for any updates to active channels, or the user and sync any current changes
30+
const SYNC_POLL_INTERVAL = 5;
31+
2532
// Flag to check if a sync is currently active.
2633
let syncActive = false;
2734

@@ -142,15 +149,32 @@ function handleSuccesses(response) {
142149
const successes = get(response, ['data', 'successes'], []);
143150
if (successes.length) {
144151
return db[CHANGES_TABLE].where('server_rev')
145-
.anyOf(successes)
152+
.anyOf(successes.map(c => c.server_rev))
146153
.delete();
147154
}
148155
return Promise.resolve();
149156
}
150157

151-
function handleMaxRev(response) {
152-
const max_rev = response.data.max_rev;
153-
return Session.updateSession({ max_rev });
158+
function handleMaxRevs(response, userId) {
159+
const allChanges = orderBy(
160+
get(response, ['data', 'changes'], [])
161+
.concat(get(response, ['data', 'errors'], []))
162+
.concat(get(response, ['data', 'successes'], [])),
163+
'server_rev',
164+
'desc'
165+
);
166+
const channelIds = uniq(allChanges.map(c => c.channel_id)).filter(Boolean);
167+
const maxRevs = {};
168+
for (let channelId of channelIds) {
169+
maxRevs[`${MAX_REV_KEY}.${channelId}`] = allChanges.find(
170+
c => c.channel_id === channelId
171+
).server_rev;
172+
}
173+
const lastUserChange = allChanges.find(c => c.user_id === userId);
174+
if (lastUserChange) {
175+
maxRevs.user_rev = lastUserChange.server_rev;
176+
}
177+
return Session.updateSession(maxRevs);
154178
}
155179

156180
async function syncChanges() {
@@ -169,26 +193,29 @@ async function syncChanges() {
169193
// might have come in during processing - leave them for the next cycle.
170194
// This is the primary key of the change objects, so the collection is ordered by this
171195
// by default - if we just grab the last object, we can get the key from there.
172-
const [lastChange, earliestServerChange, user] = await Promise.all([
196+
const [lastChange, user] = await Promise.all([
173197
db[CHANGES_TABLE].orderBy('rev').last(),
174-
db[CHANGES_TABLE].orderBy('server_rev').first(),
175198
Session.getSession(),
176199
]);
177200
if (!user) {
178201
// If not logged in, nothing to do.
179202
return;
180203
}
204+
181205
const now = Date.now();
182-
const currentUser = await Session.getSession();
183-
const channel_ids = Object.entries(currentUser[ACTIVE_CHANNELS])
206+
const channel_ids = Object.entries(user[ACTIVE_CHANNELS])
184207
.filter(([id, time]) => id && time > now - CHANNEL_SYNC_KEEP_ALIVE_INTERVAL)
185208
.map(([id]) => id);
209+
const channel_revs = {};
210+
console.log(user);
211+
for (let channelId of channel_ids) {
212+
channel_revs[channelId] = get(user, [MAX_REV_KEY, channelId], 0);
213+
}
214+
186215
const requestPayload = {
187216
changes: [],
188-
channel_ids,
189-
// Last rev to send to the server is either the earliest change we are still seeking
190-
// confirmation on, or the current max_rev that we have synced to the frontend.
191-
last_rev: (earliestServerChange && earliestServerChange.server_rev) || user.max_rev,
217+
channel_revs,
218+
user_rev: user.user_rev || 0,
192219
};
193220

194221
if (lastChange) {
@@ -216,7 +243,7 @@ async function syncChanges() {
216243
// "allowed": [],
217244
// "changes": [],
218245
// "errors": [],
219-
// "successess": [],
246+
// "successes": [],
220247
// }
221248
const response = await client.post(urls['sync'](), requestPayload);
222249
try {
@@ -226,7 +253,7 @@ async function syncChanges() {
226253
handleReturnedChanges(response),
227254
handleErrors(response),
228255
handleSuccesses(response),
229-
handleMaxRev(response),
256+
handleMaxRevs(response, user.id),
230257
]);
231258
} catch (err) {
232259
console.error('There was an error updating change status', err); // eslint-disable-line no-console
@@ -279,15 +306,20 @@ async function handleChanges(changes) {
279306
}
280307
}
281308

309+
let intervalTimer;
310+
282311
export function startSyncing() {
283312
cleanupLocks();
284313
// Initiate a sync immediately in case any data
285314
// is left over in the database.
286315
debouncedSyncChanges();
316+
// Start the sync interval
317+
intervalTimer = setInterval(debouncedSyncChanges, SYNC_POLL_INTERVAL * 1000);
287318
db.on('changes', handleChanges);
288319
}
289320

290321
export function stopSyncing() {
322+
clearInterval(intervalTimer);
291323
debouncedSyncChanges.cancel();
292324
// Dexie's slightly counterintuitive method for unsubscribing from events
293325
db.on('changes').unsubscribe(handleChanges);

contentcuration/contentcuration/views/base.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def current_user_for_context(user):
7777

7878
user_data = {field: getattr(user, field) for field in user_fields}
7979

80-
user_data["max_rev"] = Change.objects.filter(applied=True).values_list("server_rev", flat=True).order_by("-server_rev").first() or 0
80+
user_data["user_rev"] = Change.objects.filter(applied=True, user=user).values_list("server_rev", flat=True).order_by("-server_rev").first() or 0
8181

8282
return json_for_parse_from_data(user_data)
8383

@@ -218,6 +218,7 @@ def accounts(request):
218218
@permission_classes((IsAuthenticated,))
219219
def channel(request, channel_id):
220220
channel_error = ""
221+
channel_rev = 0
221222

222223
# Check if channel exists
223224
try:
@@ -231,13 +232,15 @@ def channel(request, channel_id):
231232
# an option to restore the channel in the Administration page
232233
if channel.deleted:
233234
channel_error = 'CHANNEL_EDIT_ERROR_CHANNEL_DELETED'
235+
else:
236+
channel_rev = Change.objects.filter(applied=True, channel=channel).values_list("server_rev", flat=True).order_by("-server_rev").first() or 0
234237

235238
return render(
236239
request,
237240
"channel_edit.html",
238241
{
239242
CHANNEL_EDIT_GLOBAL: json_for_parse_from_data(
240-
{"channel_id": channel_id, "channel_error": channel_error}
243+
{"channel_id": channel_id, "channel_error": channel_error, "channel_rev": channel_rev}
241244
),
242245
CURRENT_USER: current_user_for_context(request.user),
243246
PREFERENCES: json_for_parse_from_data(request.user.content_defaults),

contentcuration/contentcuration/viewsets/sync/endpoint.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
def change_model_values_to_change_dict(c):
1818
datum = c["kwargs"]
19-
datum.update({"server_rev": c["server_rev"], "table": c["table"], "type": c["change_type"]})
19+
datum.update({"server_rev": c["server_rev"], "table": c["table"], "type": c["change_type"], "channel_id": c["channel_id"], "user_id": c["user_id"]})
2020
return datum
2121

2222

@@ -51,30 +51,33 @@ def handle_changes(self, request):
5151
if user_only_changes:
5252
get_or_create_async_task("apply_user_changes", request.user, user_id=request.user.id)
5353
for channel_id in allowed_ids:
54-
print(channel_id)
55-
task_info = get_or_create_async_task("apply_channel_changes", request.user, channel_id=channel_id)
56-
print(task_info.status)
54+
get_or_create_async_task("apply_channel_changes", request.user, channel_id=channel_id)
5755
allowed_changes = [{"rev": c.client_rev, "server_rev": c.server_rev} for c in change_models]
5856

5957
return {"disallowed": disallowed_changes, "allowed": allowed_changes}
6058
return {}
6159

6260
def return_changes(self, request):
63-
channel_ids = request.data.get("channel_ids", [])
64-
last_rev = request.data.get("last_rev") or 0
61+
channel_revs = request.data.get("channel_revs", {})
62+
user_rev = request.data.get("user_rev") or 0
6563
session_key = request.session.session_key
66-
if channel_ids:
67-
channel_ids = Channel.filter_view_queryset(Channel.objects.all(), request.user).filter(id__in=channel_ids).values_list("id", flat=True)
64+
if channel_revs:
65+
# Filter to only the channels that the user has permissions to view.
66+
channel_ids = Channel.filter_view_queryset(Channel.objects.all(), request.user).filter(id__in=channel_revs.keys()).values_list("id", flat=True)
67+
channel_revs = {channel_id: channel_revs[channel_id] for channel_id in channel_ids}
6868

69-
change_filter = (Q(session_id=session_key) & (Q(errored=True) | Q(applied=True))) | Q(user=request.user, applied=True)
69+
# Create a filter that returns all applied changes, and any errored changes made by this session
70+
relevant_to_session_filter = (Q(applied=True) | Q(errored=True, session_id=session_key))
7071

71-
if channel_ids:
72-
change_filter |= Q(channel_id__in=channel_ids, applied=True)
72+
change_filter = (Q(user=request.user, server_rev__gt=user_rev) & relevant_to_session_filter)
73+
74+
for channel_id, rev in channel_revs.items():
75+
change_filter |= (Q(channel_id=channel_id, server_rev__gt=rev) & relevant_to_session_filter)
7376

7477
changes_to_return = list(
7578
Change.objects.filter(
76-
server_rev__gt=last_rev
77-
).filter(change_filter).values("server_rev", "session_id", "applied", "errored", "table", "change_type", "kwargs")
79+
change_filter
80+
).values("server_rev", "session_id", "channel_id", "user_id", "applied", "errored", "table", "change_type", "kwargs")
7881
)
7982

8083
if not changes_to_return:
@@ -87,7 +90,7 @@ def return_changes(self, request):
8790
for c in changes_to_return:
8891
if c["applied"]:
8992
if c["session_id"] == session_key:
90-
successes.append(c["server_rev"])
93+
successes.append(change_model_values_to_change_dict(c))
9194
else:
9295
changes.append(change_model_values_to_change_dict(c))
9396
if c["errored"] and c["session_id"] == session_key:
@@ -108,6 +111,4 @@ def post(self, request):
108111

109112
response_payload.update(self.return_changes(request))
110113

111-
response_payload["max_rev"] = Change.objects.filter(applied=True).values_list("server_rev", flat=True).order_by("-server_rev").first() or 0
112-
113114
return Response(response_payload)

0 commit comments

Comments
 (0)