summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-12-19 14:57:51 +0000
committerGitHub <noreply@github.com>2022-12-19 14:57:51 +0000
commit2888d7ec83b33b3ce848d9219c921ffe0b88ffbf (patch)
tree28a37ca0169578045e5735ffdf47db83c5ca9265 /synapse/storage/databases/main
parentBump sentry-sdk from 1.11.1 to 1.12.0 (#14701) (diff)
downloadsynapse-2888d7ec83b33b3ce848d9219c921ffe0b88ffbf.tar.xz
Faster remote room joins: invalidate caches and unblock requests when receiving un-partial-stated event notifications over replication. [rei:frrj/streams/unpsr] (#14546)
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/events_worker.py27
-rw-r--r--synapse/storage/databases/main/state.py18
2 files changed, 32 insertions, 13 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e19b16064b..761b15a815 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -59,8 +59,9 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.replication.tcp.streams import BackfillStream
+from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream
 from synapse.replication.tcp.streams.events import EventsStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -391,6 +392,16 @@ class EventsWorkerStore(SQLBaseStore):
             self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(instance_name, -token)
+        elif stream_name == UnPartialStatedEventStream.NAME:
+            for row in rows:
+                assert isinstance(row, UnPartialStatedEventStreamRow)
+
+                self.is_partial_state_event.invalidate((row.event_id,))
+
+                if row.rejection_status_changed:
+                    # If the partial-stated event became rejected or unrejected
+                    # when it wasn't before, we need to invalidate this cache.
+                    self._invalidate_local_get_event_cache(row.event_id)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
@@ -2380,6 +2391,9 @@ class EventsWorkerStore(SQLBaseStore):
 
         This can happen, for example, when resyncing state during a faster join.
 
+        It is the caller's responsibility to ensure that other workers are
+        sent a notification so that they call `_invalidate_local_get_event_cache()`.
+
         Args:
             txn:
             event_id: ID of event to update
@@ -2418,14 +2432,3 @@ class EventsWorkerStore(SQLBaseStore):
         )
 
         self.invalidate_get_event_cache_after_txn(txn, event_id)
-
-        # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
-        #   call '_send_invalidation_to_replication', but we actually need the other
-        #   end to call _invalidate_local_get_event_cache() rather than (just)
-        #   _get_event_cache.invalidate().
-        #
-        #   One solution might be to (somehow) get the workers to call
-        #   _invalidate_caches_for_event() (though that will invalidate more than
-        #   strictly necessary).
-        #
-        #   https://github.com/matrix-org/synapse/issues/12994
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index f855903c39..f32cbb2dec 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import collections.abc
 import logging
-from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple
 
 import attr
 
@@ -24,6 +24,8 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.logging.opentracing import trace
+from synapse.replication.tcp.streams import UnPartialStatedEventStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -82,6 +84,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         super().__init__(database, db_conn, hs)
         self._instance_name: str = hs.get_instance_name()
 
+    def process_replication_rows(
+        self,
+        stream_name: str,
+        instance_name: str,
+        token: int,
+        rows: Iterable[Any],
+    ) -> None:
+        if stream_name == UnPartialStatedEventStream.NAME:
+            for row in rows:
+                assert isinstance(row, UnPartialStatedEventStreamRow)
+                self._get_state_group_for_event.invalidate((row.event_id,))
+
+        super().process_replication_rows(stream_name, instance_name, token, rows)
+
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
         Raises: