diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index be26a491ff..451182cfec 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
+from synapse.types import SyncNextBatchToken, SyncPaginationState
from twisted.internet import defer
@@ -35,6 +36,22 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter_collection",
"is_guest",
"request_key",
+ "pagination_config",
+])
+
+
+SyncPaginationConfig = collections.namedtuple("SyncPaginationConfig", [
+ "order",
+ "limit",
+])
+
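+# Supported orderings for paginated sync. "o" orders rooms by the
+# origin_server_ts of their most recent event, most recently active first.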
+SYNC_PAGINATION_ORDER_TS = "o"
+SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
+
+
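+# Extra, per-request arguments that can accompany a sync request, for
+# example to ask for additional pages of rooms beyond the current window.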
+SyncExtras = collections.namedtuple("SyncExtras", [
+ "paginate",
+ "rooms",
])
@@ -113,6 +130,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
+ "pagination_info",
])):
__slots__ = []
@@ -140,8 +158,8 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
- full_state=False):
+ def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
+ full_state=False, extras=None):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -153,48 +171,42 @@ class SyncHandler(object):
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
- sync_config, since_token, timeout, full_state
+ sync_config, batch_token, timeout, full_state, extras,
)
)
return result
@defer.inlineCallbacks
- def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
- full_state):
+ def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
+ full_state, extras=None):
context = LoggingContext.current_context()
if context:
- if since_token is None:
+ if batch_token is None:
context.tag = "initial_sync"
elif full_state:
context.tag = "full_state_sync"
else:
context.tag = "incremental_sync"
- if timeout == 0 or since_token is None or full_state:
+ if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
- result = yield self.current_sync_for_user(
- sync_config, since_token, full_state=full_state,
+ result = yield self.generate_sync_result(
+ sync_config, batch_token, full_state=full_state, extras=extras,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
- return self.current_sync_for_user(sync_config, since_token)
+ return self.generate_sync_result(
+ sync_config, batch_token, full_state=False, extras=extras,
+ )
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
- from_token=since_token,
+ from_token=batch_token.stream_token,
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None,
- full_state=False):
- """Get the sync for client needed to match what the server has now.
- Returns:
- A Deferred SyncResult.
- """
- return self.generate_sync_result(sync_config, since_token, full_state)
-
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
@@ -490,7 +502,8 @@ class SyncHandler(object):
defer.returnValue(None)
@defer.inlineCallbacks
- def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+ def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
+ extras=None):
"""Generates a sync result.
Args:
@@ -510,7 +523,7 @@ class SyncHandler(object):
sync_result_builder = SyncResultBuilder(
sync_config, full_state,
- since_token=since_token,
+ batch_token=batch_token,
now_token=now_token,
)
@@ -519,7 +532,7 @@ class SyncHandler(object):
)
res = yield self._generate_sync_entry_for_rooms(
- sync_result_builder, account_data_by_room
+ sync_result_builder, account_data_by_room, extras,
)
newly_joined_rooms, newly_joined_users = res
@@ -533,7 +546,11 @@ class SyncHandler(object):
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
archived=sync_result_builder.archived,
- next_batch=sync_result_builder.now_token,
+ next_batch=SyncNextBatchToken(
+ stream_token=sync_result_builder.now_token,
+ pagination_state=sync_result_builder.pagination_state,
+ ),
+ pagination_info=sync_result_builder.pagination_info,
))
@defer.inlineCallbacks
@@ -646,7 +663,8 @@ class SyncHandler(object):
sync_result_builder.presence = presence
@defer.inlineCallbacks
- def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
+ extras):
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -659,6 +677,7 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
+ sync_config = sync_result_builder.sync_config
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_result_builder.sync_config,
@@ -690,6 +709,94 @@ class SyncHandler(object):
tags_by_room = yield self.store.get_tags_for_user(user_id)
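+        # Work out which pagination settings apply to this request: prefer a
+        # config supplied with the request itself, otherwise fall back to the
+        # pagination state carried over in the next_batch token.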
+ if sync_config.pagination_config:
+ pagination_config = sync_config.pagination_config
+ old_pagination_value = 0
+ elif sync_result_builder.pagination_state:
+ pagination_config = SyncPaginationConfig(
+ order=sync_result_builder.pagination_state.order,
+ limit=sync_result_builder.pagination_state.limit,
+ )
+ old_pagination_value = sync_result_builder.pagination_state.value
+ else:
+ pagination_config = None
+ old_pagination_value = 0
+
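+        # The "peek" extra lists rooms the client wants included even if they
+        # fall outside the pagination window.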
+ include_map = extras.get("peek", {}) if extras else {}
+
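+        # For an incremental paginated sync, work out which rooms fell outside
+        # the pagination window at the previous sync; if they are included now
+        # they need to be sent down with their full state.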
+ if sync_result_builder.pagination_state:
+ missing_state = yield self._get_rooms_that_need_full_state(
+ room_ids=[r.room_id for r in room_entries],
+ sync_config=sync_config,
+ since_token=sync_result_builder.since_token,
+ pagination_state=sync_result_builder.pagination_state,
+ )
+
+ if missing_state:
+ for r in room_entries:
+ if r.room_id in missing_state:
+ r.full_state = True
+ if r.room_id in include_map:
+ r.always_include = True
+ r.events = None
+ r.since_token = None
+ r.upto_token = now_token
+
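+        # Apply the pagination window: order rooms by the timestamp of their
+        # most recent event and keep the most recently active ones, plus any
+        # extra rooms requested via the "paginate" extra. We fetch one more
+        # room than we need so we can tell whether anything lies beyond the
+        # window.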
+ if pagination_config:
+ room_ids = [r.room_id for r in room_entries]
+ pagination_limit = pagination_config.limit
+
+ extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0
+
+ room_map = yield self._get_room_timestamps_at_token(
+ room_ids, sync_result_builder.now_token, sync_config,
+ pagination_limit + extra_limit + 1,
+ )
+
+ limited = False
+ if room_map:
+ sorted_list = sorted(
+ room_map.items(),
+ key=lambda item: -item[1]
+ )
+
+ cutoff_list = sorted_list[:pagination_limit + extra_limit]
+
+ if cutoff_list[pagination_limit:]:
+ new_room_ids = set(r[0] for r in cutoff_list[pagination_limit:])
+ for r in room_entries:
+ if r.room_id in new_room_ids:
+ r.full_state = True
+ r.always_include = True
+ r.since_token = None
+ r.upto_token = now_token
+ r.events = None
+
+ _, bottom_ts = cutoff_list[-1]
+ value = bottom_ts
+
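+                # The response is "limited" if rooms beyond the returned
+                # window have activity newer than the previous cutoff but
+                # older than the new one, i.e. updates the client has not
+                # yet been sent.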
+ limited = any(
+ old_pagination_value < r[1] < value
+ for r in sorted_list[pagination_limit + extra_limit:]
+ )
+
+ sync_result_builder.pagination_state = SyncPaginationState(
+ order=pagination_config.order, value=value,
+ limit=pagination_limit + extra_limit,
+ )
+
+ sync_result_builder.pagination_info["limited"] = limited
+
+ if len(room_map) == len(room_entries):
+ sync_result_builder.pagination_state = None
+
+ room_entries = [
+ r for r in room_entries
+ if r.room_id in room_map or r.always_include
+ ]
+
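+        # An initial sync (no since token) is always treated as a full state
+        # sync.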
+ sync_result_builder.full_state |= sync_result_builder.since_token is None
+
def handle_room_entries(room_entry):
return self._generate_room_entry(
sync_result_builder,
@@ -698,7 +805,6 @@ class SyncHandler(object):
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
- always_include=sync_result_builder.full_state,
)
yield concurrently_execute(handle_room_entries, room_entries, 10)
@@ -929,8 +1035,7 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _generate_room_entry(self, sync_result_builder, ignored_users,
- room_builder, ephemeral, tags, account_data,
- always_include=False):
+ room_builder, ephemeral, tags, account_data):
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
@@ -946,6 +1051,11 @@ class SyncHandler(object):
even if empty.
"""
newly_joined = room_builder.newly_joined
+ always_include = (
+ newly_joined
+ or sync_result_builder.full_state
+ or room_builder.always_include
+ )
full_state = (
room_builder.full_state
or newly_joined
@@ -954,11 +1064,10 @@ class SyncHandler(object):
events = room_builder.events
# We want to shortcut out as early as possible.
- if not (always_include or account_data or ephemeral or full_state):
+ if not (always_include or account_data or ephemeral):
if events == [] and tags is None:
return
- since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -993,7 +1102,7 @@ class SyncHandler(object):
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
- if not (always_include or batch or account_data or ephemeral or full_state):
+ if not (always_include or batch or account_data or ephemeral):
return
state = yield self.compute_state_delta(
@@ -1034,6 +1143,82 @@ class SyncHandler(object):
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+ @defer.inlineCallbacks
+ def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
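+        """Gets the origin_server_ts of the most recent event in each of the
+        given rooms at the given stream token, returning a map from room_id
+        to timestamp for at most `limit` rooms. If there are more rooms than
+        `limit`, the most recently active ones are kept, skipping rooms whose
+        latest event would be filtered out for this client.
+        """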
+ room_to_entries = {}
+
+ @defer.inlineCallbacks
+ def _get_last_ts(room_id):
+ entry = yield self.store.get_last_event_id_ts_for_room(
+ room_id, token.room_key
+ )
+
+ # TODO: Is this ever possible?
+ room_to_entries[room_id] = entry if entry else {
+ "origin_server_ts": 0,
+ }
+
+ yield concurrently_execute(_get_last_ts, room_ids, 10)
+
+ if len(room_to_entries) <= limit:
+ defer.returnValue({
+ room_id: entry["origin_server_ts"]
+ for room_id, entry in room_to_entries.items()
+ })
+
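+        # We have more rooms than we need, so work through them from most to
+        # least recently active, checking that the latest event in each is
+        # actually visible to the client before counting the room.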
+ queued_events = sorted(
+ room_to_entries.items(),
+ key=lambda e: -e[1]["origin_server_ts"]
+ )
+
+ to_return = {}
+
+        while len(to_return) < limit and len(queued_events) > 0:
+            to_fetch = queued_events[:limit - len(to_return)]
+            # Remove the entries we are about to check from the queue, so the
+            # loop makes progress even when some of them are filtered out
+            # below and re-queued.
+            queued_events = queued_events[limit - len(to_return):]
+ event_to_q = {
+ e["event_id"]: (room_id, e) for room_id, e in to_fetch
+ if "event_id" in e
+ }
+
+            # Now we fetch each event to check if it's been filtered out
+ event_map = yield self.store.get_events(event_to_q.keys())
+
+ recents = sync_config.filter_collection.filter_room_timeline(
+ event_map.values()
+ )
+ recents = yield filter_events_for_client(
+ self.store,
+ sync_config.user.to_string(),
+ recents,
+ )
+
+ to_return.update({r.room_id: r.origin_server_ts for r in recents})
+
+ for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
+ queued_events.append(event_to_q[ev_id])
+
+            # FIXME: We should refetch an earlier event (and its timestamp)
+            # for these rooms; as it stands they are re-queued with the same,
+            # already-filtered, event.
+ queued_events.sort(key=lambda e: -e[1]["origin_server_ts"])
+
+ defer.returnValue(to_return)
+
+ @defer.inlineCallbacks
+ def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
+ pagination_state):
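+        """Works out which of the given rooms fell outside the pagination
+        window at `since_token` (i.e. their latest event at that point was
+        older than the pagination cutoff), and which therefore need to be sent
+        down with full state now that they are being included.
+
+        Returns:
+            Deferred[frozenset[str]]: The room IDs that need full state.
+        """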
+ start_ts = yield self._get_room_timestamps_at_token(
+ room_ids, since_token,
+ sync_config=sync_config,
+ limit=len(room_ids),
+ )
+
+ missing_list = frozenset(
+ room_id for room_id, ts in
+ sorted(start_ts.items(), key=lambda item: -item[1])
+ if ts < pagination_state.value
+ )
+
+ defer.returnValue(missing_list)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1085,17 +1270,26 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
- def __init__(self, sync_config, full_state, since_token, now_token):
+
+ __slots__ = (
+ "sync_config", "full_state", "batch_token", "since_token", "pagination_state",
+ "now_token", "presence", "account_data", "joined", "invited", "archived",
+ "pagination_info",
+ )
+
+ def __init__(self, sync_config, full_state, batch_token, now_token):
"""
Args:
sync_config(SyncConfig)
full_state(bool): The full_state flag as specified by user
- since_token(StreamToken): The token supplied by user, or None.
+ batch_token(SyncNextBatchToken): The token supplied by user, or None.
now_token(StreamToken): The token to sync up to.
"""
self.sync_config = sync_config
self.full_state = full_state
- self.since_token = since_token
+ self.batch_token = batch_token
+ self.since_token = batch_token.stream_token if batch_token else None
+ self.pagination_state = batch_token.pagination_state if batch_token else None
self.now_token = now_token
self.presence = []
@@ -1104,11 +1298,19 @@ class SyncResultBuilder(object):
self.invited = []
self.archived = []
+ self.pagination_info = {}
+
class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
"""
+
+ __slots__ = (
+ "room_id", "rtype", "events", "newly_joined", "full_state", "since_token",
+ "upto_token", "always_include",
+ )
+
def __init__(self, room_id, rtype, events, newly_joined, full_state,
since_token, upto_token):
"""
@@ -1129,3 +1331,4 @@ class RoomSyncResultBuilder(object):
self.full_state = full_state
self.since_token = since_token
self.upto_token = upto_token
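+        # Whether this room should be included in the response even if it
+        # falls outside the pagination window.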
+ self.always_include = False