diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9ebfccc8bf..a32f48135e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -22,6 +22,8 @@ from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
+from synapse.types import SyncNextBatchToken
+
from twisted.internet import defer
import collections
@@ -141,7 +143,7 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+ def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
@@ -154,53 +156,113 @@ class SyncHandler(object):
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
- sync_config, since_token, timeout, full_state
+ sync_config, batch_token, timeout, full_state
)
)
return result
@defer.inlineCallbacks
- def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+ def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
full_state):
context = LoggingContext.current_context()
if context:
- if since_token is None:
+ if batch_token is None:
context.tag = "initial_sync"
elif full_state:
context.tag = "full_state_sync"
else:
context.tag = "incremental_sync"
- if timeout == 0 or since_token is None or full_state:
+ if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(
- sync_config, since_token, full_state=full_state,
+ sync_config, batch_token, full_state=full_state,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
- return self.current_sync_for_user(sync_config, since_token)
+ return self.current_sync_for_user(sync_config, batch_token)
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
- from_token=since_token,
+ from_token=batch_token.stream_token,
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None,
+ def current_sync_for_user(self, sync_config, batch_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
- if since_token is None or full_state:
- return self.full_state_sync(sync_config, since_token)
+ if batch_token is None or full_state:
+ return self.full_state_sync(sync_config, batch_token)
else:
- return self.incremental_sync_with_gap(sync_config, since_token)
+ return self.incremental_sync_with_gap(sync_config, batch_token)
+
+ @defer.inlineCallbacks
+ def _get_room_timestamps_at_token(self, room_ids, token, sync_config,
+ limit):
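+        """Get a map from room_id to the origin_server_ts of the most
+        recent event in that room at `token`, covering at most `limit`
+        of the most recently active rooms visible to the user.
+        """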
+ room_to_entries = {}
+
+ @defer.inlineCallbacks
+ def _get_last_ts(room_id):
+ entry = yield self.store.get_last_ts_for_room(
+ room_id, token.room_key
+ )
+
+ # TODO: Is this ever possible?
+ room_to_entries[room_id] = entry if entry else {
+ "origin_server_ts": 0,
+ }
+
+ yield concurrently_execute(_get_last_ts, room_ids, 10)
+
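+        # Fast path: every room fits within the page, so there is no
+        # need to check event visibility.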
+ if len(room_to_entries) <= limit:
+ defer.returnValue({
+ room_id: entry["origin_server_ts"]
+ for room_id, entry in room_to_entries.items()
+ })
+
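+        # Too many rooms: order them by most recent activity and fill
+        # the page from the top, checking that each room's latest event
+        # is actually visible to this client.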
+ queued_events = sorted(
+ room_to_entries.items(),
+ key=lambda e: -e[1]["origin_server_ts"]
+ )
+
+ to_return = {}
+
+        while len(to_return) < limit and len(queued_events) > 0:
+            to_fetch = queued_events[:limit - len(to_return)]
+            del queued_events[:limit - len(to_return)]
+
+            event_to_q = {
+                e["event_id"]: (room_id, e) for room_id, e in to_fetch
+                if "event_id" in e
+            }
+
+            # Now we fetch each event to check if it's been filtered out
+ event_map = yield self.store.get_events(event_to_q.keys())
+
+ recents = sync_config.filter_collection.filter_room_timeline(
+ event_map.values()
+ )
+ recents = yield filter_events_for_client(
+ self.store,
+ sync_config.user.to_string(),
+ recents,
+ )
+
+ to_return.update({r.room_id: r.origin_server_ts for r in recents})
+
+            # FIXME: Rooms whose most recent event was filtered out are
+            # currently dropped from the page entirely; ideally we would
+            # fetch the timestamp of the next earlier visible event for
+            # those rooms and requeue them.
+
+ defer.returnValue(to_return)
@defer.inlineCallbacks
- def full_state_sync(self, sync_config, timeline_since_token):
+ def full_state_sync(self, sync_config, batch_token):
"""Get a sync for a client which is starting without any state.
-        If a 'message_since_token' is given, only timeline events which have
+        If a 'batch_token' is given, only timeline events which have
@@ -209,6 +271,11 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
+ if batch_token:
+ timeline_since_token = batch_token.stream_token
+ else:
+ timeline_since_token = None
+
now_token = yield self.event_sources.get_current_token()
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
@@ -258,19 +325,50 @@ class SyncHandler(object):
user_id = sync_config.user.to_string()
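+        # The pagination config is round-tripped to the client inside the
+        # next_batch token, so its keys are kept terse: "l" = limit,
+        # "t" = the stream token this room list was computed at, and
+        # "ts" = the timestamp lower bound (set below). "o" is presumably
+        # an offset, but nothing reads it in this patch.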
+ pagination_limit = 10
+ room_pagination_config = {
+ "l": pagination_limit,
+ "o": 0,
+ "t": now_token.to_string(),
+ }
+
+ room_to_last_ts = yield self._get_room_timestamps_at_token(
+ room_ids=[
+ e.room_id for e in room_list if e.membership == Membership.JOIN
+ ],
+ token=now_token,
+ sync_config=sync_config,
+ limit=pagination_limit,
+ )
+
+ if room_to_last_ts:
+ sorted_list = sorted(
+ room_to_last_ts.items(),
+ key=lambda item: -item[1]
+ )[:pagination_limit]
+
+ _, bottom_ts = sorted_list[-1]
+ room_pagination_config["ts"] = bottom_ts
+
+ joined_rooms_list = frozenset(room_id for room_id, _f in sorted_list)
+ else:
+ room_pagination_config = {}
+ joined_rooms_list = frozenset()
+
@defer.inlineCallbacks
def _generate_room_entry(event):
if event.membership == Membership.JOIN:
- room_result = yield self.full_state_sync_for_joined_room(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- joined.append(room_result)
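+                # Only generate entries for rooms that made it into the
+                # first page; the client picks up the rest as it
+                # paginates.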
+ if event.room_id in joined_rooms_list:
+ room_result = yield self.full_state_sync_for_joined_room(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ joined.append(room_result)
elif event.membership == Membership.INVITE:
if event.sender in ignored_users:
return
@@ -306,9 +404,7 @@ class SyncHandler(object):
self.account_data_for_user(account_data)
)
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
+ presence = sync_config.filter_collection.filter_presence(presence)
defer.returnValue(SyncResult(
presence=presence,
@@ -316,7 +412,10 @@ class SyncHandler(object):
joined=joined,
invited=invited,
archived=archived,
- next_batch=now_token,
+ next_batch=SyncNextBatchToken(
+ stream_token=now_token,
+ pagination_config=room_pagination_config,
+ ),
))
@defer.inlineCallbacks
@@ -460,12 +559,32 @@ class SyncHandler(object):
)
@defer.inlineCallbacks
- def incremental_sync_with_gap(self, sync_config, since_token):
+ def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts,
+ sync_config, pagination_limit):
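+        """Get the set of room_ids whose most recent event at
+        `since_token` is older than the pagination timestamp `pa_ts`.
+        Such rooms have never been sent to the client, so they need
+        full state if they show up in an incremental sync.
+        """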
+ start_ts = yield self._get_room_timestamps_at_token(
+ room_ids, since_token,
+ sync_config=sync_config,
+ limit=pagination_limit,
+ )
+
+        missing_list = frozenset(
+            room_id for room_id, ts in start_ts.items()
+            if ts < pa_ts
+        )
+
+ defer.returnValue(missing_list)
+
+ @defer.inlineCallbacks
+ def incremental_sync_with_gap(self, sync_config, batch_token):
""" Get the incremental delta needed to bring the client up to
date with the server.
Returns:
A Deferred SyncResult.
"""
+ since_token = batch_token.stream_token
+ room_pagination_config = batch_token.pagination_config
+
now_token = yield self.event_sources.get_current_token()
rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
@@ -606,7 +725,21 @@ class SyncHandler(object):
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
- newly_joined_room = room_id in newly_joined_rooms
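+            # If the client is paginating, a room whose last event at
+            # `since_token` predates the pagination timestamp was never
+            # sent down in the initial sync, so treat it like a newly
+            # joined room and send full state.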
+ p_room_token = room_pagination_config.get("t", None)
+ if p_room_token:
+ pa_limit = room_pagination_config["l"]
+ needing_full_state = yield self._get_rooms_that_need_full_state(
+ [room_id],
+ since_token,
+ room_pagination_config.get("ts", 0),
+ sync_config=sync_config,
+ pagination_limit=pa_limit,
+ )
+ need_full_state = room_id in needing_full_state
+ else:
+ need_full_state = False
+
+ newly_joined_room = (room_id in newly_joined_rooms) or need_full_state
full_state = newly_joined_room
batch = yield self.load_filtered_recents(
@@ -635,6 +768,19 @@ class SyncHandler(object):
full_state=full_state,
)
if room_sync:
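+                # A room with no new timeline events that also falls
+                # outside the paginated window shouldn't be sent down
+                # yet; the client will get it as it paginates.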
+ if not room_sync.timeline:
+ p_room_token = room_pagination_config.get("t", None)
+ if p_room_token:
+ pa_limit = room_pagination_config["l"]
+ needing_full_state = yield self._get_rooms_that_need_full_state(
+ [room_id],
+ since_token,
+ room_pagination_config.get("ts", 0),
+ sync_config=sync_config,
+ pagination_limit=pa_limit,
+ )
+ if room_id in needing_full_state:
+ continue
joined.append(room_sync)
# For each newly joined room, we want to send down presence of
@@ -673,7 +819,7 @@ class SyncHandler(object):
joined=joined,
invited=invited,
archived=archived,
- next_batch=now_token,
+ next_batch=batch_token.replace(stream_token=now_token),
))
@defer.inlineCallbacks
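
Note: SyncNextBatchToken is imported from synapse.types above, but its
definition is outside this diff. A minimal sketch consistent with how it
is used here (a stream_token attribute, a pagination_config dict, and a
replace() method), assuming the real type is a namedtuple wrapper:

    import collections

    class SyncNextBatchToken(collections.namedtuple(
        "SyncNextBatchToken", ("stream_token", "pagination_config")
    )):
        # stream_token: the usual StreamToken marking the sync position.
        # pagination_config: the dict built in full_state_sync (keys
        # "l", "o", "t", "ts"), or {} when pagination is inactive.

        def replace(self, **kwargs):
            # Hypothetical convenience wrapper around namedtuple._replace,
            # matching the call in incremental_sync_with_gap.
            return self._replace(**kwargs)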