summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/events.py12
-rw-r--r--synapse/storage/databases/main/events_worker.py32
-rw-r--r--synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql17
3 files changed, 42 insertions, 19 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b4abd961b9..b19c424ba9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -426,12 +426,12 @@ class PersistEventsStore:
                 # so that async background tasks get told what happened.
                 sql = """
                     INSERT INTO current_state_delta_stream
-                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                    SELECT ?, room_id, type, state_key, null, event_id
+                        (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, room_id, type, state_key, null, event_id
                         FROM current_state_events
                         WHERE room_id = ?
                 """
-                txn.execute(sql, (stream_id, room_id))
+                txn.execute(sql, (stream_id, self._instance_name, room_id))
 
                 self.db_pool.simple_delete_txn(
                     txn, table="current_state_events", keyvalues={"room_id": room_id},
@@ -452,8 +452,8 @@ class PersistEventsStore:
                 #
                 sql = """
                     INSERT INTO current_state_delta_stream
-                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                    SELECT ?, ?, ?, ?, ?, (
+                    (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, ?, (
                         SELECT event_id FROM current_state_events
                         WHERE room_id = ? AND type = ? AND state_key = ?
                     )
@@ -463,6 +463,7 @@ class PersistEventsStore:
                     (
                         (
                             stream_id,
+                            self._instance_name,
                             room_id,
                             etype,
                             state_key,
@@ -755,6 +756,7 @@ class PersistEventsStore:
                         "event_stream_ordering": stream_order,
                         "event_id": event.event_id,
                         "state_group": state_group_id,
+                        "instance_name": self._instance_name,
                     },
                 )
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b7ed8ca6ab..4e74fafe43 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1034,16 +1034,12 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {"v1": complexity_v1}
 
-    def get_current_backfill_token(self):
-        """The current minimum token that backfilled events have reached"""
-        return -self._backfill_id_gen.get_current_token()
-
     def get_current_events_token(self):
         """The current maximum token that events have reached"""
         return self._stream_id_gen.get_current_token()
 
     async def get_all_new_forward_event_rows(
-        self, last_id: int, current_id: int, limit: int
+        self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> List[Tuple]:
         """Returns new events, for the Events replication stream
 
@@ -1067,10 +1063,11 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (last_id, current_id, limit))
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
             return txn.fetchall()
 
         return await self.db_pool.runInteraction(
@@ -1078,7 +1075,7 @@ class EventsWorkerStore(SQLBaseStore):
         )
 
     async def get_ex_outlier_stream_rows(
-        self, last_id: int, current_id: int
+        self, instance_name: str, last_id: int, current_id: int
     ) -> List[Tuple]:
         """Returns de-outliered events, for the Events replication stream
 
@@ -1097,16 +1094,17 @@ class EventsWorkerStore(SQLBaseStore):
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
                 " FROM events AS e"
-                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
+                " AND out.instance_name = ?"
                 " ORDER BY event_stream_ordering ASC"
             )
 
-            txn.execute(sql, (last_id, current_id))
+            txn.execute(sql, (last_id, current_id, instance_name))
             return txn.fetchall()
 
         return await self.db_pool.runInteraction(
@@ -1119,6 +1117,9 @@ class EventsWorkerStore(SQLBaseStore):
         """Get updates for backfill replication stream, including all new
         backfilled events and events that have gone from being outliers to not.
 
+        NOTE: The IDs given here are from replication, and so should be
+        *positive*.
+
         Args:
             instance_name: The writer we want to fetch updates from. Unused
                 here since there is only ever one writer.
@@ -1149,10 +1150,11 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                "  AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (-last_id, -current_id, limit))
+            txn.execute(sql, (-last_id, -current_id, instance_name, limit))
             new_event_updates = [(row[0], row[1:]) for row in txn]
 
             limited = False
@@ -1166,15 +1168,16 @@ class EventsWorkerStore(SQLBaseStore):
                 "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
                 " FROM events AS e"
-                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > event_stream_ordering"
                 " AND event_stream_ordering >= ?"
+                " AND out.instance_name = ?"
                 " ORDER BY event_stream_ordering DESC"
             )
-            txn.execute(sql, (-last_id, -upper_bound))
+            txn.execute(sql, (-last_id, -upper_bound, instance_name))
             new_event_updates.extend((row[0], row[1:]) for row in txn)
 
             if len(new_event_updates) >= limit:
@@ -1188,7 +1191,7 @@ class EventsWorkerStore(SQLBaseStore):
         )
 
     async def get_all_updated_current_state_deltas(
-        self, from_token: int, to_token: int, target_row_count: int
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
     ) -> Tuple[List[Tuple], int, bool]:
         """Fetch updates from current_state_delta_stream
 
@@ -1214,9 +1217,10 @@ class EventsWorkerStore(SQLBaseStore):
                 SELECT stream_id, room_id, type, state_key, event_id
                 FROM current_state_delta_stream
                 WHERE ? < stream_id AND stream_id <= ?
+                    AND instance_name = ?
                 ORDER BY stream_id ASC LIMIT ?
             """
-            txn.execute(sql, (from_token, to_token, target_row_count))
+            txn.execute(sql, (from_token, to_token, instance_name, target_row_count))
             return txn.fetchall()
 
         def get_deltas_for_stream_id_txn(txn, stream_id):
diff --git a/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql
new file mode 100644
index 0000000000..ad1f481428
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE current_state_delta_stream ADD COLUMN instance_name TEXT;
+ALTER TABLE ex_outlier_stream ADD COLUMN instance_name TEXT;