| diff --git a/changelog.d/14752.misc b/changelog.d/14752.misc
new file mode 100644
index 0000000000..1f9675c53b
--- /dev/null
+++ b/changelog.d/14752.misc
@@ -0,0 +1 @@
+Enable Complement tests for Faster Remote Room Joins against worker-mode Synapse.
\ No newline at end of file
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
 index 281157846a..63acf86a46 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -94,10 +94,8 @@ allow_device_name_lookup_over_federation: true
 experimental_features:
   # Enable history backfilling support
   msc2716_enabled: true
-  {% if not workers_in_use %}
   # client-side support for partial state in /send_join responses
   faster_joins: true
-  {% endif %}
   # Enable support for polls
   msc3381_polls_enabled: true
   # Enable deleting device-specific notification settings stored in account data
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
 index a183653d52..e72d96fd16 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -190,7 +190,7 @@ fi
 
 extra_test_args=()
 
-test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930"
+test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930,faster_joins"
 
 # All environment variables starting with PASS_ will be shared.
 # (The prefix is stripped off before reaching the container.)
@@ -223,12 +223,9 @@ else
     export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite
   fi
 
-  # We only test faster room joins on monoliths, because they are purposefully
-  # being developed without worker support to start with.
-  #
-  # The tests for importing historical messages (MSC2716) also only pass with monoliths,
-  # currently.
-  test_tags="$test_tags,faster_joins,msc2716"
+  # The tests for importing historical messages (MSC2716)
+  # only pass with monoliths, currently.
+  test_tags="$test_tags,msc2716"
 fi
 
 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
 index 8108b1e98f..946f3a3807 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -282,13 +282,6 @@ def start(config_options: List[str]) -> None:
         "synapse.app.user_dir",
     )
 
-    if config.experimental.faster_joins_enabled:
-        raise ConfigError(
-            "You have enabled the experimental `faster_joins` config option, but it is "
-            "not compatible with worker deployments yet. Please disable `faster_joins` "
-            "or run Synapse as a single process deployment instead."
-        )
-
     synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts
     synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
 index 0640ea79a0..58180ae2fa 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -974,6 +974,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
         self.federation = hs.get_federation_client()
         self.clock = hs.get_clock()
         self.device_handler = device_handler
+        self._notifier = hs.get_notifier()
 
         self._remote_edu_linearizer = Linearizer(name="remote_device_list")
 
@@ -1054,6 +1055,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
                 user_id,
                 device_id,
             )
+            self._notifier.notify_replication()
 
         room_ids = await self.store.get_rooms_for_user(user_id)
         if not room_ids:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
 index 2123ace8a6..7620245e26 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1870,14 +1870,15 @@ class FederationHandler:
                     logger.info("Clearing partial-state flag for %s", room_id)
                     success = await self.store.clear_partial_state_room(room_id)
 
+                    # Poke the notifier so that other workers see the write to
+                    # the un-partial-stated rooms stream.
+                    self._notifier.notify_replication()
+
                 if success:
                     logger.info("State resync complete for %s", room_id)
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
-                    # Poke the notifier so that other workers see the write to
-                    # the un-partial-stated rooms stream.
-                    self._notifier.notify_replication()
 
                     # TODO(faster_joins) update room stats and user directory?
                     #   https://github.com/matrix-org/synapse/issues/12814
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
 index b5a2ae74b6..a8ce5ffd72 100644
--- a/synapse/replication/tcp/streams/partial_state.py
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -16,7 +16,6 @@ from typing import TYPE_CHECKING
 import attr
 
 from synapse.replication.tcp.streams import Stream
-from synapse.replication.tcp.streams._base import current_token_without_instance
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -42,8 +41,7 @@ class UnPartialStatedRoomStream(Stream):
         store = hs.get_datastores().main
         super().__init__(
             hs.get_instance_name(),
-            # TODO(faster_joins, multiple writers): we need to account for instance names
-            current_token_without_instance(store.get_un_partial_stated_rooms_token),
+            store.get_un_partial_stated_rooms_token,
             store.get_un_partial_stated_rooms_from_stream,
         )
 
@@ -70,7 +68,6 @@ class UnPartialStatedEventStream(Stream):
         store = hs.get_datastores().main
         super().__init__(
             hs.get_instance_name(),
-            # TODO(faster_joins, multiple writers): we need to account for instance names
-            current_token_without_instance(store.get_un_partial_stated_events_token),
+            store.get_un_partial_stated_events_token,
             store.get_un_partial_stated_events_from_stream,
         )
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
 index d8a8bcafb6..24127d0364 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -322,11 +322,12 @@ class EventsWorkerStore(SQLBaseStore):
                 "stream_id",
             )
 
-    def get_un_partial_stated_events_token(self) -> int:
-        # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
-        #     writers because workers that don't write often will hold all
-        #     readers up.
-        return self._un_partial_stated_events_stream_id_gen.get_current_token()
+    def get_un_partial_stated_events_token(self, instance_name: str) -> int:
+        return (
+            self._un_partial_stated_events_stream_id_gen.get_current_token_for_writer(
+                instance_name
+            )
+        )
 
     async def get_un_partial_stated_events_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
@@ -416,6 +417,8 @@ class EventsWorkerStore(SQLBaseStore):
             self._stream_id_gen.advance(instance_name, token)
         elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(instance_name, -token)
+        elif stream_name == UnPartialStatedEventStream.NAME:
+            self._un_partial_stated_events_stream_id_gen.advance(instance_name, token)
         super().process_replication_position(stream_name, instance_name, token)
 
     async def have_censored_event(self, event_id: str) -> bool:
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
 index 7264a33cd4..6a65b2a89b 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -43,6 +43,7 @@ from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.config.homeserver import HomeServerConfig
 from synapse.events import EventBase
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -144,6 +145,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
                 "stream_id",
             )
 
+    def process_replication_position(
+        self, stream_name: str, instance_name: str, token: int
+    ) -> None:
+        if stream_name == UnPartialStatedRoomStream.NAME:
+            self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
+        return super().process_replication_position(stream_name, instance_name, token)
+
     async def store_room(
         self,
         room_id: str,
@@ -1281,13 +1289,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         )
         return result["join_event_id"], result["device_lists_stream_id"]
 
-    def get_un_partial_stated_rooms_token(self) -> int:
-        # TODO(faster_joins, multiple writers): This is inappropriate if there
-        #     are multiple writers because workers that don't write often will
-        #     hold all readers up.
-        #     (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
-        #      explanation.)
-        return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+    def get_un_partial_stated_rooms_token(self, instance_name: str) -> int:
+        return self._un_partial_stated_rooms_stream_id_gen.get_current_token_for_writer(
+            instance_name
+        )
 
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
 index f32cbb2dec..ba325d390b 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -95,6 +95,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             for row in rows:
                 assert isinstance(row, UnPartialStatedEventStreamRow)
                 self._get_state_group_for_event.invalidate((row.event_id,))
+                self.is_partial_state_event.invalidate((row.event_id,))
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
@@ -485,6 +486,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 "rejection_status_changed": rejection_status_changed,
             },
         )
+        txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 |