diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b4abd961b9..b19c424ba9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -426,12 +426,12 @@ class PersistEventsStore:
# so that async background tasks get told what happened.
sql = """
INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, room_id, type, state_key, null, event_id
+ (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, room_id, type, state_key, null, event_id
FROM current_state_events
WHERE room_id = ?
"""
- txn.execute(sql, (stream_id, room_id))
+ txn.execute(sql, (stream_id, self._instance_name, room_id))
self.db_pool.simple_delete_txn(
txn, table="current_state_events", keyvalues={"room_id": room_id},
@@ -452,8 +452,8 @@ class PersistEventsStore:
#
sql = """
INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, ?, ?, ?, ?, (
+ (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
)
@@ -463,6 +463,7 @@ class PersistEventsStore:
(
(
stream_id,
+ self._instance_name,
room_id,
etype,
state_key,
@@ -755,6 +756,7 @@ class PersistEventsStore:
"event_stream_ordering": stream_order,
"event_id": event.event_id,
"state_group": state_group_id,
+ "instance_name": self._instance_name,
},
)
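
The writer-side changes above stamp each current_state_delta_stream (and ex_outlier_stream) row with the persisting worker's instance name, so replication readers can later filter rows per writer. Below is a minimal, runnable sketch of the INSERT ... SELECT pattern from the first hunk, using sqlite3 with simplified table shapes; the room, event, and instance names are illustrative, not Synapse's.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE current_state_events (
        room_id TEXT, type TEXT, state_key TEXT, event_id TEXT
    );
    CREATE TABLE current_state_delta_stream (
        stream_id BIGINT, instance_name TEXT, room_id TEXT,
        type TEXT, state_key TEXT, event_id TEXT, prev_event_id TEXT
    );
    """
)
conn.executemany(
    "INSERT INTO current_state_events VALUES (?, ?, ?, ?)",
    [
        ("!room:example.org", "m.room.member", "@alice:example.org", "$ev1"),
        ("!room:example.org", "m.room.name", "", "$ev2"),
    ],
)

# On room purge: record one delta per current state row, tagged with the
# writer's name, then drop the current state (mirrors the hunk above).
stream_id, instance_name, room_id = 5, "event_persister1", "!room:example.org"
conn.execute(
    """
    INSERT INTO current_state_delta_stream
        (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
    SELECT ?, ?, room_id, type, state_key, null, event_id
    FROM current_state_events
    WHERE room_id = ?
    """,
    (stream_id, instance_name, room_id),
)
conn.execute("DELETE FROM current_state_events WHERE room_id = ?", (room_id,))

print(conn.execute(
    "SELECT stream_id, instance_name, event_id, prev_event_id"
    " FROM current_state_delta_stream"
).fetchall())
# e.g. [(5, 'event_persister1', None, '$ev1'), (5, 'event_persister1', None, '$ev2')]
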
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b7ed8ca6ab..4e74fafe43 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1034,16 +1034,12 @@ class EventsWorkerStore(SQLBaseStore):
return {"v1": complexity_v1}
- def get_current_backfill_token(self):
- """The current minimum token that backfilled events have reached"""
- return -self._backfill_id_gen.get_current_token()
-
def get_current_events_token(self):
"""The current maximum token that events have reached"""
return self._stream_id_gen.get_current_token()
async def get_all_new_forward_event_rows(
- self, last_id: int, current_id: int, limit: int
+ self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple]:
"""Returns new events, for the Events replication stream
@@ -1067,10 +1063,11 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
+ " AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
- txn.execute(sql, (last_id, current_id, limit))
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
return txn.fetchall()
return await self.db_pool.runInteraction(
@@ -1078,7 +1075,7 @@ class EventsWorkerStore(SQLBaseStore):
)
async def get_ex_outlier_stream_rows(
- self, last_id: int, current_id: int
+ self, instance_name: str, last_id: int, current_id: int
) -> List[Tuple]:
"""Returns de-outliered events, for the Events replication stream
@@ -1097,16 +1094,17 @@ class EventsWorkerStore(SQLBaseStore):
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" FROM events AS e"
- " INNER JOIN ex_outlier_stream USING (event_id)"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
+ " AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
)
- txn.execute(sql, (last_id, current_id))
+ txn.execute(sql, (last_id, current_id, instance_name))
return txn.fetchall()
return await self.db_pool.runInteraction(
@@ -1119,6 +1117,9 @@ class EventsWorkerStore(SQLBaseStore):
"""Get updates for backfill replication stream, including all new
backfilled events and events that have gone from being outliers to not.
+ NOTE: The IDs given here are from replication, and so should be
+ *positive*.
+
Args:
-            instance_name: The writer we want to fetch updates from. Unused
-                here since there is only ever one writer.
+            instance_name: The writer we want to fetch updates from; rows
+                are now filtered to those written by this instance.
@@ -1149,10 +1150,11 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
+ " AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
- txn.execute(sql, (-last_id, -current_id, limit))
+ txn.execute(sql, (-last_id, -current_id, instance_name, limit))
new_event_updates = [(row[0], row[1:]) for row in txn]
limited = False
@@ -1166,15 +1168,16 @@ class EventsWorkerStore(SQLBaseStore):
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" FROM events AS e"
- " INNER JOIN ex_outlier_stream USING (event_id)"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
+ " AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC"
)
- txn.execute(sql, (-last_id, -upper_bound))
+ txn.execute(sql, (-last_id, -upper_bound, instance_name))
new_event_updates.extend((row[0], row[1:]) for row in txn)
if len(new_event_updates) >= limit:
@@ -1188,7 +1191,7 @@ class EventsWorkerStore(SQLBaseStore):
)
async def get_all_updated_current_state_deltas(
- self, from_token: int, to_token: int, target_row_count: int
+ self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple], int, bool]:
"""Fetch updates from current_state_delta_stream
@@ -1214,9 +1217,10 @@ class EventsWorkerStore(SQLBaseStore):
SELECT stream_id, room_id, type, state_key, event_id
FROM current_state_delta_stream
WHERE ? < stream_id AND stream_id <= ?
+ AND instance_name = ?
ORDER BY stream_id ASC LIMIT ?
"""
- txn.execute(sql, (from_token, to_token, target_row_count))
+ txn.execute(sql, (from_token, to_token, instance_name, target_row_count))
return txn.fetchall()
def get_deltas_for_stream_id_txn(txn, stream_id):
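
The backfill queries above pass negated bounds because backfilled events are stored with negative stream orderings (they count downwards), while replication exposes tokens as positive integers, per the NOTE added to get_all_new_backfill_event_rows. A hypothetical helper, not Synapse's actual API, just to make the mapping behind those txn.execute calls explicit:

from typing import Tuple

def backfill_bounds(last_id: int, current_id: int) -> Tuple[int, int]:
    """Map positive replication tokens to the stored, negated orderings.

    The query then keeps rows with -last_id > stream_ordering >= -current_id,
    i.e. the "? > stream_ordering AND stream_ordering >= ?" clauses above.
    """
    assert last_id >= 0 and current_id >= 0, "replication IDs are positive"
    return (-last_id, -current_id)

print(backfill_bounds(3, 10))  # (-3, -10): stored orderings -4 down to -10
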
diff --git a/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql
new file mode 100644
index 0000000000..ad1f481428
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE current_state_delta_stream ADD COLUMN instance_name TEXT;
+ALTER TABLE ex_outlier_stream ADD COLUMN instance_name TEXT;
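
A consequence worth noting: both columns are added as nullable TEXT, so rows persisted before this migration keep instance_name = NULL and are never matched by the instance_name = ? filters introduced in events_worker.py. That is presumably fine because readers only ask for rows past their current token, which pre-migration rows sit behind. A small runnable sqlite3 demo of the filtering behaviour (simplified table, made-up IDs):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ex_outlier_stream (event_stream_ordering BIGINT, event_id TEXT)"
)
conn.execute("INSERT INTO ex_outlier_stream VALUES (1, '$old')")  # pre-migration row
conn.execute("ALTER TABLE ex_outlier_stream ADD COLUMN instance_name TEXT")
conn.execute("INSERT INTO ex_outlier_stream VALUES (2, '$new', 'master')")

# SQL equality never matches NULL, so only post-migration rows are returned:
print(conn.execute(
    "SELECT event_id FROM ex_outlier_stream WHERE instance_name = ?",
    ("master",),
).fetchall())  # [('$new',)]
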