summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-05-17 14:23:13 +0100
committerErik Johnston <erik@matrix.org>2016-05-17 14:23:13 +0100
commit64df83606745f8f8567a0db5920cd27cc744b098 (patch)
tree451fc43559821e52c064d07c27aa74b80e82ffe9
parentChange token format (diff)
downloadsynapse-64df83606745f8f8567a0db5920cd27cc744b098.tar.xz
Correctly figure out which rooms we've sent down
-rw-r--r--synapse/handlers/sync.py53
1 files changed, 46 insertions, 7 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 33c05950c5..a1a818e264 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -284,18 +284,29 @@ class SyncHandler(BaseHandler): room_pagination_config = { "l": pagination_limit, "o": 0, - "t": now_token, + "t": now_token.to_string(), } room_to_last_ts = yield self._get_room_timestamps_at_token( - room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN], + room_ids=[ + e.room_id for e in room_list if e.membership == Membership.JOIN + ], token=now_token, ) - joined_rooms_list = frozenset([ - room_id for room_id, _f in - sorted(room_to_last_ts.items(), key=lambda item: -item[1]) - ][:pagination_limit]) + if room_to_last_ts: + sorted_list = sorted( + room_to_last_ts.items(), + key=lambda item: -item[1] + )[:pagination_limit] + + _, bottom_ts = sorted_list[-1] + room_pagination_config["ts"] = bottom_ts + + joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list) + else: + room_pagination_config = {} + joined_rooms_list = frozenset() @defer.inlineCallbacks def _generate_room_entry(event): @@ -501,6 +512,18 @@ class SyncHandler(BaseHandler): ) @defer.inlineCallbacks + def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts): + start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token) + + missing_list = frozenset( + room_id for room_id, ts in + sorted(start_ts.items(), key=lambda item: -item[1]) + if ts < pa_ts + ) + + defer.returnValue(missing_list) + + @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, batch_token): """ Get the incremental delta needed to bring the client up to date with the server. @@ -508,6 +531,7 @@ class SyncHandler(BaseHandler): A Deferred SyncResult. """ since_token = batch_token.stream_token + room_pagination_config = batch_token.pagination_config now_token = yield self.event_sources.get_current_token() @@ -638,18 +662,30 @@ class SyncHandler(BaseHandler): limit=timeline_limit + 1, ) + p_room_token = room_pagination_config.get("t", None) + if p_room_token: + needing_full_state = yield self._get_rooms_that_need_full_state( + joined_room_ids, + since_token, + room_pagination_config.get("ts", 0), + ) + else: + needing_full_state = set() + joined = [] # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. for room_id in joined_room_ids: room_entry = room_to_events.get(room_id, None) + need_full_state = room_id in needing_full_state + if room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace("room_key", start_key) - newly_joined_room = room_id in newly_joined_rooms + newly_joined_room = (room_id in newly_joined_rooms) or need_full_state full_state = newly_joined_room batch = yield self.load_filtered_recents( @@ -659,6 +695,9 @@ class SyncHandler(BaseHandler): newly_joined_room=newly_joined_room, ) else: + if need_full_state: + continue + batch = TimelineBatch( events=[], prev_batch=since_token,