diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/generic_worker.py | 7 | ||||
-rw-r--r-- | synapse/handlers/device.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/partial_state.py | 7 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 13 | ||||
-rw-r--r-- | synapse/storage/databases/main/room.py | 19 | ||||
-rw-r--r-- | synapse/storage/databases/main/state.py | 2 |
7 files changed, 30 insertions, 27 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 8108b1e98f..946f3a3807 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -282,13 +282,6 @@ def start(config_options: List[str]) -> None: "synapse.app.user_dir", ) - if config.experimental.faster_joins_enabled: - raise ConfigError( - "You have enabled the experimental `faster_joins` config option, but it is " - "not compatible with worker deployments yet. Please disable `faster_joins` " - "or run Synapse as a single process deployment instead." - ) - synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0640ea79a0..58180ae2fa 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -974,6 +974,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): self.federation = hs.get_federation_client() self.clock = hs.get_clock() self.device_handler = device_handler + self._notifier = hs.get_notifier() self._remote_edu_linearizer = Linearizer(name="remote_device_list") @@ -1054,6 +1055,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): user_id, device_id, ) + self._notifier.notify_replication() room_ids = await self.store.get_rooms_for_user(user_id) if not room_ids: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2123ace8a6..7620245e26 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1870,14 +1870,15 @@ class FederationHandler: logger.info("Clearing partial-state flag for %s", room_id) success = await self.store.clear_partial_state_room(room_id) + # Poke the notifier so that other workers see the write to + # the un-partial-stated rooms stream. + self._notifier.notify_replication() + if success: logger.info("State resync complete for %s", room_id) self._storage_controllers.state.notify_room_un_partial_stated( room_id ) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index b5a2ae74b6..a8ce5ffd72 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -16,7 +16,6 @@ from typing import TYPE_CHECKING import attr from synapse.replication.tcp.streams import Stream -from synapse.replication.tcp.streams._base import current_token_without_instance if TYPE_CHECKING: from synapse.server import HomeServer @@ -42,8 +41,7 @@ class UnPartialStatedRoomStream(Stream): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - # TODO(faster_joins, multiple writers): we need to account for instance names - current_token_without_instance(store.get_un_partial_stated_rooms_token), + store.get_un_partial_stated_rooms_token, store.get_un_partial_stated_rooms_from_stream, ) @@ -70,7 +68,6 @@ class UnPartialStatedEventStream(Stream): store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - # TODO(faster_joins, multiple writers): we need to account for instance names - current_token_without_instance(store.get_un_partial_stated_events_token), + store.get_un_partial_stated_events_token, store.get_un_partial_stated_events_from_stream, ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d8a8bcafb6..24127d0364 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -322,11 +322,12 @@ class EventsWorkerStore(SQLBaseStore): "stream_id", ) - def get_un_partial_stated_events_token(self) -> int: - # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple - # writers because workers that don't write often will hold all - # readers up. - return self._un_partial_stated_events_stream_id_gen.get_current_token() + def get_un_partial_stated_events_token(self, instance_name: str) -> int: + return ( + self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer( + instance_name + ) + ) async def get_un_partial_stated_events_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -416,6 +417,8 @@ class EventsWorkerStore(SQLBaseStore): self._stream_id_gen.advance(instance_name, token) elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) + elif stream_name == UnPartialStatedEventStream.NAME: + self._un_partial_stated_events_stream_id_gen.advance(instance_name, token) super().process_replication_position(stream_name, instance_name, token) async def have_censored_event(self, event_id: str) -> bool: diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7264a33cd4..6a65b2a89b 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -43,6 +43,7 @@ from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.config.homeserver import HomeServerConfig from synapse.events import EventBase +from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -144,6 +145,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): "stream_id", ) + def process_replication_position( + self, stream_name: str, instance_name: str, token: int + ) -> None: + if stream_name == UnPartialStatedRoomStream.NAME: + self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token) + return super().process_replication_position(stream_name, instance_name, token) + async def store_room( self, room_id: str, @@ -1281,13 +1289,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): ) return result["join_event_id"], result["device_lists_stream_id"] - def get_un_partial_stated_rooms_token(self) -> int: - # TODO(faster_joins, multiple writers): This is inappropriate if there - # are multiple writers because workers that don't write often will - # hold all readers up. - # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an - # explanation.) - return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + def get_un_partial_stated_rooms_token(self, instance_name: str) -> int: + return self._un_partial_stated_rooms_stream_id_gen.get_current_token_for_writer( + instance_name + ) async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index f32cbb2dec..ba325d390b 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -95,6 +95,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): for row in rows: assert isinstance(row, UnPartialStatedEventStreamRow) self._get_state_group_for_event.invalidate((row.event_id,)) + self.is_partial_state_event.invalidate((row.event_id,)) super().process_replication_rows(stream_name, instance_name, token, rows) @@ -485,6 +486,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): "rejection_status_changed": rejection_status_changed, }, ) + txn.call_after(self.hs.get_notifier().on_new_replication_data) class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): |