diff options
author | Erik Johnston <erikj@element.io> | 2024-05-30 14:03:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-30 14:03:49 +0100 |
commit | 5624c8b961ed6a8310a2c6723ae13e854721756b (patch) | |
tree | e45dfc271ed2ec540170de66ccc3534b98de1992 /synapse/storage | |
parent | Fix deduplicating of membership events to not create unused state groups. (#1... (diff) | |
download | synapse-5624c8b961ed6a8310a2c6723ae13e854721756b.tar.xz |
In sync wait for worker to catch up since token (#17215)
Otherwise things will get confused. An alternative would be to make sure that for lagging stream we don't return anything (and make sure the returned next_batch token doesn't go backwards). But that is a faff.
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/events.py | 7 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 11 |
2 files changed, 16 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fd7167904d..f1bd85aa27 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -95,6 +95,10 @@ class DeltaState: to_insert: StateMap[str] no_longer_in_room: bool = False + def is_noop(self) -> bool: + """Whether this state delta is actually empty""" + return not self.to_delete and not self.to_insert and not self.no_longer_in_room + class PersistEventsStore: """Contains all the functions for writing events to the database. @@ -1017,6 +1021,9 @@ class PersistEventsStore: ) -> None: """Update the current state stored in the datatabase for the given room""" + if state_delta.is_noop(): + return + async with self._stream_id_gen.get_next() as stream_ordering: await self.db_pool.runInteraction( "update_current_state", diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 426df2a9d2..c06c44deb1 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -200,7 +200,11 @@ class EventsWorkerStore(SQLBaseStore): notifier=hs.get_replication_notifier(), stream_name="events", instance_name=hs.get_instance_name(), - tables=[("events", "instance_name", "stream_ordering")], + tables=[ + ("events", "instance_name", "stream_ordering"), + ("current_state_delta_stream", "instance_name", "stream_id"), + ("ex_outlier_stream", "instance_name", "event_stream_ordering"), + ], sequence_name="events_stream_seq", writers=hs.config.worker.writers.events, ) @@ -210,7 +214,10 @@ class EventsWorkerStore(SQLBaseStore): notifier=hs.get_replication_notifier(), stream_name="backfill", instance_name=hs.get_instance_name(), - tables=[("events", "instance_name", "stream_ordering")], + tables=[ + ("events", "instance_name", "stream_ordering"), + ("ex_outlier_stream", "instance_name", "event_stream_ordering"), + ], sequence_name="events_backfill_stream_seq", positive=False, writers=hs.config.worker.writers.events, |