diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 921215469f..4c5b935012 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -258,19 +258,39 @@ class SyncHandler(BaseHandler):
user_id = sync_config.user.to_string()
+ room_to_last_ts = {}
+
@defer.inlineCallbacks
- def _generate_room_entry(event):
+ def _get_last_ts(event):
+ room_id = event.room_id
if event.membership == Membership.JOIN:
- room_result = yield self.full_state_sync_for_joined_room(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
+ ts = yield self.store.get_last_ts_for_room(
+ room_id, now_token.room_key
)
- joined.append(room_result)
+ room_to_last_ts[room_id] = ts if ts else 0
+
+ logger.info("room_to_last_ts: %r", room_to_last_ts)
+ yield concurrently_execute(_get_last_ts, room_list, 10)
+
+ joined_rooms_list = frozenset([
+ room_id for room_id, _f in
+ sorted(room_to_last_ts.items(), key=lambda item: -item[1])
+ ][:20])
+
+ @defer.inlineCallbacks
+ def _generate_room_entry(event):
+ if event.membership == Membership.JOIN:
+ if event.room_id in joined_rooms_list:
+ room_result = yield self.full_state_sync_for_joined_room(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ joined.append(room_result)
elif event.membership == Membership.INVITE:
if event.sender in ignored_users:
return
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 95b12559a6..60a8384fae 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -529,6 +529,26 @@ class StreamStore(SQLBaseStore):
int(stream),
)
+ def get_last_ts_for_room(self, room_id, token):
+ stream_ordering = RoomStreamToken.parse_stream_token(token).stream
+
+ sql = (
+ "SELECT origin_server_ts FROM events"
+ " WHERE room_id = ? AND stream_ordering <= ?"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT 1"
+ )
+
+ def f(txn):
+ txn.execute(sql, (room_id, stream_ordering))
+ rows = txn.fetchall()
+ if rows:
+ return rows[0][0]
+ else:
+ return None
+
+ return self.runInteraction("get_last_ts_for_room", f)
+
@defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit):
"""Retrieve events and pagination tokens around a given event in a
|