summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2023-01-22 21:10:11 +0000
committerGitHub <noreply@github.com>2023-01-22 21:10:11 +0000
commit22cc93afe38d34c859d8863a99996e7e72ca1733 (patch)
treedde310a5866bd5a765c687f7e7bed11888504f78 /synapse/storage/databases/main
parentFaster joins: Fix incompatibility with restricted joins (#14882) (diff)
downloadsynapse-22cc93afe38d34c859d8863a99996e7e72ca1733.tar.xz
Enable Faster Remote Room Joins against worker-mode Synapse. (#14752)
* Enable Complement tests for Faster Remote Room Joins on worker-mode

* (dangerous) Add an override to allow Complement to use FRRJ under workers

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

* Fix race where we didn't send out replication notification

* MORE HACKS

* Fix get_un_partial_stated_rooms_token to take instance_name

* Fix bad merge

* Remove warning

* Correctly advance un_partial_stated_room_stream

* Fix merge

* Add another notify_replication

* Fixups

* Create a separate ReplicationNotifier

* Fix test

* Fix portdb

* Create a separate ReplicationNotifier

* Fix test

* Fix portdb

* Fix presence test

* Newsfile

* Apply suggestions from code review

* Update changelog.d/14752.misc

Co-authored-by: Erik Johnston <erik@matrix.org>

* lint

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
Co-authored-by: Erik Johnston <erik@matrix.org>
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/events_worker.py13
-rw-r--r--synapse/storage/databases/main/room.py19
-rw-r--r--synapse/storage/databases/main/state.py2
3 files changed, 22 insertions, 12 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d8a8bcafb6..24127d0364 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -322,11 +322,12 @@ class EventsWorkerStore(SQLBaseStore):
                 "stream_id",
             )
 
-    def get_un_partial_stated_events_token(self) -> int:
-        # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
-        #     writers because workers that don't write often will hold all
-        #     readers up.
-        return self._un_partial_stated_events_stream_id_gen.get_current_token()
+    def get_un_partial_stated_events_token(self, instance_name: str) -> int:
+        return (
+            self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer(
+                instance_name
+            )
+        )
 
     async def get_un_partial_stated_events_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
@@ -416,6 +417,8 @@ class EventsWorkerStore(SQLBaseStore):
             self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(instance_name, -token)
+        elif stream_name == UnPartialStatedEventStream.NAME:
+            self._un_partial_stated_events_stream_id_gen.advance(instance_name, token)
         super().process_replication_position(stream_name, instance_name, token)
 
     async def have_censored_event(self, event_id: str) -> bool:
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 7264a33cd4..6a65b2a89b 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -43,6 +43,7 @@ from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.config.homeserver import HomeServerConfig
 from synapse.events import EventBase
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -144,6 +145,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
                 "stream_id",
             )
 
+    def process_replication_position(
+        self, stream_name: str, instance_name: str, token: int
+    ) -> None:
+        if stream_name == UnPartialStatedRoomStream.NAME:
+            self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
+        return super().process_replication_position(stream_name, instance_name, token)
+
     async def store_room(
         self,
         room_id: str,
@@ -1281,13 +1289,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         )
         return result["join_event_id"], result["device_lists_stream_id"]
 
-    def get_un_partial_stated_rooms_token(self) -> int:
-        # TODO(faster_joins, multiple writers): This is inappropriate if there
-        #     are multiple writers because workers that don't write often will
-        #     hold all readers up.
-        #     (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
-        #      explanation.)
-        return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+    def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
+        return self._un_partial_stated_rooms_stream_id_gen.get_current_token_for_writer(
+            instance_name
+        )
 
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index f32cbb2dec..ba325d390b 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -95,6 +95,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             for row in rows:
                 assert isinstance(row, UnPartialStatedEventStreamRow)
                 self._get_state_group_for_event.invalidate((row.event_id,))
+                self.is_partial_state_event.invalidate((row.event_id,))
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
@@ -485,6 +486,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 "rejection_status_changed": rejection_status_changed,
             },
         )
+        txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):