summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sync.py206
1 files changed, 176 insertions, 30 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 9ebfccc8bf..a32f48135e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -22,6 +22,8 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import SyncNextBatchToken + from twisted.internet import defer import collections @@ -141,7 +143,7 @@ class SyncHandler(object): self.clock = hs.get_clock() self.response_cache = ResponseCache() - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then @@ -154,53 +156,113 @@ class SyncHandler(object): result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, since_token, timeout, full_state + sync_config, batch_token, timeout, full_state ) ) return result @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, + def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, full_state): context = LoggingContext.current_context() if context: - if since_token is None: + if batch_token is None: context.tag = "initial_sync" elif full_state: context.tag = "full_state_sync" else: context.tag = "incremental_sync" - if timeout == 0 or since_token is None or full_state: + if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + sync_config, batch_token, full_state=full_state, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + return self.current_sync_for_user(sync_config, batch_token) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token, + from_token=batch_token.stream_token, ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, + def current_sync_for_user(self, sync_config, batch_token=None, full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) + if batch_token is None or full_state: + return self.full_state_sync(sync_config, batch_token) else: - return self.incremental_sync_with_gap(sync_config, since_token) + return self.incremental_sync_with_gap(sync_config, batch_token) + + @defer.inlineCallbacks + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, + limit): + room_to_entries = {} + + @defer.inlineCallbacks + def _get_last_ts(room_id): + entry = yield self.store.get_last_ts_for_room( + room_id, token.room_key + ) + + # TODO: Is this ever possible? + room_to_entries[room_id] = entry if entry else { + "origin_server_ts": 0, + } + + yield concurrently_execute(_get_last_ts, room_ids, 10) + + if len(room_to_entries) <= limit: + defer.returnValue({ + room_id: entry["origin_server_ts"] + for room_id, entry in room_to_entries.items() + }) + + queued_events = sorted( + room_to_entries.items(), + key=lambda e: -e[1]["origin_server_ts"] + ) + + to_return = {} + + while len(to_return) < limit and len(queued_events) > 0: + to_fetch = queued_events[:limit - len(to_return)] + event_to_q = { + e["event_id"]: (room_id, e) for room_id, e in to_fetch + if "event_id" in e + } + + # Now we fetch each event to check if its been filtered out + event_map = yield self.store.get_events(event_to_q.keys()) + + recents = sync_config.filter_collection.filter_room_timeline( + event_map.values() + ) + recents = yield filter_events_for_client( + self.store, + sync_config.user.to_string(), + recents, + ) + + to_return.update({r.room_id: r.origin_server_ts for r in recents}) + + for ev_id in set(event_map.keys()) - set(r.event_id for r in recents): + queued_events.append(event_to_q[ev_id]) + + # FIXME: Need to refetch TS + queued_events.sort(key=lambda e: -e[1]["origin_server_ts"]) + + defer.returnValue(to_return) @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): + def full_state_sync(self, sync_config, batch_token): """Get a sync for a client which is starting without any state. If a 'message_since_token' is given, only timeline events which have @@ -209,6 +271,11 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ + if batch_token: + timeline_since_token = batch_token.stream_token + else: + timeline_since_token = None + now_token = yield self.event_sources.get_current_token() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -258,19 +325,50 @@ class SyncHandler(object): user_id = sync_config.user.to_string() + pagination_limit = 10 + room_pagination_config = { + "l": pagination_limit, + "o": 0, + "t": now_token.to_string(), + } + + room_to_last_ts = yield self._get_room_timestamps_at_token( + room_ids=[ + e.room_id for e in room_list if e.membership == Membership.JOIN + ], + token=now_token, + sync_config=sync_config, + limit=pagination_limit, + ) + + if room_to_last_ts: + sorted_list = sorted( + room_to_last_ts.items(), + key=lambda item: -item[1] + )[:pagination_limit] + + _, bottom_ts = sorted_list[-1] + room_pagination_config["ts"] = bottom_ts + + joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list) + else: + room_pagination_config = {} + joined_rooms_list = frozenset() + @defer.inlineCallbacks def _generate_room_entry(event): if event.membership == Membership.JOIN: - room_result = yield self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - joined.append(room_result) + if event.room_id in joined_rooms_list: + room_result = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) + joined.append(room_result) elif event.membership == Membership.INVITE: if event.sender in ignored_users: return @@ -306,9 +404,7 @@ class SyncHandler(object): self.account_data_for_user(account_data) ) - presence = sync_config.filter_collection.filter_presence( - presence - ) + presence = sync_config.filter_collection.filter_presence(presence) defer.returnValue(SyncResult( presence=presence, @@ -316,7 +412,10 @@ class SyncHandler(object): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=SyncNextBatchToken( + stream_token=now_token, + pagination_config=room_pagination_config, + ), )) @defer.inlineCallbacks @@ -460,12 +559,32 @@ class SyncHandler(object): ) @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): + def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts, + sync_config, pagination_limit): + start_ts = yield self._get_room_timestamps_at_token( + room_ids, since_token, + sync_config=sync_config, + limit=pagination_limit, + ) + + missing_list = frozenset( + room_id for room_id, ts in + sorted(start_ts.items(), key=lambda item: -item[1]) + if ts < pa_ts + ) + + defer.returnValue(missing_list) + + @defer.inlineCallbacks + def incremental_sync_with_gap(self, sync_config, batch_token): """ Get the incremental delta needed to bring the client up to date with the server. Returns: A Deferred SyncResult. """ + since_token = batch_token.stream_token + room_pagination_config = batch_token.pagination_config + now_token = yield self.event_sources.get_current_token() rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) @@ -606,7 +725,21 @@ class SyncHandler(object): prev_batch_token = now_token.copy_and_replace("room_key", start_key) - newly_joined_room = room_id in newly_joined_rooms + p_room_token = room_pagination_config.get("t", None) + if p_room_token: + pa_limit = room_pagination_config["l"] + needing_full_state = yield self._get_rooms_that_need_full_state( + [room_id], + since_token, + room_pagination_config.get("ts", 0), + sync_config=sync_config, + pagination_limit=pa_limit, + ) + need_full_state = room_id in needing_full_state + else: + need_full_state = False + + newly_joined_room = (room_id in newly_joined_rooms) or need_full_state full_state = newly_joined_room batch = yield self.load_filtered_recents( @@ -635,6 +768,19 @@ class SyncHandler(object): full_state=full_state, ) if room_sync: + if not room_sync.timeline: + p_room_token = room_pagination_config.get("t", None) + if p_room_token: + pa_limit = room_pagination_config["l"] + needing_full_state = yield self._get_rooms_that_need_full_state( + [room_id], + since_token, + room_pagination_config.get("ts", 0), + sync_config=sync_config, + pagination_limit=pa_limit, + ) + if room_id in needing_full_state: + continue joined.append(room_sync) # For each newly joined room, we want to send down presence of @@ -673,7 +819,7 @@ class SyncHandler(object): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=batch_token.replace(stream_token=now_token), )) @defer.inlineCallbacks