summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-05-16 16:59:18 +0100
committerErik Johnston <erik@matrix.org>2016-05-16 16:59:18 +0100
commit32d476d4f1fd2a99c8ccbffe687293a26d5519d3 (patch)
treee201c627640691e657f4412f74e3513d047687e1 /synapse/handlers/sync.py
parentOnly load the last N joined room (diff)
downloadsynapse-32d476d4f1fd2a99c8ccbffe687293a26d5519d3.tar.xz
Change token format
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py87
1 files changed, 55 insertions, 32 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 4c5b935012..33c05950c5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -24,6 +24,8 @@ from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import SyncNextBatchToken + from twisted.internet import defer import collections @@ -141,7 +143,7 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() self.response_cache = ResponseCache() - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then @@ -154,53 +156,68 @@ class SyncHandler(BaseHandler): result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, since_token, timeout, full_state + sync_config, batch_token, timeout, full_state ) ) return result @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, + def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, full_state): context = LoggingContext.current_context() if context: - if since_token is None: + if batch_token is None: context.tag = "initial_sync" elif full_state: context.tag = "full_state_sync" else: context.tag = "incremental_sync" - if timeout == 0 or since_token is None or full_state: + if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + sync_config, batch_token, full_state=full_state, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + return self.current_sync_for_user(sync_config, batch_token) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token, + from_token=batch_token.stream_token, ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, + def current_sync_for_user(self, sync_config, batch_token=None, full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) + if batch_token is None or full_state: + return self.full_state_sync(sync_config, batch_token) else: - return self.incremental_sync_with_gap(sync_config, since_token) + return self.incremental_sync_with_gap(sync_config, batch_token) @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): + def _get_room_timestamps_at_token(self, room_ids, token): + room_to_last_ts = {} + + @defer.inlineCallbacks + def _get_last_ts(room_id): + ts = yield self.store.get_last_ts_for_room( + room_id, token.room_key + ) + room_to_last_ts[room_id] = ts if ts else 0 + + logger.info("room_to_last_ts: %r", room_to_last_ts) + yield concurrently_execute(_get_last_ts, room_ids, 10) + defer.returnValue(room_to_last_ts) + + @defer.inlineCallbacks + def full_state_sync(self, sync_config, batch_token): """Get a sync for a client which is starting without any state. If a 'message_since_token' is given, only timeline events which have @@ -209,6 +226,11 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ + if batch_token: + timeline_since_token = batch_token.stream_token + else: + timeline_since_token = None + now_token = yield self.event_sources.get_current_token() now_token, ephemeral_by_room = yield self.ephemeral_by_room( @@ -258,24 +280,22 @@ class SyncHandler(BaseHandler): user_id = sync_config.user.to_string() - room_to_last_ts = {} - - @defer.inlineCallbacks - def _get_last_ts(event): - room_id = event.room_id - if event.membership == Membership.JOIN: - ts = yield self.store.get_last_ts_for_room( - room_id, now_token.room_key - ) - room_to_last_ts[room_id] = ts if ts else 0 + pagination_limit = 20 + room_pagination_config = { + "l": pagination_limit, + "o": 0, + "t": now_token, + } - logger.info("room_to_last_ts: %r", room_to_last_ts) - yield concurrently_execute(_get_last_ts, room_list, 10) + room_to_last_ts = yield self._get_room_timestamps_at_token( + room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN], + token=now_token, + ) joined_rooms_list = frozenset([ room_id for room_id, _f in sorted(room_to_last_ts.items(), key=lambda item: -item[1]) - ][:20]) + ][:pagination_limit]) @defer.inlineCallbacks def _generate_room_entry(event): @@ -326,9 +346,7 @@ class SyncHandler(BaseHandler): self.account_data_for_user(account_data) ) - presence = sync_config.filter_collection.filter_presence( - presence - ) + presence = sync_config.filter_collection.filter_presence(presence) defer.returnValue(SyncResult( presence=presence, @@ -336,7 +354,10 @@ class SyncHandler(BaseHandler): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=SyncNextBatchToken( + stream_token=now_token, + pagination_config=room_pagination_config, + ), )) @defer.inlineCallbacks @@ -480,12 +501,14 @@ class SyncHandler(BaseHandler): ) @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): + def incremental_sync_with_gap(self, sync_config, batch_token): """ Get the incremental delta needed to bring the client up to date with the server. Returns: A Deferred SyncResult. """ + since_token = batch_token.stream_token + now_token = yield self.event_sources.get_current_token() rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) @@ -693,7 +716,7 @@ class SyncHandler(BaseHandler): joined=joined, invited=invited, archived=archived, - next_batch=now_token, + next_batch=batch_token.replace(stream_token=now_token), )) @defer.inlineCallbacks