diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3b89582d79..4ca9ff4dbc 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
+from synapse.types import SyncNextBatchToken
from twisted.internet import defer
@@ -140,7 +141,7 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+ def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
@@ -153,47 +154,47 @@ class SyncHandler(object):
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
- sync_config, since_token, timeout, full_state
+ sync_config, batch_token, timeout, full_state
)
)
return result
@defer.inlineCallbacks
- def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+ def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
full_state):
context = LoggingContext.current_context()
if context:
- if since_token is None:
+ if batch_token is None:
context.tag = "initial_sync"
elif full_state:
context.tag = "full_state_sync"
else:
context.tag = "incremental_sync"
- if timeout == 0 or since_token is None or full_state:
+ if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(
- sync_config, since_token, full_state=full_state,
+ sync_config, batch_token, full_state=full_state,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
- return self.current_sync_for_user(sync_config, since_token)
+ return self.current_sync_for_user(sync_config, batch_token)
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
- from_token=since_token,
+ from_token=batch_token.stream_token,
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None,
+ def current_sync_for_user(self, sync_config, batch_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
- return self.generate_sync_result(sync_config, since_token, full_state)
+ return self.generate_sync_result(sync_config, batch_token, full_state)
@defer.inlineCallbacks
def push_rules_for_user(self, user):
@@ -491,7 +492,7 @@ class SyncHandler(object):
defer.returnValue(None)
@defer.inlineCallbacks
- def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+ def generate_sync_result(self, sync_config, batch_token=None, full_state=False):
"""Generates a sync result.
Args:
@@ -511,7 +512,7 @@ class SyncHandler(object):
sync_result_builder = SyncResultBuilder(
sync_config, full_state,
- since_token=since_token,
+ batch_token=batch_token,
now_token=now_token,
)
@@ -534,7 +535,10 @@ class SyncHandler(object):
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
archived=sync_result_builder.archived,
- next_batch=sync_result_builder.now_token,
+ next_batch=SyncNextBatchToken(
+ stream_token=sync_result_builder.now_token,
+ pagination_config=batch_token.pagination_config if batch_token else None,
+ )
))
@defer.inlineCallbacks
@@ -1032,6 +1036,66 @@ class SyncHandler(object):
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+ @defer.inlineCallbacks
+ def _get_room_timestamps_at_token(self, room_ids, token, sync_config,
+ limit):
+ room_to_entries = {}
+
+ @defer.inlineCallbacks
+ def _get_last_ts(room_id):
+ entry = yield self.store.get_last_ts_for_room(
+ room_id, token.room_key
+ )
+
+ # TODO: Is this ever possible?
+ room_to_entries[room_id] = entry if entry else {
+ "origin_server_ts": 0,
+ }
+
+ yield concurrently_execute(_get_last_ts, room_ids, 10)
+
+ if len(room_to_entries) <= limit:
+ defer.returnValue({
+ room_id: entry["origin_server_ts"]
+ for room_id, entry in room_to_entries.items()
+ })
+
+ queued_events = sorted(
+ room_to_entries.items(),
+ key=lambda e: -e[1]["origin_server_ts"]
+ )
+
+ to_return = {}
+
+ while len(to_return) < limit and len(queued_events) > 0:
+ to_fetch = queued_events[:limit - len(to_return)]
+ event_to_q = {
+ e["event_id"]: (room_id, e) for room_id, e in to_fetch
+ if "event_id" in e
+ }
+
+ # Now we fetch each event to check if its been filtered out
+ event_map = yield self.store.get_events(event_to_q.keys())
+
+ recents = sync_config.filter_collection.filter_room_timeline(
+ event_map.values()
+ )
+ recents = yield filter_events_for_client(
+ self.store,
+ sync_config.user.to_string(),
+ recents,
+ )
+
+ to_return.update({r.room_id: r.origin_server_ts for r in recents})
+
+ for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
+ queued_events.append(event_to_q[ev_id])
+
+ # FIXME: Need to refetch TS
+ queued_events.sort(key=lambda e: -e[1]["origin_server_ts"])
+
+ defer.returnValue(to_return)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1083,17 +1147,18 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
- def __init__(self, sync_config, full_state, since_token, now_token):
+ def __init__(self, sync_config, full_state, batch_token, now_token):
"""
Args:
sync_config(SyncConfig)
full_state(bool): The full_state flag as specified by user
- since_token(StreamToken): The token supplied by user, or None.
+ batch_token(SyncNextBatchToken): The token supplied by user, or None.
now_token(StreamToken): The token to sync up to.
"""
self.sync_config = sync_config
self.full_state = full_state
- self.since_token = since_token
+ self.batch_token = batch_token
+ self.since_token = batch_token.stream_token if batch_token else None
self.now_token = now_token
self.presence = []
|