diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4ca9ff4dbc..a880845605 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,7 +20,7 @@ from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
-from synapse.types import SyncNextBatchToken
+from synapse.types import SyncNextBatchToken, SyncPaginationState
from twisted.internet import defer
@@ -36,9 +36,19 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter_collection",
"is_guest",
"request_key",
+ "pagination_config",
])
+SyncPaginationConfig = collections.namedtuple("SyncPaginationConfig", [
+ "order",
+ "limit",
+])
+
+SYNC_PAGINATION_ORDER_TS = "o"
+SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
+
+
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
@@ -537,7 +547,7 @@ class SyncHandler(object):
archived=sync_result_builder.archived,
next_batch=SyncNextBatchToken(
stream_token=sync_result_builder.now_token,
- pagination_config=batch_token.pagination_config if batch_token else None,
+ pagination_state=sync_result_builder.pagination_state,
)
))
@@ -661,6 +671,7 @@ class SyncHandler(object):
`(newly_joined_rooms, newly_joined_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
+ sync_config = sync_result_builder.sync_config
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_result_builder.sync_config,
@@ -692,6 +703,59 @@ class SyncHandler(object):
tags_by_room = yield self.store.get_tags_for_user(user_id)
+ if sync_config.pagination_config:
+ pagination_config = sync_config.pagination_config
+ elif sync_result_builder.pagination_state:
+ pagination_config = SyncPaginationConfig(
+ order=sync_result_builder.pagination_state.order,
+ limit=sync_result_builder.pagination_state.limit,
+ )
+ else:
+ pagination_config = None
+
+ if sync_result_builder.pagination_state:
+ missing_state = yield self._get_rooms_that_need_full_state(
+ room_ids=[r.room_id for r in room_entries],
+ sync_config=sync_config,
+ since_token=sync_result_builder.since_token,
+ pagination_state=sync_result_builder.pagination_state,
+ )
+
+ if missing_state:
+ for r in room_entries:
+ if r.room_id in missing_state:
+ r.full_state = True
+
+ new_pagination_state = None
+ if pagination_config:
+ room_ids = [r.room_id for r in room_entries]
+ pagination_limit = pagination_config.limit
+
+ room_map = yield self._get_room_timestamps_at_token(
+ room_ids, sync_result_builder.now_token, sync_config, pagination_limit
+ )
+
+ if room_map:
+ sorted_list = sorted(
+ room_map.items(),
+ key=lambda item: -item[1]
+ )[:pagination_limit]
+
+ _, bottom_ts = sorted_list[-1]
+ value = bottom_ts
+
+ new_pagination_state = SyncPaginationState(
+ order=pagination_config.order, value=value, limit=pagination_limit,
+ )
+ else:
+ new_pagination_state = None
+
+ room_entries = [r for r in room_entries if r.room_id in room_map]
+
+ sync_result_builder.pagination_state = new_pagination_state
+
+ sync_result_builder.full_state |= sync_result_builder.since_token is None
+
def handle_room_entries(room_entry):
return self._generate_room_entry(
sync_result_builder,
@@ -948,6 +1012,7 @@ class SyncHandler(object):
even if empty.
"""
newly_joined = room_builder.newly_joined
+ always_include = always_include or newly_joined or sync_result_builder.full_state
full_state = (
room_builder.full_state
or newly_joined
@@ -956,7 +1021,7 @@ class SyncHandler(object):
events = room_builder.events
# We want to shortcut out as early as possible.
- if not (always_include or account_data or ephemeral or full_state):
+ if not (always_include or account_data or ephemeral):
if events == [] and tags is None:
return
@@ -995,7 +1060,7 @@ class SyncHandler(object):
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
- if not (always_include or batch or account_data or ephemeral or full_state):
+ if not (always_include or batch or account_data or ephemeral):
return
state = yield self.compute_state_delta(
@@ -1037,13 +1102,12 @@ class SyncHandler(object):
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
@defer.inlineCallbacks
- def _get_room_timestamps_at_token(self, room_ids, token, sync_config,
- limit):
+ def _get_room_timestamps_at_token(self, room_ids, token, sync_config, limit):
room_to_entries = {}
@defer.inlineCallbacks
def _get_last_ts(room_id):
- entry = yield self.store.get_last_ts_for_room(
+ entry = yield self.store.get_last_event_id_ts_for_room(
room_id, token.room_key
)
@@ -1096,6 +1160,23 @@ class SyncHandler(object):
defer.returnValue(to_return)
+ @defer.inlineCallbacks
+ def _get_rooms_that_need_full_state(self, room_ids, sync_config, since_token,
+ pagination_state):
+ start_ts = yield self._get_room_timestamps_at_token(
+ room_ids, since_token,
+ sync_config=sync_config,
+ limit=pagination_state.limit,
+ )
+
+ missing_list = frozenset(
+ room_id for room_id, ts in
+ sorted(start_ts.items(), key=lambda item: -item[1])
+ if ts < pagination_state.value
+ )
+
+ defer.returnValue(missing_list)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1147,6 +1228,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
+
+ __slots__ = (
+ "sync_config", "full_state", "batch_token", "since_token", "pagination_state",
+ "now_token", "presence", "account_data", "joined", "invited", "archived",
+ )
+
def __init__(self, sync_config, full_state, batch_token, now_token):
"""
Args:
@@ -1159,6 +1246,7 @@ class SyncResultBuilder(object):
self.full_state = full_state
self.batch_token = batch_token
self.since_token = batch_token.stream_token if batch_token else None
+ self.pagination_state = batch_token.pagination_state if batch_token else None
self.now_token = now_token
self.presence = []
@@ -1172,6 +1260,12 @@ class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
"""
+
+ __slots__ = (
+ "room_id", "rtype", "events", "newly_joined", "full_state", "since_token",
+ "upto_token",
+ )
+
def __init__(self, room_id, rtype, events, newly_joined, full_state,
since_token, upto_token):
"""
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index e1ece5d406..3df9743132 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -19,7 +19,7 @@ from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean,
parse_json_object_from_request,
)
-from synapse.handlers.sync import SyncConfig
+from synapse.handlers.sync import SyncConfig, SyncPaginationConfig
from synapse.types import SyncNextBatchToken
from synapse.events.utils import (
serialize_event, format_event_for_client_v2_without_room_id,
@@ -116,6 +116,7 @@ class SyncRestServlet(RestServlet):
filter_id = body.get("filter_id", None)
filter_dict = body.get("filter", None)
+ pagination_config = body.get("pagination_config", None)
if filter_dict is not None and filter_id is not None:
raise SynapseError(
@@ -143,6 +144,10 @@ class SyncRestServlet(RestServlet):
filter_collection=filter_collection,
is_guest=requester.is_guest,
request_key=request_key,
+ pagination_config=SyncPaginationConfig(
+ order=pagination_config["order"],
+ limit=pagination_config["limit"],
+ ) if pagination_config else None,
)
if since is not None:
@@ -213,6 +218,7 @@ class SyncRestServlet(RestServlet):
filter_collection=filter,
is_guest=requester.is_guest,
request_key=request_key,
+ pagination_config=None,
)
if since is not None:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a85cdac038..ab991e877d 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -547,7 +547,7 @@ class StreamStore(SQLBaseStore):
else:
return None
- return self.runInteraction("get_last_ts_for_room", f)
+ return self.runInteraction("get_last_event_id_ts_for_room", f)
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
diff --git a/synapse/types.py b/synapse/types.py
index 0c9efdfd00..fd7c0ffe7a 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -118,7 +118,7 @@ class EventID(DomainSpecificString):
class SyncNextBatchToken(
namedtuple("SyncNextBatchToken", (
"stream_token",
- "pagination_config",
+ "pagination_state",
))
):
@classmethod
@@ -127,10 +127,10 @@ class SyncNextBatchToken(
d = json.loads(decode_base64(string))
pa = d.get("pa", None)
if pa:
- pa = SyncPaginationConfig.from_dict(pa)
+ pa = SyncPaginationState.from_dict(pa)
return cls(
stream_token=StreamToken.from_string(d["t"]),
- pagination_config=pa,
+ pagination_state=pa,
)
except:
raise SynapseError(400, "Invalid Token")
@@ -138,23 +138,24 @@ class SyncNextBatchToken(
def to_string(self):
return encode_base64(json.dumps({
"t": self.stream_token.to_string(),
- "pa": self.pagination_config.to_dict() if self.pagination_config else None,
+ "pa": self.pagination_state.to_dict() if self.pagination_state else None,
}))
def replace(self, **kwargs):
return self._replace(**kwargs)
-class SyncPaginationConfig(
- namedtuple("SyncPaginationConfig", (
+class SyncPaginationState(
+ namedtuple("SyncPaginationState", (
"order",
"value",
+ "limit",
))
):
@classmethod
def from_dict(cls, d):
try:
- return cls(d["o"], d["v"])
+ return cls(d["o"], d["v"], d["l"])
except:
raise SynapseError(400, "Invalid Token")
@@ -162,6 +163,7 @@ class SyncPaginationConfig(
return {
"o": self.order,
"v": self.value,
+ "l": self.limit,
}
def replace(self, **kwargs):
|