diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a880845605..fef81f5f9b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -49,6 +49,12 @@ SYNC_PAGINATION_ORDER_TS = "o"
SYNC_PAGINATION_VALID_ORDERS = (SYNC_PAGINATION_ORDER_TS,)
+SyncExtras = collections.namedtuple("SyncExtras", [
+ "paginate",
+ "rooms",
+])
+
+
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
@@ -152,7 +158,7 @@ class SyncHandler(object):
self.response_cache = ResponseCache()
def wait_for_sync_for_user(self, sync_config, batch_token=None, timeout=0,
- full_state=False):
+ full_state=False, extras=None):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -164,14 +170,14 @@ class SyncHandler(object):
result = self.response_cache.set(
sync_config.request_key,
self._wait_for_sync_for_user(
- sync_config, batch_token, timeout, full_state
+ sync_config, batch_token, timeout, full_state, extras,
)
)
return result
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, batch_token, timeout,
- full_state):
+ full_state, extras=None):
context = LoggingContext.current_context()
if context:
if batch_token is None:
@@ -184,13 +190,15 @@ class SyncHandler(object):
if timeout == 0 or batch_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
- result = yield self.current_sync_for_user(
- sync_config, batch_token, full_state=full_state,
+ result = yield self.generate_sync_result(
+ sync_config, batch_token, full_state=full_state, extras=extras,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
- return self.current_sync_for_user(sync_config, batch_token)
+ return self.generate_sync_result(
+ sync_config, batch_token, full_state=False, extras=extras,
+ )
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
@@ -198,14 +206,6 @@ class SyncHandler(object):
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, batch_token=None,
- full_state=False):
- """Get the sync for client needed to match what the server has now.
- Returns:
- A Deferred SyncResult.
- """
- return self.generate_sync_result(sync_config, batch_token, full_state)
-
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
@@ -502,7 +502,8 @@ class SyncHandler(object):
defer.returnValue(None)
@defer.inlineCallbacks
- def generate_sync_result(self, sync_config, batch_token=None, full_state=False):
+ def generate_sync_result(self, sync_config, batch_token=None, full_state=False,
+ extras=None):
"""Generates a sync result.
Args:
@@ -531,7 +532,7 @@ class SyncHandler(object):
)
res = yield self._generate_sync_entry_for_rooms(
- sync_result_builder, account_data_by_room
+ sync_result_builder, account_data_by_room, extras,
)
newly_joined_rooms, newly_joined_users = res
@@ -658,7 +659,8 @@ class SyncHandler(object):
sync_result_builder.presence = presence
@defer.inlineCallbacks
- def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room,
+ extras):
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -713,6 +715,8 @@ class SyncHandler(object):
else:
pagination_config = None
+ include_map = extras.get("peek", {}) if extras else {}
+
if sync_result_builder.pagination_state:
missing_state = yield self._get_rooms_that_need_full_state(
room_ids=[r.room_id for r in room_entries],
@@ -725,34 +729,54 @@ class SyncHandler(object):
for r in room_entries:
if r.room_id in missing_state:
r.full_state = True
+ if r.room_id in include_map:
+ r.always_include = True
+ r.events = None
+ r.since_token = None
+ r.upto_token = now_token
- new_pagination_state = None
if pagination_config:
room_ids = [r.room_id for r in room_entries]
pagination_limit = pagination_config.limit
+ extra_limit = extras.get("paginate", {}).get("limit", 0) if extras else 0
+
room_map = yield self._get_room_timestamps_at_token(
- room_ids, sync_result_builder.now_token, sync_config, pagination_limit
+ room_ids, sync_result_builder.now_token, sync_config,
+ pagination_limit + extra_limit,
)
if room_map:
sorted_list = sorted(
room_map.items(),
key=lambda item: -item[1]
- )[:pagination_limit]
+ )[:pagination_limit + extra_limit]
+
+ if sorted_list[pagination_limit:]:
+ new_room_ids = set(r[0] for r in sorted_list[pagination_limit:])
+ for r in room_entries:
+ if r.room_id in new_room_ids:
+ r.full_state = True
+ r.always_include = True
+ r.since_token = None
+ r.upto_token = now_token
+ r.events = None
_, bottom_ts = sorted_list[-1]
value = bottom_ts
- new_pagination_state = SyncPaginationState(
- order=pagination_config.order, value=value, limit=pagination_limit,
+ sync_result_builder.pagination_state = SyncPaginationState(
+ order=pagination_config.order, value=value,
+ limit=pagination_limit + extra_limit,
)
- else:
- new_pagination_state = None
- room_entries = [r for r in room_entries if r.room_id in room_map]
+ if len(room_map) == len(room_entries):
+ sync_result_builder.pagination_state = None
- sync_result_builder.pagination_state = new_pagination_state
+ room_entries = [
+ r for r in room_entries
+ if r.room_id in room_map or r.always_include
+ ]
sync_result_builder.full_state |= sync_result_builder.since_token is None
@@ -764,7 +788,6 @@ class SyncHandler(object):
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
account_data=account_data_by_room.get(room_entry.room_id, {}),
- always_include=sync_result_builder.full_state,
)
yield concurrently_execute(handle_room_entries, room_entries, 10)
@@ -995,8 +1018,7 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _generate_room_entry(self, sync_result_builder, ignored_users,
- room_builder, ephemeral, tags, account_data,
- always_include=False):
+ room_builder, ephemeral, tags, account_data):
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
@@ -1012,7 +1034,11 @@ class SyncHandler(object):
even if empty.
"""
newly_joined = room_builder.newly_joined
- always_include = always_include or newly_joined or sync_result_builder.full_state
+ always_include = (
+ newly_joined
+ or sync_result_builder.full_state
+ or room_builder.always_include
+ )
full_state = (
room_builder.full_state
or newly_joined
@@ -1025,7 +1051,6 @@ class SyncHandler(object):
if events == [] and tags is None:
return
- since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -1166,7 +1191,7 @@ class SyncHandler(object):
start_ts = yield self._get_room_timestamps_at_token(
room_ids, since_token,
sync_config=sync_config,
- limit=pagination_state.limit,
+ limit=len(room_ids),
)
missing_list = frozenset(
@@ -1263,7 +1288,7 @@ class RoomSyncResultBuilder(object):
__slots__ = (
"room_id", "rtype", "events", "newly_joined", "full_state", "since_token",
- "upto_token",
+ "upto_token", "always_include",
)
def __init__(self, room_id, rtype, events, newly_joined, full_state,
@@ -1286,3 +1311,4 @@ class RoomSyncResultBuilder(object):
self.full_state = full_state
self.since_token = since_token
self.upto_token = upto_token
+ self.always_include = False
|