summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2022-07-19 12:45:17 +0100
committerGitHub <noreply@github.com>2022-07-19 11:45:17 +0000
commitb9778673587941277e15b067ad39cdf084f7dde5 (patch)
tree1130f92b1869a63305aa4ceb1a12d540432f85fd /synapse/storage/databases
parentSafe async event cache (#13308) (diff)
downloadsynapse-b9778673587941277e15b067ad39cdf084f7dde5.tar.xz
Rate limit joins per-room (#13276)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/events_worker.py22
1 files changed, 14 insertions, 8 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4435373146..5914a35420 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1490,7 +1490,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     async def get_all_new_forward_event_rows(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+    ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
         """Returns new events, for the Events replication stream
 
         Args:
@@ -1506,10 +1506,11 @@ class EventsWorkerStore(SQLBaseStore):
 
         def get_all_new_forward_event_rows(
             txn: LoggingTransaction,
-        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+        ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
             sql = (
                 "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
+                " e.outlier"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events AS se USING (event_id)"
@@ -1523,7 +1524,8 @@ class EventsWorkerStore(SQLBaseStore):
             )
             txn.execute(sql, (last_id, current_id, instance_name, limit))
             return cast(
-                List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
+                List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
+                txn.fetchall(),
             )
 
         return await self.db_pool.runInteraction(
@@ -1532,7 +1534,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     async def get_ex_outlier_stream_rows(
         self, instance_name: str, last_id: int, current_id: int
-    ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+    ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
         """Returns de-outliered events, for the Events replication stream
 
         Args:
@@ -1547,11 +1549,14 @@ class EventsWorkerStore(SQLBaseStore):
 
         def get_ex_outlier_stream_rows_txn(
             txn: LoggingTransaction,
-        ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+        ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+                " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
+                " e.outlier"
                 " FROM events AS e"
+                # NB: the next line (inner join) is what makes this query different from
+                # get_all_new_forward_event_rows.
                 " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events AS se USING (event_id)"
@@ -1566,7 +1571,8 @@ class EventsWorkerStore(SQLBaseStore):
 
             txn.execute(sql, (last_id, current_id, instance_name))
             return cast(
-                List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
+                List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
+                txn.fetchall(),
             )
 
         return await self.db_pool.runInteraction(