diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4c5b935012..33c05950c5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -24,6 +24,8 @@ from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
+from synapse.types import SyncNextBatchToken
+
from twisted.internet import defer
import collections
@@ -141,7 +143,7 @@ class SyncHandler(BaseHandler):
self.clock = hs.get_clock()
self.response_cache = ResponseCache()
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+ def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
@@ -154,53 +156,68 @@ class SyncHandler(BaseHandler):
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
- sync_config, since_token, timeout, full_state
+ sync_config, batch_token, timeout, full_state
)
)
return result
@defer.inlineCallbacks
- def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+ def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
full_state):
context = LoggingContext.current_context()
if context:
- if since_token is None:
+ if batch_token is None:
context.tag = "initial_sync"
elif full_state:
context.tag = "full_state_sync"
else:
context.tag = "incremental_sync"
- if timeout == 0 or since_token is None or full_state:
+ if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(
- sync_config, since_token, full_state=full_state,
+ sync_config, batch_token, full_state=full_state,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
- return self.current_sync_for_user(sync_config, since_token)
+ return self.current_sync_for_user(sync_config, batch_token)
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
- from_token=since_token,
+ from_token=batch_token.stream_token,
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None,
+ def current_sync_for_user(self, sync_config, batch_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
- if since_token is None or full_state:
- return self.full_state_sync(sync_config, since_token)
+ if batch_token is None or full_state:
+ return self.full_state_sync(sync_config, batch_token)
else:
- return self.incremental_sync_with_gap(sync_config, since_token)
+ return self.incremental_sync_with_gap(sync_config, batch_token)
@defer.inlineCallbacks
- def full_state_sync(self, sync_config, timeline_since_token):
+ def _get_room_timestamps_at_token(self, room_ids, token):
+ room_to_last_ts = {}
+
+ @defer.inlineCallbacks
+ def _get_last_ts(room_id):
+ ts = yield self.store.get_last_ts_for_room(
+ room_id, token.room_key
+ )
+ room_to_last_ts[room_id] = ts if ts else 0
+
+ logger.info("room_to_last_ts: %r", room_to_last_ts)
+ yield concurrently_execute(_get_last_ts, room_ids, 10)
+ defer.returnValue(room_to_last_ts)
+
+ @defer.inlineCallbacks
+ def full_state_sync(self, sync_config, batch_token):
"""Get a sync for a client which is starting without any state.
If a 'message_since_token' is given, only timeline events which have
@@ -209,6 +226,11 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
+ if batch_token:
+ timeline_since_token = batch_token.stream_token
+ else:
+ timeline_since_token = None
+
now_token = yield self.event_sources.get_current_token()
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
@@ -258,24 +280,22 @@ class SyncHandler(BaseHandler):
user_id = sync_config.user.to_string()
- room_to_last_ts = {}
-
- @defer.inlineCallbacks
- def _get_last_ts(event):
- room_id = event.room_id
- if event.membership == Membership.JOIN:
- ts = yield self.store.get_last_ts_for_room(
- room_id, now_token.room_key
- )
- room_to_last_ts[room_id] = ts if ts else 0
+ pagination_limit = 20
+ room_pagination_config = {
+ "l": pagination_limit,
+ "o": 0,
+ "t": now_token,
+ }
- logger.info("room_to_last_ts: %r", room_to_last_ts)
- yield concurrently_execute(_get_last_ts, room_list, 10)
+ room_to_last_ts = yield self._get_room_timestamps_at_token(
+ room_ids=[e.room_id for e in room_list if e.membership == Membership.JOIN],
+ token=now_token,
+ )
joined_rooms_list = frozenset([
room_id for room_id, _f in
sorted(room_to_last_ts.items(), key=lambda item: -item[1])
- ][:20])
+ ][:pagination_limit])
@defer.inlineCallbacks
def _generate_room_entry(event):
@@ -326,9 +346,7 @@ class SyncHandler(BaseHandler):
self.account_data_for_user(account_data)
)
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
+ presence = sync_config.filter_collection.filter_presence(presence)
defer.returnValue(SyncResult(
presence=presence,
@@ -336,7 +354,10 @@ class SyncHandler(BaseHandler):
joined=joined,
invited=invited,
archived=archived,
- next_batch=now_token,
+ next_batch=SyncNextBatchToken(
+ stream_token=now_token,
+ pagination_config=room_pagination_config,
+ ),
))
@defer.inlineCallbacks
@@ -480,12 +501,14 @@ class SyncHandler(BaseHandler):
)
@defer.inlineCallbacks
- def incremental_sync_with_gap(self, sync_config, since_token):
+ def incremental_sync_with_gap(self, sync_config, batch_token):
""" Get the incremental delta needed to bring the client up to
date with the server.
Returns:
A Deferred SyncResult.
"""
+ since_token = batch_token.stream_token
+
now_token = yield self.event_sources.get_current_token()
rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
@@ -693,7 +716,7 @@ class SyncHandler(BaseHandler):
joined=joined,
invited=invited,
archived=archived,
- next_batch=now_token,
+ next_batch=batch_token.replace(stream_token=now_token),
))
@defer.inlineCallbacks
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 60d3dc4030..37fd1539f6 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -19,7 +19,7 @@ from synapse.http.servlet import (
RestServlet, parse_string, parse_integer, parse_boolean
)
from synapse.handlers.sync import SyncConfig
-from synapse.types import StreamToken
+from synapse.types import SyncNextBatchToken
from synapse.events.utils import (
serialize_event, format_event_for_client_v2_without_room_id,
)
@@ -140,9 +140,9 @@ class SyncRestServlet(RestServlet):
)
if since is not None:
- since_token = StreamToken.from_string(since)
+ batch_token = SyncNextBatchToken.from_string(since)
else:
- since_token = None
+ batch_token = None
affect_presence = set_presence != PresenceState.OFFLINE
@@ -154,7 +154,7 @@ class SyncRestServlet(RestServlet):
)
with context:
sync_result = yield self.sync_handler.wait_for_sync_for_user(
- sync_config, since_token=since_token, timeout=timeout,
+ sync_config, batch_token=batch_token, timeout=timeout,
full_state=full_state
)
diff --git a/synapse/types.py b/synapse/types.py
index 42fd9c7204..b53d91747b 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -17,6 +17,9 @@ from synapse.api.errors import SynapseError
from collections import namedtuple
+from unpaddedbase64 import encode_base64, decode_base64
+import ujson as json
+
Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
@@ -112,6 +115,30 @@ class EventID(DomainSpecificString):
SIGIL = "$"
+class SyncNextBatchToken(
+ namedtuple("SyncNextBatchToken", (
+ "stream_token",
+ "pagination_config",
+ ))
+):
+ @classmethod
+ def from_string(cls, string):
+ try:
+ d = json.loads(decode_base64(string))
+ return cls(StreamToken.from_string(d["t"]), d.get("pa", {}))
+ except:
+ raise SynapseError(400, "Invalid Token")
+
+ def to_string(self):
+ return encode_base64(json.dumps({
+ "t": self.stream_token.to_string(),
+ "pa": self.pagination_config,
+ }))
+
+ def replace(self, **kwargs):
+ return self._replace(**kwargs)
+
+
class StreamToken(
namedtuple("Token", (
"room_key",
|