summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/user_directory.py39
1 files changed, 30 insertions, 9 deletions
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 15b8ea0460..9137fc24ea 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -207,16 +207,37 @@ class UserDirectoryStore(SQLBaseStore):
         if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
             return []
 
-        # TODO: Add limit
-        sql = """
-            SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
-            FROM current_state_delta_stream
-            WHERE stream_id > ?
-            ORDER BY stream_id ASC
-        """
+        def get_current_state_deltas_txn(txn):
+            # First we calculate the max stream id that will give us less than
+            # N results
+            sql = """
+                SELECT stream_id, count(*)
+                FROM current_state_delta_stream
+                WHERE stream_id > ?
+                GROUP BY stream_id
+                ORDER BY stream_id ASC
+                LIMIT 100
+            """
+            txn.execute(sql, (prev_stream_id,))
+
+            total = 0
+            for max_stream_id, count in txn:
+                total += count
+                if total > 50:
+                    break
 
-        return self._execute(
-            "get_current_state_deltas", self.cursor_to_dict, sql, prev_stream_id
+            # Now actually get the deltas
+            sql = """
+                SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(sql, (prev_stream_id, max_stream_id,))
+            return self.cursor_to_dict(txn)
+
+        return self.runInteraction(
+            "get_current_state_deltas", get_current_state_deltas_txn
         )
 
     def get_max_stream_id_in_current_state_deltas(self):