summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-30 14:03:49 +0100
committerGitHub <noreply@github.com>2024-05-30 14:03:49 +0100
commit5624c8b961ed6a8310a2c6723ae13e854721756b (patch)
treee45dfc271ed2ec540170de66ccc3534b98de1992 /synapse/storage
parentFix deduplicating of membership events to not create unused state groups. (#1... (diff)
downloadsynapse-5624c8b961ed6a8310a2c6723ae13e854721756b.tar.xz
In sync wait for worker to catch up since token (#17215)
Otherwise things will get confused.

An alternative would be to make sure that for lagging stream we don't
return anything (and make sure the returned next_batch token doesn't go
backwards). But that is a faff.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py7
-rw-r--r--synapse/storage/databases/main/events_worker.py11
2 files changed, 16 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fd7167904d..f1bd85aa27 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -95,6 +95,10 @@ class DeltaState:
     to_insert: StateMap[str]
     no_longer_in_room: bool = False
 
+    def is_noop(self) -> bool:
+        """Whether this state delta is actually empty"""
+        return not self.to_delete and not self.to_insert and not self.no_longer_in_room
+
 
 class PersistEventsStore:
     """Contains all the functions for writing events to the database.
@@ -1017,6 +1021,9 @@ class PersistEventsStore:
     ) -> None:
         """Update the current state stored in the datatabase for the given room"""
 
+        if state_delta.is_noop():
+            return
+
         async with self._stream_id_gen.get_next() as stream_ordering:
             await self.db_pool.runInteraction(
                 "update_current_state",
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 426df2a9d2..c06c44deb1 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -200,7 +200,11 @@ class EventsWorkerStore(SQLBaseStore):
             notifier=hs.get_replication_notifier(),
             stream_name="events",
             instance_name=hs.get_instance_name(),
-            tables=[("events", "instance_name", "stream_ordering")],
+            tables=[
+                ("events", "instance_name", "stream_ordering"),
+                ("current_state_delta_stream", "instance_name", "stream_id"),
+                ("ex_outlier_stream", "instance_name", "event_stream_ordering"),
+            ],
             sequence_name="events_stream_seq",
             writers=hs.config.worker.writers.events,
         )
@@ -210,7 +214,10 @@ class EventsWorkerStore(SQLBaseStore):
             notifier=hs.get_replication_notifier(),
             stream_name="backfill",
             instance_name=hs.get_instance_name(),
-            tables=[("events", "instance_name", "stream_ordering")],
+            tables=[
+                ("events", "instance_name", "stream_ordering"),
+                ("ex_outlier_stream", "instance_name", "event_stream_ordering"),
+            ],
             sequence_name="events_backfill_stream_seq",
             positive=False,
             writers=hs.config.worker.writers.events,