diff options
author | David Robertson <davidr@element.io> | 2022-06-29 18:25:54 +0100 |
---|---|---|
committer | David Robertson <davidr@element.io> | 2022-07-04 19:10:14 +0100 |
commit | ae788ca796f208b2b2a7999e1082707b7816a3d4 (patch) | |
tree | a904ac18f4a46608c2670122ee8f7e58d115645c | |
parent | Room member: drive-by-comment (diff) | |
download | synapse-ae788ca796f208b2b2a7999e1082707b7816a3d4.tar.xz |
Replication: include `outlier` in event rows
Warn about replication problem in upgrade notes
-rw-r--r-- | docs/upgrade.md | 12 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/events.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 22 |
3 files changed, 27 insertions, 8 deletions
diff --git a/docs/upgrade.md b/docs/upgrade.md index 312f0b87fe..d5753ae6f6 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -89,6 +89,18 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.63.0 + +## Changes to the event replication streams + +Synapse now includes a flag indicating if an event is an outlier when +replicating it to other workers. This is a forwards- and backwards-incompatible +change: v1.62 and workers cannot process events replicated by v1.63 workers, and +vice versa. + +Once all workers are upgraded to v1.63 (or downgraded to v1.62), event +replication will resume as normal. + # Upgrading to v1.62.0 ## New signatures for spam checker callbacks diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 26f4fa7cfd..14b6705862 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow): relates_to: Optional[str] membership: Optional[str] rejected: bool + outlier: bool @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b99b107784..fc874c1e6c 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1465,7 +1465,7 @@ class EventsWorkerStore(SQLBaseStore): async def get_all_new_forward_event_rows( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: """Returns new events, for the Events replication stream Args: @@ -1481,10 +1481,11 @@ class EventsWorkerStore(SQLBaseStore): def get_all_new_forward_event_rows( txn: LoggingTransaction, - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: sql = ( "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," + " e.outlier" " FROM events AS e" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events AS se USING (event_id)" @@ -1498,7 +1499,8 @@ class EventsWorkerStore(SQLBaseStore): ) txn.execute(sql, (last_id, current_id, instance_name, limit)) return cast( - List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall() + List[Tuple[int, str, str, str, str, str, str, str, bool, bool]], + txn.fetchall(), ) return await self.db_pool.runInteraction( @@ -1507,7 +1509,7 @@ class EventsWorkerStore(SQLBaseStore): async def get_ex_outlier_stream_rows( self, instance_name: str, last_id: int, current_id: int - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: """Returns de-outliered events, for the Events replication stream Args: @@ -1522,11 +1524,14 @@ class EventsWorkerStore(SQLBaseStore): def get_ex_outlier_stream_rows_txn( txn: LoggingTransaction, - ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: + ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," + " e.outlier" " FROM events AS e" + # NB: the next line (inner join) is what makes this query different from + # get_all_new_forward_event_rows. " INNER JOIN ex_outlier_stream AS out USING (event_id)" " LEFT JOIN redactions USING (event_id)" " LEFT JOIN state_events AS se USING (event_id)" @@ -1541,7 +1546,8 @@ class EventsWorkerStore(SQLBaseStore): txn.execute(sql, (last_id, current_id, instance_name)) return cast( - List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall() + List[Tuple[int, str, str, str, str, str, str, str, bool, bool]], + txn.fetchall(), ) return await self.db_pool.runInteraction( |