diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4ca9ff4dbc..a880845605 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,7 +20,7 @@ from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
-from synapse.types import SyncNextBatchToken
+from synapse.types import SyncNextBatchToken, SyncPaginationState
from twisted.internet import defer
@@ -36,9 +36,19 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter_collection",
"is_guest",
"request_key",
+ "pagination_config",
])
+SyncPaginationConfig = collections.namedtuple("SyncPaginationConfig", [
+ "order",
+ "limit",
+])
+
+SYNC_PAGINATION_ORDER_TS = "o"
+SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
+
+
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
@@ -537,7 +547,7 @@ class SyncHandler(object):
archived=sync_result_builder.archived,
next_batch=SyncNextBatchToken(
stream_token=sync_result_builder.now_token,
- pagination_config=batch_token.pagination_config if batch_token else None,
+ pagination_state=sync_result_builder.pagination_state,
)
))
@@ -661,6 +671,7 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
+ sync_config = sync_result_builder.sync_config
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_result_builder.sync_config,
@@ -692,6 +703,59 @@ class SyncHandler(object):
tags_by_room = yield self.store.get_tags_for_user(user_id)
+ if sync_config.pagination_config:
+ pagination_config = sync_config.pagination_config
+ elif sync_result_builder.pagination_state:
+ pagination_config = SyncPaginationConfig(
+ order=sync_result_builder.pagination_state.order,
+ limit=sync_result_builder.pagination_state.limit,
+ )
+ else:
+ pagination_config = None
+
+ if sync_result_builder.pagination_state:
+ missing_state = yield self._get_rooms_that_need_full_state(
+ room_ids=[r.room_id for r in room_entries],
+ sync_config=sync_config,
+ since_token=sync_result_builder.since_token,
+ pagination_state=sync_result_builder.pagination_state,
+ )
+
+ if missing_state:
+ for r in room_entries:
+ if r.room_id in missing_state:
+ r.full_state = True
+
+ new_pagination_state = None
+ if pagination_config:
+ room_ids = [r.room_id for r in room_entries]
+ pagination_limit = pagination_config.limit
+
+ room_map = yield self._get_room_timestamps_at_token(
+ room_ids, sync_result_builder.now_token, sync_config, pagination_limit
+ )
+
+ if room_map:
+ sorted_list = sorted(
+ room_map.items(),
+ key=lambda item: -item[1]
+ )[:pagination_limit]
+
+ _, bottom_ts = sorted_list[-1]
+ value = bottom_ts
+
+ new_pagination_state = SyncPaginationState(
+ order=pagination_config.order, value=value, limit=pagination_limit,
+ )
+ else:
+ new_pagination_state = None
+
+ room_entries = [r for r in room_entries if r.room_id in room_map]
+
+ sync_result_builder.pagination_state = new_pagination_state
+
+ sync_result_builder.full_state |= sync_result_builder.since_token is None
+
def handle_room_entries(room_entry):
return self._generate_room_entry(
sync_result_builder,
@@ -948,6 +1012,7 @@ class SyncHandler(object):
even if empty.
"""
newly_joined = room_builder.newly_joined
+ always_include = always_include or newly_joined or sync_result_builder.full_state
full_state = (
room_builder.full_state
or newly_joined
@@ -956,7 +1021,7 @@ class SyncHandler(object):
events = room_builder.events
# We want to shortcut out as early as possible.
- if not (always_include or account_data or ephemeral or full_state):
+ if not (always_include or account_data or ephemeral):
if events == [] and tags is None:
return
@@ -995,7 +1060,7 @@ class SyncHandler(object):
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
- if not (always_include or batch or account_data or ephemeral or full_state):
+ if not (always_include or batch or account_data or ephemeral):
return
state = yield self.compute_state_delta(
@@ -1037,13 +1102,12 @@ class SyncHandler(object):
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
@defer.inlineCallbacks
- def _get_room_timestamps_at_token(self, room_ids, token, sync_config,
- limit):
+ def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
room_to_entries = {}
@defer.inlineCallbacks
def _get_last_ts(room_id):
- entry = yield self.store.get_last_ts_for_room(
+ entry = yield self.store.get_last_event_id_ts_for_room(
room_id, token.room_key
)
@@ -1096,6 +1160,23 @@ class SyncHandler(object):
defer.returnValue(to_return)
+ @defer.inlineCallbacks
+ def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
+ pagination_state):
+ start_ts = yield self._get_room_timestamps_at_token(
+ room_ids, since_token,
+ sync_config=sync_config,
+ limit=pagination_state.limit,
+ )
+
+ missing_list = frozenset(
+ room_id for room_id, ts in
+ sorted(start_ts.items(), key=lambda item: -item[1])
+ if ts < pagination_state.value
+ )
+
+ defer.returnValue(missing_list)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1147,6 +1228,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
+
+ __slots__ = (
+ "sync_config", "full_state", "batch_token", "since_token", "pagination_state",
+ "now_token", "presence", "account_data", "joined", "invited", "archived",
+ )
+
def __init__(self, sync_config, full_state, batch_token, now_token):
"""
Args:
@@ -1159,6 +1246,7 @@ class SyncResultBuilder(object):
self.full_state = full_state
self.batch_token = batch_token
self.since_token = batch_token.stream_token if batch_token else None
+ self.pagination_state = batch_token.pagination_state if batch_token else None
self.now_token = now_token
self.presence = []
@@ -1172,6 +1260,12 @@ class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
"""
+
+ __slots__ = (
+ "room_id", "rtype", "events", "newly_joined", "full_state", "since_token",
+ "upto_token",
+ )
+
def __init__(self, room_id, rtype, events, newly_joined, full_state,
since_token, upto_token):
"""
|