summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sync.py97
1 files changed, 81 insertions, 16 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index 3b89582d79..4ca9ff4dbc 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client +from synapse.types import SyncNextBatchToken from twisted.internet import defer @@ -140,7 +141,7 @@ class SyncHandler(object): self.clock = hs.get_clock() self.response_cache = ResponseCache() - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then @@ -153,47 +154,47 @@ class SyncHandler(object): result = self.response_cache.set( sync_config.request_key, self._wait_for_sync_for_user( - sync_config, since_token, timeout, full_state + sync_config, batch_token, timeout, full_state ) ) return result @defer.inlineCallbacks - def _wait_for_sync_for_user(self, sync_config, since_token, timeout, + def _wait_for_sync_for_user(self, sync_config, batch_token, timeout, full_state): context = LoggingContext.current_context() if context: - if since_token is None: + if batch_token is None: context.tag = "initial_sync" elif full_state: context.tag = "full_state_sync" else: context.tag = "incremental_sync" - if timeout == 0 or since_token is None or full_state: + if timeout == 0 or batch_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result = yield self.current_sync_for_user( - sync_config, since_token, full_state=full_state, + sync_config, batch_token, full_state=full_state, ) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + return self.current_sync_for_user(sync_config, batch_token) result = yield self.notifier.wait_for_events( sync_config.user.to_string(), timeout, current_sync_callback, - from_token=since_token, + from_token=batch_token.stream_token, ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None, + def current_sync_for_user(self, sync_config, batch_token=None, full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - return self.generate_sync_result(sync_config, since_token, full_state) + return self.generate_sync_result(sync_config, batch_token, full_state) @defer.inlineCallbacks def push_rules_for_user(self, user): @@ -491,7 +492,7 @@ class SyncHandler(object): defer.returnValue(None) @defer.inlineCallbacks - def generate_sync_result(self, sync_config, since_token=None, full_state=False): + def generate_sync_result(self, sync_config, batch_token=None, full_state=False): """Generates a sync result. Args: @@ -511,7 +512,7 @@ class SyncHandler(object): sync_result_builder = SyncResultBuilder( sync_config, full_state, - since_token=since_token, + batch_token=batch_token, now_token=now_token, ) @@ -534,7 +535,10 @@ class SyncHandler(object): joined=sync_result_builder.joined, invited=sync_result_builder.invited, archived=sync_result_builder.archived, - next_batch=sync_result_builder.now_token, + next_batch=SyncNextBatchToken( + stream_token=sync_result_builder.now_token, + pagination_config=batch_token.pagination_config if batch_token else None, + ) )) @defer.inlineCallbacks @@ -1032,6 +1036,66 @@ class SyncHandler(object): else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) + @defer.inlineCallbacks + def _get_room_timestamps_at_token(self, room_ids, token, sync_config, + limit): + room_to_entries = {} + + @defer.inlineCallbacks + def _get_last_ts(room_id): + entry = yield self.store.get_last_ts_for_room( + room_id, token.room_key + ) + + # TODO: Is this ever possible? + room_to_entries[room_id] = entry if entry else { + "origin_server_ts": 0, + } + + yield concurrently_execute(_get_last_ts, room_ids, 10) + + if len(room_to_entries) <= limit: + defer.returnValue({ + room_id: entry["origin_server_ts"] + for room_id, entry in room_to_entries.items() + }) + + queued_events = sorted( + room_to_entries.items(), + key=lambda e: -e[1]["origin_server_ts"] + ) + + to_return = {} + + while len(to_return) < limit and len(queued_events) > 0: + to_fetch = queued_events[:limit - len(to_return)] + event_to_q = { + e["event_id"]: (room_id, e) for room_id, e in to_fetch + if "event_id" in e + } + + # Now we fetch each event to check if its been filtered out + event_map = yield self.store.get_events(event_to_q.keys()) + + recents = sync_config.filter_collection.filter_room_timeline( + event_map.values() + ) + recents = yield filter_events_for_client( + self.store, + sync_config.user.to_string(), + recents, + ) + + to_return.update({r.room_id: r.origin_server_ts for r in recents}) + + for ev_id in set(event_map.keys()) - set(r.event_id for r in recents): + queued_events.append(event_to_q[ev_id]) + + # FIXME: Need to refetch TS + queued_events.sort(key=lambda e: -e[1]["origin_server_ts"]) + + defer.returnValue(to_return) + def _action_has_highlight(actions): for action in actions: @@ -1083,17 +1147,18 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" - def __init__(self, sync_config, full_state, since_token, now_token): + def __init__(self, sync_config, full_state, batch_token, now_token): """ Args: sync_config(SyncConfig) full_state(bool): The full_state flag as specified by user - since_token(StreamToken): The token supplied by user, or None. + batch_token(SyncNextBatchToken): The token supplied by user, or None. now_token(StreamToken): The token to sync up to. """ self.sync_config = sync_config self.full_state = full_state - self.since_token = since_token + self.batch_token = batch_token + self.since_token = batch_token.stream_token if batch_token else None self.now_token = now_token self.presence = []