summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py92
1 files changed, 59 insertions, 33 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index a880845605..fef81f5f9b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -49,6 +49,12 @@ SYNC_PAGINATION_ORDER_TS = "o" SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) +SyncExtras = collections.namedtuple("SyncExtras", [ + "paginate", + "rooms", +]) + + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -152,7 +158,7 @@ class SyncHandler(object): self.response_cache = ResponseCache() def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, - full_state=False): + full_state=False, extras=None): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. @@ -164,14 +170,14 @@ class SyncHandler(object): result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, batch_token, timeout, full_state + sync_config, batch_token, timeout, full_state, extras, ) ) return result @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, - full_state): + full_state, extras=None): context = LoggingContext.current_context() if context: if batch_token is None: @@ -184,13 +190,15 @@ class SyncHandler(object): if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = yield self.current_sync_for_user( - sync_config, batch_token, full_state=full_state, + result = yield self.generate_sync_result( + sync_config, batch_token, full_state=full_state, extras=extras, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, batch_token) + return self.generate_sync_result( + sync_config, batch_token, full_state=False, extras=extras, + ) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, @@ -198,14 +206,6 @@ class SyncHandler(object): ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, batch_token=None, - full_state=False): - """Get the sync for client needed to match what the server has now. - Returns: - A Deferred SyncResult. - """ - return self.generate_sync_result(sync_config, batch_token, full_state) - @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() @@ -502,7 +502,8 @@ class SyncHandler(object): defer.returnValue(None) @defer.inlineCallbacks - def generate_sync_result(self, sync_config, batch_token=None, full_state=False): + def generate_sync_result(self, sync_config, batch_token=None, full_state=False, + extras=None): """Generates a sync result. Args: @@ -531,7 +532,7 @@ class SyncHandler(object): ) res = yield self._generate_sync_entry_for_rooms( - sync_result_builder, account_data_by_room + sync_result_builder, account_data_by_room, extras, ) newly_joined_rooms, newly_joined_users = res @@ -658,7 +659,8 @@ class SyncHandler(object): sync_result_builder.presence = presence @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room, + extras): """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. @@ -713,6 +715,8 @@ class SyncHandler(object): else: pagination_config = None + include_map = extras.get("peek", {}) if extras else {} + if sync_result_builder.pagination_state: missing_state = yield self._get_rooms_that_need_full_state( room_ids=[r.room_id for r in room_entries], @@ -725,34 +729,54 @@ class SyncHandler(object): for r in room_entries: if r.room_id in missing_state: r.full_state = True + if r.room_id in include_map: + r.always_include = True + r.events = None + r.since_token = None + r.upto_token = now_token - new_pagination_state = None if pagination_config: room_ids = [r.room_id for r in room_entries] pagination_limit = pagination_config.limit + extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0 + room_map = yield self._get_room_timestamps_at_token( - room_ids, sync_result_builder.now_token, sync_config, pagination_limit + room_ids, sync_result_builder.now_token, sync_config, + pagination_limit + extra_limit, ) if room_map: sorted_list = sorted( room_map.items(), key=lambda item: -item[1] - )[:pagination_limit] + )[:pagination_limit + extra_limit] + + if sorted_list[pagination_limit:]: + new_room_ids = set(r[0] for r in sorted_list[pagination_limit:]) + for r in room_entries: + if r.room_id in new_room_ids: + r.full_state = True + r.always_include = True + r.since_token = None + r.upto_token = now_token + r.events = None _, bottom_ts = sorted_list[-1] value = bottom_ts - new_pagination_state = SyncPaginationState( - order=pagination_config.order, value=value, limit=pagination_limit, + sync_result_builder.pagination_state = SyncPaginationState( + order=pagination_config.order, value=value, + limit=pagination_limit + extra_limit, ) - else: - new_pagination_state = None - room_entries = [r for r in room_entries if r.room_id in room_map] + if len(room_map) == len(room_entries): + sync_result_builder.pagination_state = None - sync_result_builder.pagination_state = new_pagination_state + room_entries = [ + r for r in room_entries + if r.room_id in room_map or r.always_include + ] sync_result_builder.full_state |= sync_result_builder.since_token is None @@ -764,7 +788,6 @@ class SyncHandler(object): ephemeral=ephemeral_by_room.get(room_entry.room_id, []), tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builder.full_state, ) yield concurrently_execute(handle_room_entries, room_entries, 10) @@ -995,8 +1018,7 @@ class SyncHandler(object): @defer.inlineCallbacks def _generate_room_entry(self, sync_result_builder, ignored_users, - room_builder, ephemeral, tags, account_data, - always_include=False): + room_builder, ephemeral, tags, account_data): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. @@ -1012,7 +1034,11 @@ class SyncHandler(object): even if empty. """ newly_joined = room_builder.newly_joined - always_include = always_include or newly_joined or sync_result_builder.full_state + always_include = ( + newly_joined + or sync_result_builder.full_state + or room_builder.always_include + ) full_state = ( room_builder.full_state or newly_joined @@ -1025,7 +1051,6 @@ class SyncHandler(object): if events == [] and tags is None: return - since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1166,7 +1191,7 @@ class SyncHandler(object): start_ts = yield self._get_room_timestamps_at_token( room_ids, since_token, sync_config=sync_config, - limit=pagination_state.limit, + limit=len(room_ids), ) missing_list = frozenset( @@ -1263,7 +1288,7 @@ class RoomSyncResultBuilder(object): __slots__ = ( "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", - "upto_token", + "upto_token", "always_include", ) def __init__(self, room_id, rtype, events, newly_joined, full_state, @@ -1286,3 +1311,4 @@ class RoomSyncResultBuilder(object): self.full_state = full_state self.since_token = since_token self.upto_token = upto_token + self.always_include = False