diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 15b8ea0460..9137fc24ea 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -207,16 +207,37 @@ class UserDirectoryStore(SQLBaseStore):
if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
return []
- # TODO: Add limit
- sql = """
- SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
- FROM current_state_delta_stream
- WHERE stream_id > ?
- ORDER BY stream_id ASC
- """
+ def get_current_state_deltas_txn(txn):
+ # First we calculate the max stream id that will give us less than
+ # N results
+ sql = """
+ SELECT stream_id, count(*)
+ FROM current_state_delta_stream
+ WHERE stream_id > ?
+ GROUP BY stream_id
+ ORDER BY stream_id ASC
+ LIMIT 100
+ """
+ txn.execute(sql, (prev_stream_id,))
+
+ total = 0
+ for max_stream_id, count in txn:
+ total += count
+ if total > 50:
+ break
- return self._execute(
- "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id
+ # Now actually get the deltas
+ sql = """
+ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+ FROM current_state_delta_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ """
+ txn.execute(sql, (prev_stream_id, max_stream_id,))
+ return self.cursor_to_dict(txn)
+
+ return self.runInteraction(
+ "get_current_state_deltas", get_current_state_deltas_txn
)
def get_max_stream_id_in_current_state_deltas(self):
|