summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/sync.py92
-rw-r--r--synapse/rest/client/v2_alpha/sync.py7
-rw-r--r--synapse/types.py18
3 files changed, 80 insertions, 37 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index a880845605..fef81f5f9b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -49,6 +49,12 @@ SYNC_PAGINATION_ORDER_TS = "o" SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,) +SyncExtras = collections.namedtuple("SyncExtras", [ + "paginate", + "rooms", +]) + + class TimelineBatch(collections.namedtuple("TimelineBatch", [ "prev_batch", "events", @@ -152,7 +158,7 @@ class SyncHandler(object): self.response_cache = ResponseCache() def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, - full_state=False): + full_state=False, extras=None): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. @@ -164,14 +170,14 @@ class SyncHandler(object): result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, batch_token, timeout, full_state + sync_config, batch_token, timeout, full_state, extras, ) ) return result @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, - full_state): + full_state, extras=None): context = LoggingContext.current_context() if context: if batch_token is None: @@ -184,13 +190,15 @@ class SyncHandler(object): if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = yield self.current_sync_for_user( - sync_config, batch_token, full_state=full_state, + result = yield self.generate_sync_result( + sync_config, batch_token, full_state=full_state, extras=extras, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, batch_token) + return self.generate_sync_result( + sync_config, batch_token, full_state=False, extras=extras, + ) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, @@ -198,14 +206,6 @@ class SyncHandler(object): ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, batch_token=None, - full_state=False): - """Get the sync for client needed to match what the server has now. - Returns: - A Deferred SyncResult. - """ - return self.generate_sync_result(sync_config, batch_token, full_state) - @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() @@ -502,7 +502,8 @@ class SyncHandler(object): defer.returnValue(None) @defer.inlineCallbacks - def generate_sync_result(self, sync_config, batch_token=None, full_state=False): + def generate_sync_result(self, sync_config, batch_token=None, full_state=False, + extras=None): """Generates a sync result. Args: @@ -531,7 +532,7 @@ class SyncHandler(object): ) res = yield self._generate_sync_entry_for_rooms( - sync_result_builder, account_data_by_room + sync_result_builder, account_data_by_room, extras, ) newly_joined_rooms, newly_joined_users = res @@ -658,7 +659,8 @@ class SyncHandler(object): sync_result_builder.presence = presence @defer.inlineCallbacks - def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room, + extras): """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. @@ -713,6 +715,8 @@ class SyncHandler(object): else: pagination_config = None + include_map = extras.get("peek", {}) if extras else {} + if sync_result_builder.pagination_state: missing_state = yield self._get_rooms_that_need_full_state( room_ids=[r.room_id for r in room_entries], @@ -725,34 +729,54 @@ class SyncHandler(object): for r in room_entries: if r.room_id in missing_state: r.full_state = True + if r.room_id in include_map: + r.always_include = True + r.events = None + r.since_token = None + r.upto_token = now_token - new_pagination_state = None if pagination_config: room_ids = [r.room_id for r in room_entries] pagination_limit = pagination_config.limit + extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0 + room_map = yield self._get_room_timestamps_at_token( - room_ids, sync_result_builder.now_token, sync_config, pagination_limit + room_ids, sync_result_builder.now_token, sync_config, + pagination_limit + extra_limit, ) if room_map: sorted_list = sorted( room_map.items(), key=lambda item: -item[1] - )[:pagination_limit] + )[:pagination_limit + extra_limit] + + if sorted_list[pagination_limit:]: + new_room_ids = set(r[0] for r in sorted_list[pagination_limit:]) + for r in room_entries: + if r.room_id in new_room_ids: + r.full_state = True + r.always_include = True + r.since_token = None + r.upto_token = now_token + r.events = None _, bottom_ts = sorted_list[-1] value = bottom_ts - new_pagination_state = SyncPaginationState( - order=pagination_config.order, value=value, limit=pagination_limit, + sync_result_builder.pagination_state = SyncPaginationState( + order=pagination_config.order, value=value, + limit=pagination_limit + extra_limit, ) - else: - new_pagination_state = None - room_entries = [r for r in room_entries if r.room_id in room_map] + if len(room_map) == len(room_entries): + sync_result_builder.pagination_state = None - sync_result_builder.pagination_state = new_pagination_state + room_entries = [ + r for r in room_entries + if r.room_id in room_map or r.always_include + ] sync_result_builder.full_state |= sync_result_builder.since_token is None @@ -764,7 +788,6 @@ class SyncHandler(object): ephemeral=ephemeral_by_room.get(room_entry.room_id, []), tags=tags_by_room.get(room_entry.room_id), account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builder.full_state, ) yield concurrently_execute(handle_room_entries, room_entries, 10) @@ -995,8 +1018,7 @@ class SyncHandler(object): @defer.inlineCallbacks def _generate_room_entry(self, sync_result_builder, ignored_users, - room_builder, ephemeral, tags, account_data, - always_include=False): + room_builder, ephemeral, tags, account_data): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. @@ -1012,7 +1034,11 @@ class SyncHandler(object): even if empty. """ newly_joined = room_builder.newly_joined - always_include = always_include or newly_joined or sync_result_builder.full_state + always_include = ( + newly_joined + or sync_result_builder.full_state + or room_builder.always_include + ) full_state = ( room_builder.full_state or newly_joined @@ -1025,7 +1051,6 @@ class SyncHandler(object): if events == [] and tags is None: return - since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1166,7 +1191,7 @@ class SyncHandler(object): start_ts = yield self._get_room_timestamps_at_token( room_ids, since_token, sync_config=sync_config, - limit=pagination_state.limit, + limit=len(room_ids), ) missing_list = frozenset( @@ -1263,7 +1288,7 @@ class RoomSyncResultBuilder(object): __slots__ = ( "room_id", "rtype", "events", "newly_joined", "full_state", "since_token", - "upto_token", + "upto_token", "always_include", ) def __init__(self, room_id, rtype, events, newly_joined, full_state, @@ -1286,3 +1311,4 @@ class RoomSyncResultBuilder(object): self.full_state = full_state self.since_token = since_token self.upto_token = upto_token + self.always_include = False diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 3df9743132..da94220f1e 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py
@@ -98,6 +98,8 @@ class SyncRestServlet(RestServlet): since = body.get("since", None) + extras = body.get("extras", None) + if "from" in body: # /events used to use 'from', but /sync uses 'since'. # Lets be helpful and whine if we see a 'from'. @@ -162,6 +164,7 @@ class SyncRestServlet(RestServlet): set_presence=set_presence, full_state=full_state, timeout=timeout, + extras=extras, ) defer.returnValue(sync_result) @@ -239,7 +242,7 @@ class SyncRestServlet(RestServlet): @defer.inlineCallbacks def _handle_sync(self, requester, sync_config, batch_token, set_presence, - full_state, timeout): + full_state, timeout, extras=None): affect_presence = set_presence != PresenceState.OFFLINE user = sync_config.user @@ -253,7 +256,7 @@ class SyncRestServlet(RestServlet): with context: sync_result = yield self.sync_handler.wait_for_sync_for_user( sync_config, batch_token=batch_token, timeout=timeout, - full_state=full_state + full_state=full_state, extras=extras, ) time_now = self.clock.time_msec() diff --git a/synapse/types.py b/synapse/types.py
index fd7c0ffe7a..cf950a0c36 100644 --- a/synapse/types.py +++ b/synapse/types.py
@@ -129,7 +129,7 @@ class SyncNextBatchToken( if pa: pa = SyncPaginationState.from_dict(pa) return cls( - stream_token=StreamToken.from_string(d["t"]), + stream_token=StreamToken.from_arr(d["t"]), pagination_state=pa, ) except: @@ -137,7 +137,7 @@ class SyncNextBatchToken( def to_string(self): return encode_base64(json.dumps({ - "t": self.stream_token.to_string(), + "t": self.stream_token.to_arr(), "pa": self.pagination_state.to_dict() if self.pagination_state else None, })) @@ -196,6 +196,20 @@ class StreamToken( def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @classmethod + def from_arr(cls, arr): + try: + keys = arr + while len(keys) < len(cls._fields): + # i.e. old token from before receipt_key + keys.append("0") + return cls(*keys) + except: + raise SynapseError(400, "Invalid Token") + + def to_arr(self): + return self + @property def room_stream_id(self): # TODO(markjh): Awful hack to work around hacks in the presence tests