diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4435373146..5914a35420 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1490,7 +1490,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_all_new_forward_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
- ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+ ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns new events, for the Events replication stream
Args:
@@ -1506,10 +1506,11 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_forward_event_rows(
txn: LoggingTransaction,
- ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+ ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+ " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
+ " e.outlier"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
@@ -1523,7 +1524,8 @@ class EventsWorkerStore(SQLBaseStore):
)
txn.execute(sql, (last_id, current_id, instance_name, limit))
return cast(
- List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
+ List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
+ txn.fetchall(),
)
return await self.db_pool.runInteraction(
@@ -1532,7 +1534,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_ex_outlier_stream_rows(
self, instance_name: str, last_id: int, current_id: int
- ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+ ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns de-outliered events, for the Events replication stream
Args:
@@ -1547,11 +1549,14 @@ class EventsWorkerStore(SQLBaseStore):
def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
- ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
+ ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
- " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
+ " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
+ " e.outlier"
" FROM events AS e"
+ # NB: the next line (inner join) is what makes this query different from
+ # get_all_new_forward_event_rows.
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
@@ -1566,7 +1571,8 @@ class EventsWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, instance_name))
return cast(
- List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
+ List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
+ txn.fetchall(),
)
return await self.db_pool.runInteraction(
|