author     Jason Little <realtyem@gmail.com>  2023-01-25 05:27:39 -0600
committer  Jason Little <realtyem@gmail.com>  2023-01-25 05:27:39 -0600
commit     02178ced595e7dd7af05dfeb2e54804bb551e903 (patch)
tree       8634a2f6efd94ba5397b9fcb1e05dcfeeb3f31d4
parent     Install examples in start_for_complement.sh. (diff)
parent     Request partial joins by default (#14905) (diff)
download   synapse-02178ced595e7dd7af05dfeb2e54804bb551e903.tar.xz
Merge branch 'develop' into comp-worker-shorthand
-rw-r--r--  changelog.d/14870.feature                         |  1
-rw-r--r--  changelog.d/14873.bugfix                          |  1
-rw-r--r--  changelog.d/14874.bugfix                          |  1
-rw-r--r--  changelog.d/14896.misc                            |  1
-rw-r--r--  changelog.d/14897.misc                            |  1
-rw-r--r--  changelog.d/14899.misc                            |  1
-rw-r--r--  changelog.d/14900.misc                            |  1
-rw-r--r--  changelog.d/14901.misc                            |  1
-rw-r--r--  changelog.d/14905.feature                         |  1
-rw-r--r--  docs/upgrade.md                                   | 13
-rw-r--r--  poetry.lock                                       | 60
-rw-r--r--  pyproject.toml                                    |  2
-rw-r--r--  synapse/config/experimental.py                    |  2
-rw-r--r--  synapse/handlers/federation.py                    | 14
-rw-r--r--  synapse/handlers/sync.py                          | 65
-rw-r--r--  synapse/notifier.py                               | 26
-rw-r--r--  synapse/replication/tcp/client.py                 |  7
-rw-r--r--  synapse/storage/controllers/state.py              |  2
-rw-r--r--  synapse/storage/databases/main/events_worker.py   |  6
-rw-r--r--  synapse/storage/databases/main/relations.py       |  1
-rw-r--r--  synapse/storage/databases/main/room.py            | 47
-rw-r--r--  synapse/storage/databases/main/roommember.py      | 19
-rw-r--r--  synapse/storage/databases/main/stats.py           | 13
-rw-r--r--  synapse/storage/databases/main/stream.py          | 40
-rw-r--r--  synapse/streams/events.py                         |  6
-rw-r--r--  synapse/types/__init__.py                         | 15
-rw-r--r--  tests/rest/admin/test_room.py                     |  4
-rw-r--r--  tests/rest/client/test_rooms.py                   | 10
-rw-r--r--  tests/rest/client/test_sync.py                    |  4
-rw-r--r--  tests/storage/databases/main/test_room.py         | 88
30 files changed, 334 insertions, 119 deletions
diff --git a/changelog.d/14870.feature b/changelog.d/14870.feature
new file mode 100644
index 0000000000..44f701d1c9
--- /dev/null
+++ b/changelog.d/14870.feature
@@ -0,0 +1 @@
+Faster joins: allow non-lazy-loading ("eager") syncs to complete after a partial join by omitting partial state rooms until they become fully stated.
\ No newline at end of file
diff --git a/changelog.d/14873.bugfix b/changelog.d/14873.bugfix
new file mode 100644
index 0000000000..9b058576cd
--- /dev/null
+++ b/changelog.d/14873.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where the `populate_room_stats` background job could fail on broken rooms.
diff --git a/changelog.d/14874.bugfix b/changelog.d/14874.bugfix
new file mode 100644
index 0000000000..91ae2ea9bd
--- /dev/null
+++ b/changelog.d/14874.bugfix
@@ -0,0 +1 @@
+Faster joins: Fix a bug in worker deployments where the room stats and user directory would not get updated when finishing a fast join until another event is sent or received.
diff --git a/changelog.d/14896.misc b/changelog.d/14896.misc
new file mode 100644
index 0000000000..4f8a6c3f17
--- /dev/null
+++ b/changelog.d/14896.misc
@@ -0,0 +1 @@
+Bump types-opentracing from 2.4.10 to 2.4.10.1.
diff --git a/changelog.d/14897.misc b/changelog.d/14897.misc
new file mode 100644
index 0000000000..d192fa1c2f
--- /dev/null
+++ b/changelog.d/14897.misc
@@ -0,0 +1 @@
+Bump ruff from 0.0.224 to 0.0.230.
diff --git a/changelog.d/14899.misc b/changelog.d/14899.misc
new file mode 100644
index 0000000000..f1ca951ec0
--- /dev/null
+++ b/changelog.d/14899.misc
@@ -0,0 +1 @@
+Bump types-requests from 2.28.11.7 to 2.28.11.8.
diff --git a/changelog.d/14900.misc b/changelog.d/14900.misc
new file mode 100644
index 0000000000..69d6edb907
--- /dev/null
+++ b/changelog.d/14900.misc
@@ -0,0 +1 @@
+Bump types-psycopg2 from 2.9.21.2 to 2.9.21.4.
diff --git a/changelog.d/14901.misc b/changelog.d/14901.misc
new file mode 100644
index 0000000000..21ccec0063
--- /dev/null
+++ b/changelog.d/14901.misc
@@ -0,0 +1 @@
+Bump types-commonmark from 0.9.2 to 0.9.2.1.
diff --git a/changelog.d/14905.feature b/changelog.d/14905.feature
new file mode 100644
index 0000000000..f13a4af981
--- /dev/null
+++ b/changelog.d/14905.feature
@@ -0,0 +1 @@
+Faster joins: request partial joins by default. Admins can opt out of this for the time being; see the upgrade notes.
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 0d486a3c82..6316db563b 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -90,6 +90,19 @@ process, for example:
 
 # Upgrading to v1.76.0
 
+## Faster joins are enabled by default
+
+When joining a room for the first time, Synapse 1.76.0rc1 will request a partial join from the other server by default. Previously, server admins had to opt-in to this using an experimental config flag.
+
+Server admins can opt out of this feature for the time being by setting
+
+```yaml
+experimental_features:
+    faster_joins: false
+```
+
+in their server config.
+
 ## Changes to the account data replication streams
 
 Synapse has changed the format of the account data and devices replication
diff --git a/poetry.lock b/poetry.lock
index 178e3787f7..17a6645b55 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1906,28 +1906,28 @@ jupyter = ["ipywidgets (>=7.5.1,<8.0.0)"]
 
 [[package]]
 name = "ruff"
-version = "0.0.224"
+version = "0.0.230"
 description = "An extremely fast Python linter, written in Rust."
 category = "dev"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "ruff-0.0.224-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:015277c45716733e99a19267fd244870bff2b619d4814065fe5cded5ab139b92"},
-    {file = "ruff-0.0.224-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:ca8211b316fa2df70d90e38819862d58e4aec87643c2c343ba5102a7ff5d3221"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:637a502a37da3aac9832a0dd21255376b0320cf66e3d420d11000e09deb3e682"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a85fe53b8193c3e9f7ca9bef7dfd3bcd079a86542a14b66f352ce0316de5c457"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:367c74a9ff9da165df7dc1e5e1fae5c4cda05cb94202bc9a6e5836243bc625c1"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:2d230fc497abdeb3b54d2808ba59802962b50e0611b558337d4c07e6d490ed6c"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3df2bcb525fec6c2d551f7ab9843e42e8e4fa33586479953445ef64cbe6d6352"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:30e494a23c2f23a07adbd4a9df526057a8cdb4c88536bbc513b5a73385c3d5a7"},
-    {file = "ruff-0.0.224-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eac5836f89f1388b7bb718a5c77cdd13e356737573d29581a18d7f575e42124c"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:d791073cfd40c8e697d4c79faa67e2ad54dc960854bfa0c0cba61ef4bb02d3b1"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ef72a081dfe24bfb8aa0568e0f1ee0174fffbc6ebb0ae2b8b4cd3f9457cc867b"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_i686.whl", hash = "sha256:666ecfcf0019f5b0e72e0eba7a7330760b680ba0fb6413b139a594b117e612db"},
-    {file = "ruff-0.0.224-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:815d5d448e2bf5107340d6f47cbddac74186cb365c56bdcc2315fbcf59ebc466"},
-    {file = "ruff-0.0.224-py3-none-win32.whl", hash = "sha256:17d98c1f03e98c15d3f2c49e0ffdedc57b221217c4e3d7b6f732893101083240"},
-    {file = "ruff-0.0.224-py3-none-win_amd64.whl", hash = "sha256:3db4fe992cea69405061e09974c3955b750611b1e76161471c27cd2e8ccffa05"},
-    {file = "ruff-0.0.224.tar.gz", hash = "sha256:3b07c2e8da29605a8577b1aef90f8ca0c34a66683b77b06007f1970bc0689003"},
+    {file = "ruff-0.0.230-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:fcc31d02cebda0a85e2e13a44642aea7f84362cb4f589e2f6b864e3928e4a7db"},
+    {file = "ruff-0.0.230-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:45a7f2c7155d520b8ca255a01235763d5c25fd5e7af055e50a78c6d91ece0ced"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4eca8b185ab56cac67acc23287c3c8c62a0c0ffadc0787a3bef3a6e77eaed82f"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ec2bcdb5040efd8082a3a98369eec4bdc5fd05f53cc6714cb2b725d557d4abe8"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:26571aee2b93b60e47e44478f72a9787b387f752e85b85f176739bd91b27cfd1"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4b69c9883c3e264f8bb2d52bdabb88b8d9672750ea05f33e0ff52532824bd5c5"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2b3dc88b83f200378a9b9c91036989f0285a10759514c42235ce02e5824ac8d0"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:767716f008dd3a40ec2318396f648fda437c6968087a4526cde5879e382cf477"},
+    {file = "ruff-0.0.230-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ac27a0f9b96d9923cef7d911790a21a19b51aec0f08375ccc47ad735b1054d78"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:729dfc7b7ad4f7d8761dc60c58f15372d6f5c2dd9b6c5952524f2bc3aec7de6a"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ad086cf2e5fef274687121f673f0f9b60c8981ec07c2bb0448c459cbaef81bcb"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_i686.whl", hash = "sha256:4feaed0978c24687133cd11c7380de20aa841f893e24430c735cc6c3faba4837"},
+    {file = "ruff-0.0.230-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1d1046d0d43a0f24b2e9e61d76bb201b486ad02e9787d3432af43bd7d16f2c2e"},
+    {file = "ruff-0.0.230-py3-none-win32.whl", hash = "sha256:4d627911c9ba57bcd2f2776f1c09a10d334db163cb5be8c892e7ec7b59ccf58c"},
+    {file = "ruff-0.0.230-py3-none-win_amd64.whl", hash = "sha256:27fd4891a1d0642f5b2038ebf86f8169bc3d466964bdfaa0ce2a65149bc7cced"},
+    {file = "ruff-0.0.230.tar.gz", hash = "sha256:a049f93af1057ac450e8c09559d44e371eda1c151b1b863c0013a1066fefddb0"},
 ]
 
 [[package]]
@@ -2494,14 +2494,14 @@ files = [
 
 [[package]]
 name = "types-commonmark"
-version = "0.9.2"
+version = "0.9.2.1"
 description = "Typing stubs for commonmark"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-commonmark-0.9.2.tar.gz", hash = "sha256:b894b67750c52fd5abc9a40a9ceb9da4652a391d75c1b480bba9cef90f19fc86"},
-    {file = "types_commonmark-0.9.2-py3-none-any.whl", hash = "sha256:56f20199a1f9a2924443211a0ef97f8b15a8a956a7f4e9186be6950bf38d6d02"},
+    {file = "types-commonmark-0.9.2.1.tar.gz", hash = "sha256:db8277e6aeb83429265eccece98a24954a9a502dde7bc7cf840a8741abd96b86"},
+    {file = "types_commonmark-0.9.2.1-py3-none-any.whl", hash = "sha256:9d5f500cb7eced801bde728137b0a10667bd853d328db641d03141f189e3aab4"},
 ]
 
 [[package]]
@@ -2570,14 +2570,14 @@ files = [
 
 [[package]]
 name = "types-opentracing"
-version = "2.4.10"
+version = "2.4.10.1"
 description = "Typing stubs for opentracing"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-opentracing-2.4.10.tar.gz", hash = "sha256:6101414f3b6d3b9c10f1c510a261e8439b6c8d67c723d5c2872084697b4580a7"},
-    {file = "types_opentracing-2.4.10-py3-none-any.whl", hash = "sha256:66d9cfbbdc4a6f8ca8189a15ad26f0fe41cee84c07057759c5d194e2505b84c2"},
+    {file = "types-opentracing-2.4.10.1.tar.gz", hash = "sha256:49e7e52b8b6e221865a9201fc8c2df0bcda8e7098d4ebb35903dbfa4b4d29195"},
+    {file = "types_opentracing-2.4.10.1-py3-none-any.whl", hash = "sha256:eb63394acd793e7d9e327956242349fee14580a87c025408dc268d4dd883cc24"},
 ]
 
 [[package]]
@@ -2594,14 +2594,14 @@ files = [
 
 [[package]]
 name = "types-psycopg2"
-version = "2.9.21.2"
+version = "2.9.21.4"
 description = "Typing stubs for psycopg2"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-psycopg2-2.9.21.2.tar.gz", hash = "sha256:bff045579642ce00b4a3c8f2e401b7f96dfaa34939f10be64b0dd3b53feca57d"},
-    {file = "types_psycopg2-2.9.21.2-py3-none-any.whl", hash = "sha256:084558d6bc4b2cfa249b06be0fdd9a14a69d307bae5bb5809a2f14cfbaa7a23f"},
+    {file = "types-psycopg2-2.9.21.4.tar.gz", hash = "sha256:d43dda166a70d073ddac40718e06539836b5844c99b58ef8d4489a8df2edf5c0"},
+    {file = "types_psycopg2-2.9.21.4-py3-none-any.whl", hash = "sha256:6a05dca0856996aa37d7abe436751803bf47ec006cabbefea092e057f23bc95d"},
 ]
 
 [[package]]
@@ -2633,14 +2633,14 @@ files = [
 
 [[package]]
 name = "types-requests"
-version = "2.28.11.7"
+version = "2.28.11.8"
 description = "Typing stubs for requests"
 category = "dev"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-requests-2.28.11.7.tar.gz", hash = "sha256:0ae38633734990d019b80f5463dfa164ebd3581998ac8435f526da6fe4d598c3"},
-    {file = "types_requests-2.28.11.7-py3-none-any.whl", hash = "sha256:b6a2fca8109f4fdba33052f11ed86102bddb2338519e1827387137fefc66a98b"},
+    {file = "types-requests-2.28.11.8.tar.gz", hash = "sha256:e67424525f84adfbeab7268a159d3c633862dafae15c5b19547ce1b55954f0a3"},
+    {file = "types_requests-2.28.11.8-py3-none-any.whl", hash = "sha256:61960554baca0008ae7e2db2bd3b322ca9a144d3e80ce270f5fb640817e40994"},
 ]
 
 [package.dependencies]
@@ -2964,4 +2964,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.7.1"
-content-hash = "38867861f77c6faca817487efd02fbb7271b424d56744ad9ad248cd1dd297566"
+content-hash = "2673ef0530a42dae1df998bacfcaf88a563529b39461003a980743a97f02996f"
diff --git a/pyproject.toml b/pyproject.toml
index d54dde4a2f..400eec6ac2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -309,7 +309,7 @@ all = [
 # We pin black so that our tests don't start failing on new releases.
 isort = ">=5.10.1"
 black = ">=22.3.0"
-ruff = "0.0.224"
+ruff = "0.0.230"
 
 # Typechecking
 mypy = "*"
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 89586db763..2590c88cde 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -84,7 +84,7 @@ class ExperimentalConfig(Config):
         # experimental support for faster joins over federation
         # (MSC2775, MSC3706, MSC3895)
         # requires a target server that can provide a partial join response (MSC3706)
-        self.faster_joins_enabled: bool = experimental.get("faster_joins", False)
+        self.faster_joins_enabled: bool = experimental.get("faster_joins", True)
 
         # MSC3720 (Account status endpoint)
         self.msc3720_enabled: bool = experimental.get("msc3720_enabled", False)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7620245e26..233f8c113d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1868,21 +1868,17 @@ class FederationHandler:
 
                 async with self._is_partial_state_room_linearizer.queue(room_id):
                     logger.info("Clearing partial-state flag for %s", room_id)
-                    success = await self.store.clear_partial_state_room(room_id)
+                    new_stream_id = await self.store.clear_partial_state_room(room_id)
 
-                    # Poke the notifier so that other workers see the write to
-                    # the un-partial-stated rooms stream.
-                    self._notifier.notify_replication()
-
-                if success:
+                if new_stream_id is not None:
                     logger.info("State resync complete for %s", room_id)
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
 
-                    # TODO(faster_joins) update room stats and user directory?
-                    #   https://github.com/matrix-org/synapse/issues/12814
-                    #   https://github.com/matrix-org/synapse/issues/12815
+                    await self._notifier.on_un_partial_stated_room(
+                        room_id, new_stream_id
+                    )
                     return
 
                 # we raced against more events arriving with partial state. Go round
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 78d488f2b1..ee11764567 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -290,7 +290,7 @@ class SyncHandler:
             expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
-        self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
         self,
@@ -1340,7 +1340,10 @@ class SyncHandler:
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
-                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+                user_id,
+                since_token.room_key,
+                now_token.room_key,
+                self.rooms_to_exclude_globally,
             )
 
             mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1375,12 +1378,39 @@ class SyncHandler:
                 else:
                     mutable_joined_room_ids.discard(room_id)
 
+        # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+        mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+        if not sync_config.filter_collection.lazy_load_members():
+            # Non-lazy syncs should never include partially stated rooms.
+            # Exclude all partially stated rooms from this sync.
+            for room_id in mutable_joined_room_ids:
+                if await self.store.is_partial_state_room(room_id):
+                    mutable_rooms_to_exclude.add(room_id)
+
+        # Incremental eager syncs should additionally include rooms that
+        # - we are joined to
+        # - are fully stated
+        # - became fully stated at some point during the sync period
+        #   (These rooms will have been omitted during a previous eager sync.)
+        forced_newly_joined_room_ids = set()
+        if since_token and not sync_config.filter_collection.lazy_load_members():
+            un_partial_stated_rooms = (
+                await self.store.get_un_partial_stated_rooms_between(
+                    since_token.un_partial_stated_rooms_key,
+                    now_token.un_partial_stated_rooms_key,
+                    mutable_joined_room_ids,
+                )
+            )
+            for room_id in un_partial_stated_rooms:
+                if not await self.store.is_partial_state_room(room_id):
+                    forced_newly_joined_room_ids.add(room_id)
+
         # Now we have our list of joined room IDs, exclude as configured and freeze
         joined_room_ids = frozenset(
             (
                 room_id
                 for room_id in mutable_joined_room_ids
-                if room_id not in self.rooms_to_exclude
+                if room_id not in mutable_rooms_to_exclude
             )
         )
 
@@ -1397,6 +1427,8 @@ class SyncHandler:
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+            forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
             membership_change_events=membership_change_events,
         )
 
@@ -1834,14 +1866,16 @@ class SyncHandler:
         # 3. Work out which rooms need reporting in the sync response.
         ignored_users = await self.store.ignored_users(user_id)
         if since_token:
-            room_changes = await self._get_rooms_changed(
+            room_changes = await self._get_room_changes_for_incremental_sync(
                 sync_result_builder, ignored_users
             )
             tags_by_room = await self.store.get_updated_tags(
                 user_id, since_token.account_data_key
             )
         else:
-            room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+            room_changes = await self._get_room_changes_for_initial_sync(
+                sync_result_builder, ignored_users
+            )
             tags_by_room = await self.store.get_tags_for_user(user_id)
 
         log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1900,7 +1934,7 @@ class SyncHandler:
 
         assert since_token
 
-        if membership_change_events:
+        if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1909,7 +1943,7 @@ class SyncHandler:
                 return True
         return False
 
-    async def _get_rooms_changed(
+    async def _get_room_changes_for_incremental_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -1947,7 +1981,9 @@ class SyncHandler:
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
-        newly_joined_rooms: List[str] = []
+        newly_joined_rooms: List[str] = list(
+            sync_result_builder.forced_newly_joined_room_ids
+        )
         newly_left_rooms: List[str] = []
         room_entries: List[RoomSyncResultBuilder] = []
         invited: List[InvitedSyncResult] = []
@@ -2153,7 +2189,7 @@ class SyncHandler:
             newly_left_rooms,
         )
 
-    async def _get_all_rooms(
+    async def _get_room_changes_for_initial_sync(
         self,
         sync_result_builder: "SyncResultBuilder",
         ignored_users: FrozenSet[str],
@@ -2178,7 +2214,7 @@ class SyncHandler:
         room_list = await self.store.get_rooms_for_local_user_where_membership_is(
             user_id=user_id,
             membership_list=Membership.LIST,
-            excluded_rooms=self.rooms_to_exclude,
+            excluded_rooms=sync_result_builder.excluded_room_ids,
         )
 
         room_entries = []
@@ -2549,6 +2585,13 @@ class SyncResultBuilder:
         since_token: The token supplied by user, or None.
         now_token: The token to sync up to.
         joined_room_ids: List of rooms the user is joined to
+        excluded_room_ids: Set of room ids we should omit from the /sync response.
+        forced_newly_joined_room_ids:
+            Rooms that should be presented in the /sync response as if they were
+            newly joined during the sync period, even if that's not the case.
+            (This is useful if the room was previously excluded from a /sync response,
+            and now the client should be made aware of it.)
+            Only used by incremental syncs.
 
         # The following mirror the fields in a sync response
         presence
@@ -2565,6 +2608,8 @@ class SyncResultBuilder:
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    excluded_room_ids: FrozenSet[str]
+    forced_newly_joined_room_ids: FrozenSet[str]
     membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)
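Review note: a condensed, runnable restatement of the eager-sync filtering added above, with the async store lookups replaced by a plain dict so the snippet stands alone; all room ids are invented:

```python
partial_state = {"!joining:hs": True, "!settled:hs": False}
joined_room_ids = {"!joining:hs", "!settled:hs"}
rooms_to_exclude_globally: set = set()

lazy_load_members = False  # i.e. an "eager" sync
mutable_rooms_to_exclude = set(rooms_to_exclude_globally)
if not lazy_load_members:
    # Partial-state rooms are omitted until they become fully stated.
    mutable_rooms_to_exclude |= {
        room_id for room_id in joined_room_ids if partial_state[room_id]
    }

final_joined = frozenset(
    room_id
    for room_id in joined_room_ids
    if room_id not in mutable_rooms_to_exclude
)
print(final_joined)  # frozenset({'!settled:hs'})
```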
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 28f0d4a25a..2b0e52f23c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -314,6 +314,32 @@ class Notifier:
             event_entries.append((entry, event.event_id))
         await self.notify_new_room_events(event_entries, max_room_stream_token)
 
+    async def on_un_partial_stated_room(
+        self,
+        room_id: str,
+        new_token: int,
+    ) -> None:
+        """Used by the resync background processes to wake up all listeners
+        of this room when it is un-partial-stated.
+
+        It will also notify replication listeners of the change in the stream.
+        """
+
+        # Wake up all related user stream notifiers
+        user_streams = self.room_to_user_streams.get(room_id, set())
+        time_now_ms = self.clock.time_msec()
+        for user_stream in user_streams:
+            try:
+                user_stream.notify(
+                    StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms
+                )
+            except Exception:
+                logger.exception("Failed to notify listener")
+
+        # Poke the replication so that other workers also see the write to
+        # the un-partial-stated rooms stream.
+        self.notify_replication()
+
     async def notify_new_room_events(
         self,
         event_entries: List[Tuple[_PendingRoomEventEntry, str]],
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 493f616679..cc0528bd8e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -207,6 +207,12 @@ class ReplicationDataHandler:
             # we don't need to optimise this for multiple rows.
             for row in rows:
                 if row.type != EventsStreamEventRow.TypeId:
+                    # The row's data is an `EventsStreamCurrentStateRow`.
+                    # When we recompute the current state of a room based on forward
+                    # extremities (see `update_current_state`), no new events are
+                    # persisted, so we must poke the replication callbacks ourselves.
+                    # This functionality is used when finishing up a partial state join.
+                    self.notifier.notify_replication()
                     continue
                 assert isinstance(row, EventsStreamRow)
                 assert isinstance(row.data, EventsStreamEventRow)
@@ -254,6 +260,7 @@ class ReplicationDataHandler:
                 self._state_storage_controller.notify_room_un_partial_stated(
                     row.room_id
                 )
+                await self.notifier.on_un_partial_stated_room(row.room_id, token)
         elif stream_name == UnPartialStatedEventStream.NAME:
             for row in rows:
                 assert isinstance(row, UnPartialStatedEventStreamRow)
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 26d79c6e62..2045169b9a 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -493,8 +493,6 @@ class StateStorageController:
                  up to date.
         """
         # FIXME(faster_joins): what do we do here?
-        #   https://github.com/matrix-org/synapse/issues/12814
-        #   https://github.com/matrix-org/synapse/issues/12815
         #   https://github.com/matrix-org/synapse/issues/13008
 
         return await self.stores.main.get_partial_current_state_deltas(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 24127d0364..f42af34a2f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -110,6 +110,10 @@ event_fetch_ongoing_gauge = Gauge(
 )
 
 
+class InvalidEventError(Exception):
+    """The event retrieved from the database is invalid and cannot be used."""
+
+
 @attr.s(slots=True, auto_attribs=True)
 class EventCacheEntry:
     event: EventBase
@@ -1310,7 +1314,7 @@ class EventsWorkerStore(SQLBaseStore):
                 # invites, so just accept it for all membership events.
                 #
                 if d["type"] != EventTypes.Member:
-                    raise Exception(
+                    raise InvalidEventError(
                         "Room %s for event %s is unknown" % (d["room_id"], event_id)
                     )
 
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index aea96e9d24..84f844b79e 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -292,6 +292,7 @@ class RelationsWorkerStore(SQLBaseStore):
                         to_device_key=0,
                         device_list_key=0,
                         groups_key=0,
+                        un_partial_stated_rooms_key=0,
                     )
 
             return events[:limit], next_token
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 6a65b2a89b..3aa7b94560 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -26,6 +26,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -1294,10 +1295,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             instance_name
         )
 
+    async def get_un_partial_stated_rooms_between(
+        self, last_id: int, current_id: int, room_ids: Collection[str]
+    ) -> Set[str]:
+        """Get all rooms that were un-partial-stated between `last_id` exclusive
+        and `current_id` inclusive.
+
+        Returns:
+            The set of room ids.
+        """
+
+        if last_id == current_id:
+            return set()
+
+        def _get_un_partial_stated_rooms_between_txn(
+            txn: LoggingTransaction,
+        ) -> Set[str]:
+            sql = """
+                SELECT DISTINCT room_id FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", room_ids
+            )
+
+            txn.execute(sql + clause, [last_id, current_id] + args)
+
+            return {r[0] for r in txn}
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_between",
+            _get_un_partial_stated_rooms_between_txn,
+        )
+
     async def get_un_partial_stated_rooms_from_stream(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
-        """Get updates for caches replication stream.
+        """Get updates for the un-partial-stated rooms replication stream.
 
         Args:
             instance_name: The writer we want to fetch updates from. Unused
@@ -2304,16 +2339,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             (room_id,),
         )
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
+    async def clear_partial_state_room(self, room_id: str) -> Optional[int]:
         """Clears the partial state flag for a room.
 
         Args:
             room_id: The room whose partial state flag is to be cleared.
 
         Returns:
-            `True` if the partial state flag has been cleared successfully.
+            The corresponding stream id for the un-partial-stated rooms stream.
 
-            `False` if the partial state flag could not be cleared because the room
+            `None` if the partial state flag could not be cleared because the room
             still contains events with partial state.
         """
         try:
@@ -2324,7 +2359,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                     room_id,
                     un_partial_state_room_stream_id,
                 )
-                return True
+                return un_partial_state_room_stream_id
         except self.db_pool.engine.module.IntegrityError as e:
             # Assume that any `IntegrityError`s are due to partial state events.
             logger.info(
@@ -2332,7 +2367,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 room_id,
                 e,
             )
-            return False
+            return None
 
     def _clear_partial_state_room_txn(
         self,
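Review note: `make_in_list_sql_clause` (imported from `synapse.storage.database`) expands a collection into a parameterised `IN`-style clause. A simplified stand-in sketching the query shape that `get_un_partial_stated_rooms_between` builds; `?` placeholders are assumed, as on SQLite, and the real helper also handles the Postgres array form:

```python
from typing import Collection, List, Tuple

def make_in_list_sql_clause_stub(
    column: str, values: Collection[str]
) -> Tuple[str, List[str]]:
    # Simplified: one "?" per value, comma-joined.
    placeholders = ", ".join("?" for _ in values)
    return f"{column} IN ({placeholders})", list(values)

clause, args = make_in_list_sql_clause_stub("room_id", ["!a:hs", "!b:hs"])
sql = (
    "SELECT DISTINCT room_id FROM un_partial_stated_room_stream "
    "WHERE ? < stream_id AND stream_id <= ? AND " + clause
)
print(sql)             # ... AND room_id IN (?, ?)
print([5, 10] + args)  # [5, 10, '!a:hs', '!b:hs']
```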
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index f02c1d7ea7..8e2ba7b7b4 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -15,6 +15,7 @@
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Collection,
     Dict,
     FrozenSet,
@@ -47,7 +48,13 @@ from synapse.storage.roommember import (
     ProfileInfo,
     RoomsForUser,
 )
-from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    StateMap,
+    StrCollection,
+    get_domain_from_id,
+)
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -385,7 +392,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self,
         user_id: str,
         membership_list: Collection[str],
-        excluded_rooms: Optional[List[str]] = None,
+        excluded_rooms: StrCollection = (),
     ) -> List[RoomsForUser]:
         """Get all the rooms for this *local* user where the membership for this user
         matches one in the membership list.
@@ -412,10 +419,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
 
         # Now we filter out forgotten and excluded rooms
-        rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id)
+        rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id)
 
         if excluded_rooms is not None:
-            rooms_to_exclude.update(set(excluded_rooms))
+            # Take a copy to avoid mutating the in-cache set
+            rooms_to_exclude = set(rooms_to_exclude)
+            rooms_to_exclude.update(excluded_rooms)
 
         return [room for room in rooms if room.room_id not in rooms_to_exclude]
 
@@ -1169,7 +1178,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         return count == 0
 
     @cached()
-    async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]:
+    async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]:
         """Gets all rooms the user has forgotten.
 
         Args:
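Review note: the "Take a copy" comment above guards against mutating a set that lives inside the `@cached()` descriptor. The hazard, demonstrated standalone with stdlib `functools.lru_cache` standing in for Synapse's cache:

```python
from functools import lru_cache

@lru_cache()
def get_forgotten_rooms_for_user(user_id: str) -> set:
    return {"!abandoned:example.org"}

rooms_to_exclude = get_forgotten_rooms_for_user("@alice:example.org")
# Copy before mutating, exactly as the fix above does; updating the
# returned set in place would poison the cache for every later caller.
rooms_to_exclude = set(rooms_to_exclude)
rooms_to_exclude.update(["!excluded:example.org"])
assert get_forgotten_rooms_for_user("@alice:example.org") == {
    "!abandoned:example.org"
}
```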
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 356d4ca788..0c1cbd540d 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -29,6 +29,7 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.storage.databases.main.events_worker import InvalidEventError
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
 from synapse.types import JsonDict
 from synapse.util.caches.descriptors import cached
@@ -554,7 +555,17 @@ class StatsStore(StateDeltasStore):
             "get_initial_state_for_room", _fetch_current_state_stats
         )
 
-        state_event_map = await self.get_events(event_ids, get_prev_content=False)  # type: ignore[attr-defined]
+        try:
+            state_event_map = await self.get_events(event_ids, get_prev_content=False)  # type: ignore[attr-defined]
+        except InvalidEventError as e:
+            # If an exception occurs fetching events then the room is broken;
+            # skip processing it to avoid being stuck on a room.
+            logger.warning(
+                "Failed to fetch events for room %s, skipping stats calculation: %r.",
+                room_id,
+                e,
+            )
+            return
 
         room_state: Dict[str, Union[None, bool, str]] = {
             "join_rules": None,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 63d8350530..d28fc65df9 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -67,7 +67,7 @@ from synapse.storage.database import (
     make_in_list_sql_clause,
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.descriptors import cached
@@ -944,12 +944,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             room_id
             stream_key
         """
-        sql = (
-            "SELECT coalesce(MIN(topological_ordering), 0) FROM events"
-            " WHERE room_id = ? AND stream_ordering >= ?"
-        )
+        if isinstance(self.database_engine, PostgresEngine):
+            min_function = "LEAST"
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            min_function = "MIN"
+        else:
+            raise RuntimeError(f"Unknown database engine {self.database_engine}")
+
+        # This query used to be
+        #    SELECT COALESCE(MIN(topological_ordering), 0) FROM events
+        #    WHERE room_id = ? and events.stream_ordering >= {stream_key}
+        # which returns 0 if the stream_key is newer than any event in
+        # the room. That's not wrong, but it seems to interact oddly with backfill,
+        # requiring a second call to /messages to actually backfill from a remote
+        # homeserver.
+        #
+        # Instead, roll back the stream ordering to just after the most recent
+        # event in this room.
+        sql = f"""
+            WITH fallback(max_stream_ordering) AS (
+                SELECT MAX(stream_ordering)
+                FROM events
+                WHERE room_id = ?
+            )
+            SELECT COALESCE(MIN(topological_ordering), 0) FROM events
+            WHERE
+                room_id = ?
+                AND events.stream_ordering >= {min_function}(
+                    ?,
+                    (SELECT max_stream_ordering FROM fallback)
+                )
+        """
+
         row = await self.db_pool.execute(
-            "get_current_topological_token", None, sql, room_id, stream_key
+            "get_current_topological_token", None, sql, room_id, room_id, stream_key
         )
         return row[0][0] if row else 0
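Review note: the engine check above exists because PostgreSQL spells the two-argument scalar minimum `LEAST`, while SQLite overloads `MIN` as both an aggregate and a scalar function. A quick stdlib check of the SQLite behaviour:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite: MIN with two arguments is the scalar minimum, not the aggregate.
print(conn.execute("SELECT MIN(3, 7)").fetchone())  # (3,)
# PostgreSQL reserves MIN for aggregation; its scalar form is LEAST(3, 7).
```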
 
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 619eb7f601..d7084d2358 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -53,11 +53,15 @@ class EventSources:
             *(attribute.type(hs) for attribute in attr.fields(_EventSourcesInner))
         )
         self.store = hs.get_datastores().main
+        self._instance_name = hs.get_instance_name()
 
     def get_current_token(self) -> StreamToken:
         push_rules_key = self.store.get_max_push_rules_stream_id()
         to_device_key = self.store.get_to_device_stream_token()
         device_list_key = self.store.get_device_stream_token()
+        un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token(
+            self._instance_name
+        )
 
         token = StreamToken(
             room_key=self.sources.room.get_current_key(),
@@ -70,6 +74,7 @@ class EventSources:
             device_list_key=device_list_key,
             # Groups key is unused.
             groups_key=0,
+            un_partial_stated_rooms_key=un_partial_stated_rooms_key,
         )
         return token
 
@@ -107,5 +112,6 @@ class EventSources:
             to_device_key=0,
             device_list_key=0,
             groups_key=0,
+            un_partial_stated_rooms_key=0,
         )
         return token
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index c59eca2430..f82d1cfc29 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -17,6 +17,7 @@ import re
 import string
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     ClassVar,
     Dict,
@@ -79,7 +80,7 @@ JsonSerializable = object
 
 # Collection[str] that does not include str itself; str being a Sequence[str]
 # is very misleading and results in bugs.
-StrCollection = Union[Tuple[str, ...], List[str], Set[str]]
+StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]
 
 
 # Note that this seems to require inheriting *directly* from Interface in order
@@ -633,6 +634,7 @@ class StreamKeyType:
     PUSH_RULES: Final = "push_rules_key"
     TO_DEVICE: Final = "to_device_key"
     DEVICE_LIST: Final = "device_list_key"
+    UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -640,7 +642,7 @@ class StreamToken:
     """A collection of keys joined together by underscores in the following
     order and which represent the position in their respective streams.
 
-    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1`
+    ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
         1. `room_key`: `s2633508` which is a `RoomStreamToken`
            - `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
            - See the docstring for `RoomStreamToken` for more details.
@@ -652,12 +654,13 @@ class StreamToken:
         7. `to_device_key`: `274711`
         8. `device_list_key`: `265584`
         9. `groups_key`: `1` (note that this key is now unused)
+        10. `un_partial_stated_rooms_key`: `379`
 
     You can see how many of these keys correspond to the various
     fields in a "/sync" response:
     ```json
     {
-        "next_batch": "s12_4_0_1_1_1_1_4_1",
+        "next_batch": "s12_4_0_1_1_1_1_4_1_1",
         "presence": {
             "events": []
         },
@@ -669,7 +672,7 @@ class StreamToken:
                 "!QrZlfIDQLNLdZHqTnt:hs1": {
                     "timeline": {
                         "events": [],
-                        "prev_batch": "s10_4_0_1_1_1_1_4_1",
+                        "prev_batch": "s10_4_0_1_1_1_1_4_1_1",
                         "limited": false
                     },
                     "state": {
@@ -705,6 +708,7 @@ class StreamToken:
     device_list_key: int
     # Note that the groups key is no longer used and may have bogus values.
     groups_key: int
+    un_partial_stated_rooms_key: int
 
     _SEPARATOR = "_"
     START: ClassVar["StreamToken"]
@@ -743,6 +747,7 @@ class StreamToken:
                 # serialized so that there will not be confusion in the future
                 # if additional tokens are added.
                 str(self.groups_key),
+                str(self.un_partial_stated_rooms_key),
             ]
         )
 
@@ -775,7 +780,7 @@ class StreamToken:
         return attr.evolve(self, **{key: new_value})
 
 
-StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
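Review note: with the new field, serialized tokens gain a tenth underscore-separated component. A runnable, much-simplified stand-in for `StreamToken` (the real class is an attrs class whose first component is a structured `RoomStreamToken`, and it carries all ten keys):

```python
from dataclasses import astuple, dataclass

@dataclass(frozen=True)
class MiniToken:
    room_key: str                      # e.g. "s2633508"
    groups_key: int                    # unused, kept for compatibility
    un_partial_stated_rooms_key: int   # the new trailing component

    def to_string(self) -> str:
        return "_".join(str(v) for v in astuple(self))

    @classmethod
    def from_string(cls, string: str) -> "MiniToken":
        room_key, groups, uspr = string.split("_")
        return cls(room_key, int(groups), int(uspr))

tok = MiniToken("s2633508", 1, 379)
assert MiniToken.from_string(tok.to_string()) == tok
print(tok.to_string())  # s2633508_1_379
```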
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index e0f5d54aba..453a6e979c 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -1831,7 +1831,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_topo_token_is_accepted(self) -> None:
         """Test Topo Token is accepted."""
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
@@ -1845,7 +1845,7 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
         """Test that stream token is accepted for forward pagination."""
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index b4daace556..9222cab198 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -1987,7 +1987,7 @@ class RoomMessageListTestCase(RoomBase):
         self.room_id = self.helper.create_room_as(self.user_id)
 
     def test_topo_token_is_accepted(self) -> None:
-        token = "t1-0_0_0_0_0_0_0_0_0"
+        token = "t1-0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -1998,7 +1998,7 @@ class RoomMessageListTestCase(RoomBase):
         self.assertTrue("end" in channel.json_body)
 
     def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
         )
@@ -2728,7 +2728,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2745,7 +2745,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """Test that we can filter by the absence of a label on a /messages request."""
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
@@ -2768,7 +2768,7 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         """
         self._send_labelled_messages_in_room()
 
-        token = "s0_0_0_0_0_0_0_0_0"
+        token = "s0_0_0_0_0_0_0_0_0_0"
         channel = self.make_request(
             "GET",
             "/rooms/%s/messages?access_token=%s&from=%s&filter=%s"
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 0af643ecd9..c9afa0f3dd 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -913,7 +913,9 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
 
         # We need to manually append the room ID, because we can't know the ID before
         # creating the room, and we can't set the config after starting the homeserver.
-        self.hs.get_sync_handler().rooms_to_exclude.append(self.excluded_room_id)
+        self.hs.get_sync_handler().rooms_to_exclude_globally.append(
+            self.excluded_room_id
+        )
 
     def test_join_leave(self) -> None:
         """Tests that rooms are correctly excluded from the 'join' and 'leave' sections of
diff --git a/tests/storage/databases/main/test_room.py b/tests/storage/databases/main/test_room.py
index 7d961fac64..3108ca3444 100644
--- a/tests/storage/databases/main/test_room.py
+++ b/tests/storage/databases/main/test_room.py
@@ -40,9 +40,23 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
         self.token = self.login("foo", "pass")
 
     def _generate_room(self) -> str:
-        room_id = self.helper.create_room_as(self.user_id, tok=self.token)
+        """Create a room and return the room ID."""
+        return self.helper.create_room_as(self.user_id, tok=self.token)
 
-        return room_id
+    def run_background_updates(self, update_name: str) -> None:
+        """Insert and run the background update."""
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {"update_name": update_name, "progress_json": "{}"},
+            )
+        )
+
+        # ... and tell the DataStore that it hasn't finished all updates yet
+        self.store.db_pool.updates._all_done = False
+
+        # Now let's actually drive the updates to completion
+        self.wait_for_background_updates()
 
     def test_background_populate_rooms_creator_column(self) -> None:
         """Test that the background update to populate the rooms creator column
@@ -71,22 +85,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
         )
         self.assertEqual(room_creator_before, None)
 
-        # Insert and run the background update.
-        self.get_success(
-            self.store.db_pool.simple_insert(
-                "background_updates",
-                {
-                    "update_name": _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
-                    "progress_json": "{}",
-                },
-            )
-        )
-
-        # ... and tell the DataStore that it hasn't finished all updates yet
-        self.store.db_pool.updates._all_done = False
-
-        # Now let's actually drive the updates to completion
-        self.wait_for_background_updates()
+        self.run_background_updates(_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN)
 
         # Make sure the background update filled in the room creator
         room_creator_after = self.get_success(
@@ -137,22 +136,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
             )
         )
 
-        # Insert and run the background update
-        self.get_success(
-            self.store.db_pool.simple_insert(
-                "background_updates",
-                {
-                    "update_name": _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN,
-                    "progress_json": "{}",
-                },
-            )
-        )
-
-        # ... and tell the DataStore that it hasn't finished all updates yet
-        self.store.db_pool.updates._all_done = False
-
-        # Now let's actually drive the updates to completion
-        self.wait_for_background_updates()
+        self.run_background_updates(_BackgroundUpdates.ADD_ROOM_TYPE_COLUMN)
 
         # Make sure the background update filled in the room type
         room_type_after = self.get_success(
@@ -164,3 +148,39 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
             )
         )
         self.assertEqual(room_type_after, RoomTypes.SPACE)
+
+    def test_populate_stats_broken_rooms(self) -> None:
+        """Ensure that re-populating room stats skips broken rooms."""
+
+        # Create a good room.
+        good_room_id = self._generate_room()
+
+        # Create a room and then break it by having no room version.
+        room_id = self._generate_room()
+        self.get_success(
+            self.store.db_pool.simple_update(
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                updatevalues={"room_version": None},
+                desc="test",
+            )
+        )
+
+        # Nuke any current stats in the database.
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="room_stats_state", keyvalues={"1": 1}, desc="test"
+            )
+        )
+
+        self.run_background_updates("populate_stats_process_rooms")
+
+        # Only the good room appears in the stats tables.
+        results = self.get_success(
+            self.store.db_pool.simple_select_onecol(
+                table="room_stats_state",
+                keyvalues={},
+                retcol="room_id",
+            )
+        )
+        self.assertEqual(results, [good_room_id])