summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-05-18 11:28:26 +0100
committerErik Johnston <erik@matrix.org>2016-05-18 11:28:26 +0100
commitb999adcaa21f1d51b87e2198d9f5f4c98d027f90 (patch)
tree39703893a963d18ff21aeeb79dd0d9d029a560f6 /synapse/handlers/sync.py
parentCall get_last_ts less (diff)
downloadsynapse-b999adcaa21f1d51b87e2198d9f5f4c98d027f90.tar.xz
Filter before ordering
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py74
1 files changed, 66 insertions, 8 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 497744b9af..426a93b8dd 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -202,19 +202,64 @@ class SyncHandler(BaseHandler): return self.incremental_sync_with_gap(sync_config, batch_token) @defer.inlineCallbacks - def _get_room_timestamps_at_token(self, room_ids, token): - room_to_last_ts = {} + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, + limit): + room_to_entries = {} @defer.inlineCallbacks def _get_last_ts(room_id): - ts = yield self.store.get_last_ts_for_room( + entry = yield self.store.get_last_ts_for_room( room_id, token.room_key ) - room_to_last_ts[room_id] = ts if ts else 0 - logger.info("room_to_last_ts: %r", room_to_last_ts) + # TODO: Is this ever possible? + room_to_entries[room_id] = entry if entry else { + "origin_server_ts": 0, + } + yield concurrently_execute(_get_last_ts, room_ids, 10) - defer.returnValue(room_to_last_ts) + + if len(room_to_entries) <= limit: + defer.returnValue({ + room_id: entry["origin_server_ts"] + for room_id, entry in room_to_entries.items() + }) + + queued_events = sorted( + room_to_entries.items(), + key=lambda e: e[1]["origin_server_ts"] + ) + + to_return = {} + + while len(to_return) < limit and len(queued_events) > 0: + to_fetch = queued_events[:limit - len(to_return)] + event_to_q = { + e["event_id"]: (room_id, e) for room_id, e in to_fetch.items() + if "event_id" in e + } + + # Now we fetch each event to check if its been filtered out + event_map = yield self.store.get_events(event_to_q.keys()) + + recents = sync_config.filter_collection.filter_room_timeline( + event_map.values() + ) + recents = yield filter_events_for_client( + self.store, + sync_config.user.to_string(), + recents, + ) + + to_return.update({r.room_id: r.origin_server_ts for r in recents}) + + for ev_id in set(event_map.keys()) - set(r.event_id for r in recents): + queued_events.append(event_to_q[ev_id]) + + # FIXME: Need to refetch TS + queued_events.sort(key=lambda e: e[1]["origin_server_ts"]) + + defer.returnValue(to_return) @defer.inlineCallbacks def full_state_sync(self, sync_config, batch_token): @@ -292,6 +337,8 @@ class SyncHandler(BaseHandler): e.room_id for e in room_list if e.membership == Membership.JOIN ], token=now_token, + sync_config=sync_config, + limit=pagination_limit, ) if room_to_last_ts: @@ -512,8 +559,13 @@ class SyncHandler(BaseHandler): ) @defer.inlineCallbacks - def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts): - start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token) + def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts, + sync_config, pagination_limit): + start_ts = yield self._get_room_timestamps_at_token( + room_ids, since_token, + sync_config=sync_config, + limit=pagination_limit, + ) missing_list = frozenset( room_id for room_id, ts in @@ -675,10 +727,13 @@ class SyncHandler(BaseHandler): p_room_token = room_pagination_config.get("t", None) if p_room_token: + pa_limit = room_pagination_config["l"] needing_full_state = yield self._get_rooms_that_need_full_state( [room_id], since_token, room_pagination_config.get("ts", 0), + sync_config=sync_config, + pagination_limit=pa_limit, ) need_full_state = room_id in needing_full_state else: @@ -716,10 +771,13 @@ class SyncHandler(BaseHandler): if not room_sync.timeline: p_room_token = room_pagination_config.get("t", None) if p_room_token: + pa_limit = room_pagination_config["l"] needing_full_state = yield self._get_rooms_that_need_full_state( [room_id], since_token, room_pagination_config.get("ts", 0), + sync_config=sync_config, + pagination_limit=pa_limit, ) if room_id in needing_full_state: continue