diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-01-30 11:32:35 +0000 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-01-30 11:32:35 +0000 |
commit | 22dd1cde2d83a2448074816108b85d1957315236 (patch) | |
tree | eb684887203602a9b000ddfe7d02224f0fa6ecce /synapse/handlers/sync.py | |
parent | Update todo for the filtering on sync (diff) | |
download | synapse-22dd1cde2d83a2448074816108b85d1957315236.tar.xz |
Filter the recent events before applying the limit when doing an incremental sync with a gap
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 53 |
1 files changed, 38 insertions, 15 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5768702192..0df1851b0e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -279,6 +279,40 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks + def load_filtered_recents(self, room_id, sync_config, since_token, + now_token): + limited = True + recents = [] + filtering_factor = 2 + load_limit = max(sync_config.limit * filtering_factor, 100) + max_repeat = 3 # Only try a few times per room, otherwise + room_key = now_token.room_key + + while limited and len(recents) < sync_config.limit and max_repeat: + events, room_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + from_token=since_token.room_key, + end_token=room_key, + ) + loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents.extend(recents) + recents = loaded_recents + if len(events) <= load_limit: + limited = False + max_repeat -= 1 + + if len(recents) > sync_config.limit: + recents = recents[-sync_config.limit:] + room_key = recents[0].internal_metadata.before + + prev_batch_token = now_token.copy_and_replace( + "room_key", room_key + ) + + defer.returnValue((recents, prev_batch_token, limited)) + + @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, published_room_ids, typing_by_room): @@ -288,28 +322,17 @@ class SyncHandler(BaseHandler): Returns: A Deferred RoomSyncResult """ + # TODO(mjark): Check if they have joined the room between # the previous sync and this one. - # TODO(mjark): Apply the event filter in sync_config taking care to get - # enough events to reach the limit # TODO(mjark): Check for redactions we might have missed. - recents, token = yield self.store.get_recent_events_for_room( - room_id, - limit=sync_config.limit + 1, - from_token=since_token.room_key, - end_token=now_token.room_key, + + recents, prev_batch_token, limited = self.load_filtered_recents( + room_id, sync_config, since_token, ) logging.debug("Recents %r", recents) - if len(recents) > sync_config.limit: - limited = True - recents = recents[1:] - else: - limited = False - - prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - # TODO(mjark): This seems racy since this isn't being passed a # token to indicate what point in the stream this is current_state_events = yield self.state_handler.get_current_state( |