summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/replication/tcp/streams/_base.py11
-rw-r--r--synapse/replication/tcp/streams/events.py6
-rw-r--r--synapse/storage/databases/main/events.py12
-rw-r--r--synapse/storage/databases/main/events_worker.py32
-rw-r--r--synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql17
5 files changed, 53 insertions, 25 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 54dccd15a6..61b282ab2d 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -240,13 +240,18 @@ class BackfillStream(Stream):
     ROW_TYPE = BackfillStreamRow
 
     def __init__(self, hs):
-        store = hs.get_datastore()
+        self.store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            current_token_without_instance(store.get_current_backfill_token),
-            store.get_all_new_backfill_event_rows,
+            self._current_token,
+            self.store.get_all_new_backfill_event_rows,
         )
 
+    def _current_token(self, instance_name: str) -> int:
+        # The backfill stream over replication operates on *positive* numbers,
+        # which means we need to negate it.
+        return -self.store._backfill_id_gen.get_current_token_for_writer(instance_name)
+
 
 class PresenceStream(Stream):
     PresenceStreamRow = namedtuple(
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..82e9e0d64e 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -155,7 +155,7 @@ class EventsStream(Stream):
         # now we fetch up to that many rows from the events table
 
         event_rows = await self._store.get_all_new_forward_event_rows(
-            from_token, current_token, target_row_count
+            instance_name, from_token, current_token, target_row_count
         )  # type: List[Tuple]
 
         # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
@@ -180,7 +180,7 @@ class EventsStream(Stream):
             upper_limit,
             state_rows_limited,
         ) = await self._store.get_all_updated_current_state_deltas(
-            from_token, upper_limit, target_row_count
+            instance_name, from_token, upper_limit, target_row_count
         )
 
         limited = limited or state_rows_limited
@@ -189,7 +189,7 @@ class EventsStream(Stream):
         # not to bother with the limit.
 
         ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
-            from_token, upper_limit
+            instance_name, from_token, upper_limit
         )  # type: List[Tuple]
 
         # we now need to turn the raw database rows returned into tuples suitable
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b4abd961b9..b19c424ba9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -426,12 +426,12 @@ class PersistEventsStore:
                 # so that async background tasks get told what happened.
                 sql = """
                     INSERT INTO current_state_delta_stream
-                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                    SELECT ?, room_id, type, state_key, null, event_id
+                        (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, room_id, type, state_key, null, event_id
                         FROM current_state_events
                         WHERE room_id = ?
                 """
-                txn.execute(sql, (stream_id, room_id))
+                txn.execute(sql, (stream_id, self._instance_name, room_id))
 
                 self.db_pool.simple_delete_txn(
                     txn, table="current_state_events", keyvalues={"room_id": room_id},
@@ -452,8 +452,8 @@ class PersistEventsStore:
                 #
                 sql = """
                     INSERT INTO current_state_delta_stream
-                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                    SELECT ?, ?, ?, ?, ?, (
+                    (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, ?, (
                         SELECT event_id FROM current_state_events
                         WHERE room_id = ? AND type = ? AND state_key = ?
                     )
@@ -463,6 +463,7 @@ class PersistEventsStore:
                     (
                         (
                             stream_id,
+                            self._instance_name,
                             room_id,
                             etype,
                             state_key,
@@ -755,6 +756,7 @@ class PersistEventsStore:
                         "event_stream_ordering": stream_order,
                         "event_id": event.event_id,
                         "state_group": state_group_id,
+                        "instance_name": self._instance_name,
                     },
                 )
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b7ed8ca6ab..4e74fafe43 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1034,16 +1034,12 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {"v1": complexity_v1}
 
-    def get_current_backfill_token(self):
-        """The current minimum token that backfilled events have reached"""
-        return -self._backfill_id_gen.get_current_token()
-
     def get_current_events_token(self):
         """The current maximum token that events have reached"""
         return self._stream_id_gen.get_current_token()
 
     async def get_all_new_forward_event_rows(
-        self, last_id: int, current_id: int, limit: int
+        self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> List[Tuple]:
         """Returns new events, for the Events replication stream
 
@@ -1067,10 +1063,11 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (last_id, current_id, limit))
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
             return txn.fetchall()
 
         return await self.db_pool.runInteraction(
@@ -1078,7 +1075,7 @@ class EventsWorkerStore(SQLBaseStore):
         )
 
     async def get_ex_outlier_stream_rows(
-        self, last_id: int, current_id: int
+        self, instance_name: str, last_id: int, current_id: int
     ) -> List[Tuple]:
         """Returns de-outliered events, for the Events replication stream
 
@@ -1097,16 +1094,17 @@ class EventsWorkerStore(SQLBaseStore):
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
                 " FROM events AS e"
-                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
+                " AND out.instance_name = ?"
                 " ORDER BY event_stream_ordering ASC"
             )
 
-            txn.execute(sql, (last_id, current_id))
+            txn.execute(sql, (last_id, current_id, instance_name))
             return txn.fetchall()
 
         return await self.db_pool.runInteraction(
@@ -1119,6 +1117,9 @@ class EventsWorkerStore(SQLBaseStore):
         """Get updates for backfill replication stream, including all new
         backfilled events and events that have gone from being outliers to not.
 
+        NOTE: The IDs given here are from replication, and so should be
+        *positive*.
+
         Args:
             instance_name: The writer we want to fetch updates from. Unused
                 here since there is only ever one writer.
@@ -1149,10 +1150,11 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                "  AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (-last_id, -current_id, limit))
+            txn.execute(sql, (-last_id, -current_id, instance_name, limit))
             new_event_updates = [(row[0], row[1:]) for row in txn]
 
             limited = False
@@ -1166,15 +1168,16 @@ class EventsWorkerStore(SQLBaseStore):
                 "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
                 " FROM events AS e"
-                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > event_stream_ordering"
                 " AND event_stream_ordering >= ?"
+                " AND out.instance_name = ?"
                 " ORDER BY event_stream_ordering DESC"
             )
-            txn.execute(sql, (-last_id, -upper_bound))
+            txn.execute(sql, (-last_id, -upper_bound, instance_name))
             new_event_updates.extend((row[0], row[1:]) for row in txn)
 
             if len(new_event_updates) >= limit:
@@ -1188,7 +1191,7 @@ class EventsWorkerStore(SQLBaseStore):
         )
 
     async def get_all_updated_current_state_deltas(
-        self, from_token: int, to_token: int, target_row_count: int
+        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
     ) -> Tuple[List[Tuple], int, bool]:
         """Fetch updates from current_state_delta_stream
 
@@ -1214,9 +1217,10 @@ class EventsWorkerStore(SQLBaseStore):
                 SELECT stream_id, room_id, type, state_key, event_id
                 FROM current_state_delta_stream
                 WHERE ? < stream_id AND stream_id <= ?
+                    AND instance_name = ?
                 ORDER BY stream_id ASC LIMIT ?
             """
-            txn.execute(sql, (from_token, to_token, target_row_count))
+            txn.execute(sql, (from_token, to_token, instance_name, target_row_count))
             return txn.fetchall()
 
         def get_deltas_for_stream_id_txn(txn, stream_id):
diff --git a/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql
new file mode 100644
index 0000000000..ad1f481428
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE current_state_delta_stream ADD COLUMN instance_name TEXT;
+ALTER TABLE ex_outlier_stream ADD COLUMN instance_name TEXT;