diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 497744b9af..426a93b8dd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -202,19 +202,64 @@ class SyncHandler(BaseHandler):
return self.incremental_sync_with_gap(sync_config, batch_token)
@defer.inlineCallbacks
- def _get_room_timestamps_at_token(self, room_ids, token):
- room_to_last_ts = {}
+ def _get_room_timestamps_at_token(self, room_ids, token, sync_config,
+ limit):
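+ """Get a map from room_id to the origin_server_ts of the room's last
+ event at `token`. If there are more than `limit` rooms, the result is
+ capped to `limit` rooms whose latest event passes the sync filter and
+ is visible to the user.
+ """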
+ room_to_entries = {}
@defer.inlineCallbacks
def _get_last_ts(room_id):
- ts = yield self.store.get_last_ts_for_room(
+ entry = yield self.store.get_last_ts_for_room(
room_id, token.room_key
)
- room_to_last_ts[room_id] = ts if ts else 0
- logger.info("room_to_last_ts: %r", room_to_last_ts)
+ # TODO: Is this ever possible?
+ room_to_entries[room_id] = entry if entry else {
+ "origin_server_ts": 0,
+ }
+
yield concurrently_execute(_get_last_ts, room_ids, 10)
- defer.returnValue(room_to_last_ts)
+
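+ # Fast path: no more rooms than the limit, so no filtering is needed.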
+ if len(room_to_entries) <= limit:
+ defer.returnValue({
+ room_id: entry["origin_server_ts"]
+ for room_id, entry in room_to_entries.items()
+ })
+
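+ # Order the candidate rooms by the timestamp of their most recent event.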
+ queued_events = sorted(
+ room_to_entries.items(),
+ key=lambda e: e[1]["origin_server_ts"]
+ )
+
+ to_return = {}
+
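+ # Pull batches of candidate events, dropping any the filters reject,
+ # until we have `limit` rooms or run out of candidates.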
+ while len(to_return) < limit and len(queued_events) > 0:
+ to_fetch = queued_events[:limit - len(to_return)]
+ queued_events = queued_events[limit - len(to_return):]
+ event_to_q = {
+ e["event_id"]: (room_id, e) for room_id, e in to_fetch
+ if "event_id" in e
+ }
+
+ # Now we fetch each event to check whether it's been filtered out
+ event_map = yield self.store.get_events(event_to_q.keys())
+
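+ # Apply the client's timeline filter from the sync filter collection.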
+ recents = sync_config.filter_collection.filter_room_timeline(
+ event_map.values()
+ )
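+ # Drop events the requesting user is not allowed to see.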
+ recents = yield filter_events_for_client(
+ self.store,
+ sync_config.user.to_string(),
+ recents,
+ )
+
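+ # Rooms whose latest event survived both filters get their timestamp recorded.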
+ to_return.update({r.room_id: r.origin_server_ts for r in recents})
+
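+ # Rooms whose latest event was filtered out go back on the queue for another pass.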
+ for ev_id in set(event_map.keys()) - set(r.event_id for r in recents):
+ queued_events.append(event_to_q[ev_id])
+
+ # FIXME: Need to refetch an earlier event (and its ts) for these
+ # rooms; as written the same filtered event is fetched again.
+ queued_events.sort(key=lambda e: e[1]["origin_server_ts"])
+
+ defer.returnValue(to_return)
@defer.inlineCallbacks
def full_state_sync(self, sync_config, batch_token):
@@ -292,6 +337,8 @@ class SyncHandler(BaseHandler):
e.room_id for e in room_list if e.membership == Membership.JOIN
],
token=now_token,
+ sync_config=sync_config,
+ limit=pagination_limit,
)
if room_to_last_ts:
@@ -512,8 +559,13 @@ class SyncHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts):
- start_ts = yield self._get_room_timestamps_at_token(room_ids, since_token)
+ def _get_rooms_that_need_full_state(self, room_ids, since_token, pa_ts,
+ sync_config, pagination_limit):
+ start_ts = yield self._get_room_timestamps_at_token(
+ room_ids, since_token,
+ sync_config=sync_config,
+ limit=pagination_limit,
+ )
missing_list = frozenset(
room_id for room_id, ts in
@@ -675,10 +727,13 @@ class SyncHandler(BaseHandler):
p_room_token = room_pagination_config.get("t", None)
if p_room_token:
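+ # "l" carries the pagination limit stored in the room pagination config.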
+ pa_limit = room_pagination_config["l"]
needing_full_state = yield self._get_rooms_that_need_full_state(
[room_id],
since_token,
room_pagination_config.get("ts", 0),
+ sync_config=sync_config,
+ pagination_limit=pa_limit,
)
need_full_state = room_id in needing_full_state
else:
@@ -716,10 +771,13 @@ class SyncHandler(BaseHandler):
if not room_sync.timeline:
p_room_token = room_pagination_config.get("t", None)
if p_room_token:
+ pa_limit = room_pagination_config["l"]
needing_full_state = yield self._get_rooms_that_need_full_state(
[room_id],
since_token,
room_pagination_config.get("ts", 0),
+ sync_config=sync_config,
+ pagination_limit=pa_limit,
)
if room_id in needing_full_state:
continue