-rw-r--r--  .github/workflows/latest_deps.yml  2
-rw-r--r--  .github/workflows/release-artifacts.yml  10
-rw-r--r--  .github/workflows/tests.yml  8
-rw-r--r--  changelog.d/13776.feature  1
-rw-r--r--  changelog.d/13824.feature  1
-rw-r--r--  changelog.d/13877.feature  1
-rw-r--r--  changelog.d/13878.feature  1
-rw-r--r--  changelog.d/13983.misc  1
-rw-r--r--  changelog.d/13984.misc  1
-rw-r--r--  changelog.d/13985.misc  1
-rw-r--r--  changelog.d/13986.misc  1
-rw-r--r--  changelog.d/13987.misc  1
-rw-r--r--  changelog.d/13988.misc  1
-rw-r--r--  changelog.d/13991.misc  1
-rw-r--r--  changelog.d/13996.feature  1
-rw-r--r--  changelog.d/13997.feature  1
-rw-r--r--  changelog.d/14006.misc  1
-rw-r--r--  changelog.d/14032.feature  1
-rw-r--r--  changelog.d/14033.misc  1
-rw-r--r--  changelog.d/14041.misc  1
-rw-r--r--  changelog.d/14046.misc  1
-rw-r--r--  changelog.d/14053.bugfix  1
-rw-r--r--  changelog.d/14063.misc  1
-rw-r--r--  changelog.d/14087.doc  1
-rw-r--r--  docker/Dockerfile-workers  6
-rw-r--r--  docs/development/contributing_guide.md  2
-rw-r--r--  docs/openid.md  2
-rw-r--r--  poetry.lock  113
-rw-r--r--  pyproject.toml  2
-rw-r--r--  rust/src/push/base_rules.rs  2
-rw-r--r--  rust/src/push/mod.rs  9
-rw-r--r--  stubs/synapse/synapse_rust/push.pyi  6
-rw-r--r--  synapse/api/constants.py  3
-rw-r--r--  synapse/api/filtering.py  10
-rw-r--r--  synapse/appservice/api.py  23
-rw-r--r--  synapse/config/experimental.py  5
-rw-r--r--  synapse/handlers/federation_event.py  4
-rw-r--r--  synapse/handlers/message.py  25
-rw-r--r--  synapse/handlers/sync.py  189
-rw-r--r--  synapse/notifier.py  75
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py  9
-rw-r--r--  synapse/push/push_tools.py  9
-rw-r--r--  synapse/replication/tcp/client.py  19
-rw-r--r--  synapse/rest/client/receipts.py  22
-rw-r--r--  synapse/rest/client/sync.py  4
-rw-r--r--  synapse/rest/client/versions.py  4
-rw-r--r--  synapse/storage/database.py  2
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py  509
-rw-r--r--  synapse/storage/databases/main/push_rule.py  9
-rw-r--r--  synapse/storage/databases/main/relations.py  36
-rw-r--r--  synapse/storage/databases/main/room.py  43
-rw-r--r--  synapse/storage/schema/__init__.py  6
-rw-r--r--  synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql  29
-rw-r--r--  synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres  19
-rw-r--r--  synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite  101
-rw-r--r--  synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres  23
-rw-r--r--  synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite  76
-rw-r--r--  tests/appservice/test_api.py  8
-rw-r--r--  tests/replication/slave/storage/test_events.py  17
-rw-r--r--  tests/rest/client/test_rooms.py  41
-rw-r--r--  tests/storage/test_event_push_actions.py  489
61 files changed, 1551 insertions, 441 deletions
diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml
index e240bf4e4f..b1e45ee648 100644
--- a/.github/workflows/latest_deps.yml
+++ b/.github/workflows/latest_deps.yml
@@ -76,7 +76,7 @@ jobs:
             -e POSTGRES_PASSWORD=postgres \
             -e POSTGRES_INITDB_ARGS="--lc-collate C --lc-ctype C --encoding UTF8" \
             postgres:${{ matrix.postgres-version }}
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
         with:
           python-version: "3.x"
       - run: pip install .[all,test]
diff --git a/.github/workflows/release-artifacts.yml b/.github/workflows/release-artifacts.yml
index eb12d88fbc..1c004fbc11 100644
--- a/.github/workflows/release-artifacts.yml
+++ b/.github/workflows/release-artifacts.yml
@@ -26,7 +26,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
       - id: set-distros
         run: |
           # if we're running from a tag, get the full list of distros; otherwise just use debian:sid
@@ -69,7 +69,7 @@ jobs:
             ${{ runner.os }}-buildx-
 
       - name: Set up python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v4
 
       - name: Build the packages
         # see https://github.com/docker/build-push-action/issues/252
@@ -107,7 +107,11 @@ jobs:
     steps:
       - uses: actions/checkout@v3
 
-      - uses: actions/setup-python@v3
+      - uses: actions/setup-python@v4
+        with:
+          # setup-python@v4 doesn't impose a default python version. Need to use 3.x
+          # here, because `python` on osx points to Python 2.7.
+          python-version: "3.x"
 
       - name: Install cibuildwheel
         run: python -m pip install cibuildwheel==2.9.0 poetry==1.2.0
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 94eb58b59d..1115dcc5f3 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -32,7 +32,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
       - uses: matrix-org/setup-python-poetry@v1
         with:
           extras: "all"
@@ -43,7 +43,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
       - run: "pip install 'click==8.1.1' 'GitPython>=3.1.20'"
       - run: scripts-dev/check_schema_delta.py --force-colors
 
@@ -67,7 +67,7 @@ jobs:
         with:
           ref: ${{ github.event.pull_request.head.sha }}
           fetch-depth: 0
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
       - run: "pip install 'towncrier>=18.6.0rc1'"
       - run: scripts-dev/check-newsfragment.sh
         env:
@@ -142,7 +142,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v4
       - id: get-matrix
         run: .ci/scripts/calculate_jobs.py
     outputs:
diff --git a/changelog.d/13776.feature b/changelog.d/13776.feature
new file mode 100644
index 0000000000..22bce125ce
--- /dev/null
+++ b/changelog.d/13776.feature
@@ -0,0 +1 @@
+Experimental support for thread-specific notifications ([MSC3773](https://github.com/matrix-org/matrix-spec-proposals/pull/3773)).
diff --git a/changelog.d/13824.feature b/changelog.d/13824.feature
new file mode 100644
index 0000000000..d0cb902dff
--- /dev/null
+++ b/changelog.d/13824.feature
@@ -0,0 +1 @@
+Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
diff --git a/changelog.d/13877.feature b/changelog.d/13877.feature
new file mode 100644
index 0000000000..d0cb902dff
--- /dev/null
+++ b/changelog.d/13877.feature
@@ -0,0 +1 @@
+Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
diff --git a/changelog.d/13878.feature b/changelog.d/13878.feature
new file mode 100644
index 0000000000..d0cb902dff
--- /dev/null
+++ b/changelog.d/13878.feature
@@ -0,0 +1 @@
+Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
diff --git a/changelog.d/13983.misc b/changelog.d/13983.misc
new file mode 100644
index 0000000000..69ed75be90
--- /dev/null
+++ b/changelog.d/13983.misc
@@ -0,0 +1 @@
+Bump actions/setup-python from 2 to 4.
diff --git a/changelog.d/13984.misc b/changelog.d/13984.misc
new file mode 100644
index 0000000000..120d042fad
--- /dev/null
+++ b/changelog.d/13984.misc
@@ -0,0 +1 @@
+Bump types-pyopenssl from 22.0.0 to 22.0.10.
diff --git a/changelog.d/13985.misc b/changelog.d/13985.misc
new file mode 100644
index 0000000000..48e8960720
--- /dev/null
+++ b/changelog.d/13985.misc
@@ -0,0 +1 @@
+Bump jsonschema from 4.4.0 to 4.16.0.
diff --git a/changelog.d/13986.misc b/changelog.d/13986.misc
new file mode 100644
index 0000000000..f71aed474b
--- /dev/null
+++ b/changelog.d/13986.misc
@@ -0,0 +1 @@
+Bump types-setuptools from 57.4.9 to 65.4.0.0.
diff --git a/changelog.d/13987.misc b/changelog.d/13987.misc
new file mode 100644
index 0000000000..b1164201be
--- /dev/null
+++ b/changelog.d/13987.misc
@@ -0,0 +1 @@
+Bump types-requests from 2.27.11 to 2.28.11.
diff --git a/changelog.d/13988.misc b/changelog.d/13988.misc
new file mode 100644
index 0000000000..e8c6b4e9b1
--- /dev/null
+++ b/changelog.d/13988.misc
@@ -0,0 +1 @@
+Bump isort from 5.7.0 to 5.10.1.
diff --git a/changelog.d/13991.misc b/changelog.d/13991.misc
new file mode 100644
index 0000000000..f425fb17b2
--- /dev/null
+++ b/changelog.d/13991.misc
@@ -0,0 +1 @@
+Optimise queries used to get a user's rooms during sync. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/changelog.d/13996.feature b/changelog.d/13996.feature
new file mode 100644
index 0000000000..771f1c97a3
--- /dev/null
+++ b/changelog.d/13996.feature
@@ -0,0 +1 @@
+Send application service access tokens as a header (and query parameter). Implement [MSC2832](https://github.com/matrix-org/matrix-spec-proposals/pull/2832).
diff --git a/changelog.d/13997.feature b/changelog.d/13997.feature
new file mode 100644
index 0000000000..23f7ed106f
--- /dev/null
+++ b/changelog.d/13997.feature
@@ -0,0 +1 @@
+Ignore server ACL changes when generating pushes. Implement [MSC3786](https://github.com/matrix-org/matrix-spec-proposals/pull/3786).
diff --git a/changelog.d/14006.misc b/changelog.d/14006.misc
new file mode 100644
index 0000000000..c06dcadf02
--- /dev/null
+++ b/changelog.d/14006.misc
@@ -0,0 +1 @@
+Update authlib from 0.15.5 to 1.1.0.
diff --git a/changelog.d/14032.feature b/changelog.d/14032.feature
new file mode 100644
index 0000000000..bb221d3ca6
--- /dev/null
+++ b/changelog.d/14032.feature
@@ -0,0 +1 @@
+Advertise Matrix 1.3 support on `/_matrix/client/versions`.
diff --git a/changelog.d/14033.misc b/changelog.d/14033.misc
new file mode 100644
index 0000000000..fe42852aa5
--- /dev/null
+++ b/changelog.d/14033.misc
@@ -0,0 +1 @@
+Don't repeatedly wake up the same users for batched events.
\ No newline at end of file
diff --git a/changelog.d/14041.misc b/changelog.d/14041.misc
new file mode 100644
index 0000000000..a2119627f8
--- /dev/null
+++ b/changelog.d/14041.misc
@@ -0,0 +1 @@
+Bump types-pyyaml from 6.0.4 to 6.0.12.
diff --git a/changelog.d/14046.misc b/changelog.d/14046.misc
new file mode 100644
index 0000000000..69ed75be90
--- /dev/null
+++ b/changelog.d/14046.misc
@@ -0,0 +1 @@
+Bump actions/setup-python from 2 to 4.
diff --git a/changelog.d/14053.bugfix b/changelog.d/14053.bugfix
new file mode 100644
index 0000000000..07769f51d0
--- /dev/null
+++ b/changelog.d/14053.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.53.0 when querying `/publicRooms` with both a `room_type` filter and a `third_party_instance_id`.
diff --git a/changelog.d/14063.misc b/changelog.d/14063.misc
new file mode 100644
index 0000000000..f0d1e47f1a
--- /dev/null
+++ b/changelog.d/14063.misc
@@ -0,0 +1 @@
+Complement test image: capture logs from nginx.
diff --git a/changelog.d/14087.doc b/changelog.d/14087.doc
new file mode 100644
index 0000000000..28d1ce67c5
--- /dev/null
+++ b/changelog.d/14087.doc
@@ -0,0 +1 @@
+The changelog entry ending in a full stop or exclamation mark is not optional.
diff --git a/docker/Dockerfile-workers b/docker/Dockerfile-workers
index 003a1cc3bf..0c2d4f3047 100644
--- a/docker/Dockerfile-workers
+++ b/docker/Dockerfile-workers
@@ -40,7 +40,11 @@ FROM matrixdotorg/synapse:$SYNAPSE_VERSION
     COPY --from=deps_base /etc/nginx /etc/nginx
     RUN rm /etc/nginx/sites-enabled/default
     RUN mkdir /var/log/nginx /var/lib/nginx
-    RUN chown www-data /var/log/nginx /var/lib/nginx
+    RUN chown www-data /var/lib/nginx
+
+    # have nginx log to stderr/out
+    RUN ln -sf /dev/stdout /var/log/nginx/access.log
+    RUN ln -sf /dev/stderr /var/log/nginx/error.log
 
     # Copy Synapse worker, nginx and supervisord configuration template files
     COPY ./docker/conf-workers/* /conf/
diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md
index 5c37225168..7f99220a3b 100644
--- a/docs/development/contributing_guide.md
+++ b/docs/development/contributing_guide.md
@@ -390,7 +390,7 @@ This file will become part of our [changelog](
 https://github.com/matrix-org/synapse/blob/master/CHANGES.md) at the next
 release, so the content of the file should be a short description of your
 change in the same style as the rest of the changelog. The file can contain Markdown
-formatting, and should end with a full stop (.) or an exclamation mark (!) for
+formatting, and must end with a full stop (.) or an exclamation mark (!) for
 consistency.
 
 Adding credits to the changelog is encouraged, we value your
diff --git a/docs/openid.md b/docs/openid.md
index ce9b026228..45ba1947b3 100644
--- a/docs/openid.md
+++ b/docs/openid.md
@@ -423,7 +423,7 @@ Synapse config:
     user_mapping_provider:
       config:
         display_name_template: "{{ user.name }}"
-        email_template: "{{ '{{ user.email }}' }}"
+        email_template: "{{ user.email }}"
 ```
 
 Relevant documents:
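
The old template wrapped the expression in quotes, so Jinja2 emitted the literal text {{ user.email }} instead of the user's address. A minimal sketch of the difference, using the jinja2 package directly (address hypothetical):

    from jinja2 import Template

    user = {"email": "alice@example.com"}
    # Broken form: the quoted inner braces are a string literal, echoed verbatim.
    assert Template("{{ '{{ user.email }}' }}").render(user=user) == "{{ user.email }}"
    # Fixed form: the attribute is actually evaluated.
    assert Template("{{ user.email }}").render(user=user) == "alice@example.com"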
diff --git a/poetry.lock b/poetry.lock
index 63ef8573a0..f43fedd4a7 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -13,18 +13,15 @@ tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympl
 tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"]
 
 [[package]]
-name = "authlib"
-version = "0.15.5"
-description = "The ultimate Python library in building OAuth and OpenID Connect servers."
+name = "Authlib"
+version = "1.1.0"
+description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients."
 category = "main"
 optional = true
 python-versions = "*"
 
 [package.dependencies]
-cryptography = "*"
-
-[package.extras]
-client = ["requests"]
+cryptography = ">=3.2"
 
 [[package]]
 name = "automat"
@@ -399,15 +396,16 @@ scripts = ["click (>=6.0)", "twisted (>=16.4.0)"]
 
 [[package]]
 name = "isort"
-version = "5.7.0"
+version = "5.10.1"
 description = "A Python utility / library to sort Python imports."
 category = "dev"
 optional = false
-python-versions = ">=3.6,<4.0"
+python-versions = ">=3.6.1,<4.0"
 
 [package.extras]
 colors = ["colorama (>=0.4.3,<0.5.0)"]
 pipfile_deprecated_finder = ["pipreqs", "requirementslib"]
+plugins = ["setuptools"]
 requirements_deprecated_finder = ["pip-api", "pipreqs"]
 
 [[package]]
@@ -455,7 +453,7 @@ i18n = ["Babel (>=2.7)"]
 
 [[package]]
 name = "jsonschema"
-version = "4.4.0"
+version = "4.16.0"
 description = "An implementation of JSON Schema validation for Python"
 category = "main"
 optional = false
@@ -465,12 +463,13 @@ python-versions = ">=3.7"
 attrs = ">=17.4.0"
 importlib-metadata = {version = "*", markers = "python_version < \"3.8\""}
 importlib-resources = {version = ">=1.4.0", markers = "python_version < \"3.9\""}
+pkgutil-resolve-name = {version = ">=1.3.10", markers = "python_version < \"3.9\""}
 pyrsistent = ">=0.14.0,<0.17.0 || >0.17.0,<0.17.1 || >0.17.1,<0.17.2 || >0.17.2"
 typing-extensions = {version = "*", markers = "python_version < \"3.8\""}
 
 [package.extras]
 format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"]
-format_nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"]
+format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"]
 
 [[package]]
 name = "keyring"
@@ -691,6 +690,14 @@ python-versions = "*"
 testing = ["coverage", "nose"]
 
 [[package]]
+name = "pkgutil_resolve_name"
+version = "1.3.10"
+description = "Resolve a name to an object."
+category = "main"
+optional = false
+python-versions = ">=3.6"
+
+[[package]]
 name = "platformdirs"
 version = "2.5.1"
 description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
@@ -1442,8 +1449,8 @@ optional = false
 python-versions = "*"
 
 [[package]]
-name = "types-pyopenssl"
-version = "22.0.0"
+name = "types-pyOpenSSL"
+version = "22.0.10"
 description = "Typing stubs for pyOpenSSL"
 category = "dev"
 optional = false
@@ -1453,8 +1460,8 @@ python-versions = "*"
 types-cryptography = "*"
 
 [[package]]
-name = "types-pyyaml"
-version = "6.0.4"
+name = "types-PyYAML"
+version = "6.0.12"
 description = "Typing stubs for PyYAML"
 category = "dev"
 optional = false
@@ -1462,7 +1469,7 @@ python-versions = "*"
 
 [[package]]
 name = "types-requests"
-version = "2.27.11"
+version = "2.28.11"
 description = "Typing stubs for requests"
 category = "dev"
 optional = false
@@ -1473,7 +1480,7 @@ types-urllib3 = "<1.27"
 
 [[package]]
 name = "types-setuptools"
-version = "57.4.9"
+version = "65.4.0.0"
 description = "Typing stubs for setuptools"
 category = "dev"
 optional = false
@@ -1626,16 +1633,16 @@ url_preview = ["lxml"]
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.7.1"
-content-hash = "1b14fc274d9e2a495a7f864150f3ffcf4d9f585e09a67e53301ae4ef3c2f3e48"
+content-hash = "9d74da808739e4c3d15a2d3473f01ad419f62aec8bf28613b03bd69136c4745b"
 
 [metadata.files]
 attrs = [
     {file = "attrs-21.4.0-py2.py3-none-any.whl", hash = "sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"},
     {file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"},
 ]
-authlib = [
-    {file = "Authlib-0.15.5-py2.py3-none-any.whl", hash = "sha256:ecf4a7a9f2508c0bb07e93a752dd3c495cfaffc20e864ef0ffc95e3f40d2abaf"},
-    {file = "Authlib-0.15.5.tar.gz", hash = "sha256:b83cf6360c8e92b0e9df0d1f32d675790bcc4e3c03977499b1eed24dcdef4252"},
+Authlib = [
+    {file = "Authlib-1.1.0-py2.py3-none-any.whl", hash = "sha256:be4b6a1dea51122336c210a6945b27a105b9ac572baffd15b07bcff4376c1523"},
+    {file = "Authlib-1.1.0.tar.gz", hash = "sha256:0a270c91409fc2b7b0fbee6996e09f2ee3187358762111a9a4225c874b94e891"},
 ]
 automat = [
     {file = "Automat-20.2.0-py2.py3-none-any.whl", hash = "sha256:b6feb6455337df834f6c9962d6ccf771515b7d939bca142b29c20c2376bc6111"},
@@ -1970,8 +1977,8 @@ incremental = [
     {file = "incremental-21.3.0.tar.gz", hash = "sha256:02f5de5aff48f6b9f665d99d48bfc7ec03b6e3943210de7cfc88856d755d6f57"},
 ]
 isort = [
-    {file = "isort-5.7.0-py3-none-any.whl", hash = "sha256:fff4f0c04e1825522ce6949973e83110a6e907750cd92d128b0d14aaaadbffdc"},
-    {file = "isort-5.7.0.tar.gz", hash = "sha256:c729845434366216d320e936b8ad6f9d681aab72dc7cbc2d51bedc3582f3ad1e"},
+    {file = "isort-5.10.1-py3-none-any.whl", hash = "sha256:6f62d78e2f89b4500b080fe3a81690850cd254227f27f75c3a0c491a1f351ba7"},
+    {file = "isort-5.10.1.tar.gz", hash = "sha256:e8443a5e7a020e9d7f97f1d7d9cd17c88bcb3bc7e218bf9cf5095fe550be2951"},
 ]
 jaeger-client = [
     {file = "jaeger-client-4.8.0.tar.gz", hash = "sha256:3157836edab8e2c209bd2d6ae61113db36f7ee399e66b1dcbb715d87ab49bfe0"},
@@ -1985,18 +1992,15 @@ jinja2 = [
     {file = "Jinja2-3.0.3.tar.gz", hash = "sha256:611bb273cd68f3b993fabdc4064fc858c5b47a973cb5aa7999ec1ba405c87cd7"},
 ]
 jsonschema = [
-    {file = "jsonschema-4.4.0-py3-none-any.whl", hash = "sha256:77281a1f71684953ee8b3d488371b162419767973789272434bbc3f29d9c8823"},
-    {file = "jsonschema-4.4.0.tar.gz", hash = "sha256:636694eb41b3535ed608fe04129f26542b59ed99808b4f688aa32dcf55317a83"},
+    {file = "jsonschema-4.16.0-py3-none-any.whl", hash = "sha256:9e74b8f9738d6a946d70705dc692b74b5429cd0960d58e79ffecfc43b2221eb9"},
+    {file = "jsonschema-4.16.0.tar.gz", hash = "sha256:165059f076eff6971bae5b742fc029a7b4ef3f9bcf04c14e4776a7605de14b23"},
 ]
 keyring = [
     {file = "keyring-23.5.0-py3-none-any.whl", hash = "sha256:b0d28928ac3ec8e42ef4cc227822647a19f1d544f21f96457965dc01cf555261"},
     {file = "keyring-23.5.0.tar.gz", hash = "sha256:9012508e141a80bd1c0b6778d5c610dd9f8c464d75ac6774248500503f972fb9"},
 ]
 ldap3 = [
-    {file = "ldap3-2.9.1-py2.6.egg", hash = "sha256:5ab7febc00689181375de40c396dcad4f2659cd260fc5e94c508b6d77c17e9d5"},
-    {file = "ldap3-2.9.1-py2.7.egg", hash = "sha256:2bc966556fc4d4fa9f445a1c31dc484ee81d44a51ab0e2d0fd05b62cac75daa6"},
     {file = "ldap3-2.9.1-py2.py3-none-any.whl", hash = "sha256:5869596fc4948797020d3f03b7939da938778a0f9e2009f7a072ccf92b8e8d70"},
-    {file = "ldap3-2.9.1-py3.9.egg", hash = "sha256:5630d1383e09ba94839e253e013f1aa1a2cf7a547628ba1265cb7b9a844b5687"},
     {file = "ldap3-2.9.1.tar.gz", hash = "sha256:f3e7fc4718e3f09dda568b57100095e0ce58633bcabbed8667ce3f8fbaa4229f"},
 ]
 lxml = [
@@ -2259,6 +2263,10 @@ pkginfo = [
     {file = "pkginfo-1.8.2-py2.py3-none-any.whl", hash = "sha256:c24c487c6a7f72c66e816ab1796b96ac6c3d14d49338293d2141664330b55ffc"},
     {file = "pkginfo-1.8.2.tar.gz", hash = "sha256:542e0d0b6750e2e21c20179803e40ab50598d8066d51097a0e382cba9eb02bff"},
 ]
+pkgutil_resolve_name = [
+    {file = "pkgutil_resolve_name-1.3.10-py3-none-any.whl", hash = "sha256:ca27cc078d25c5ad71a9de0a7a330146c4e014c2462d9af19c6b828280649c5e"},
+    {file = "pkgutil_resolve_name-1.3.10.tar.gz", hash = "sha256:357d6c9e6a755653cfd78893817c0853af365dd51ec97f3d358a819373bbd174"},
+]
 platformdirs = [
     {file = "platformdirs-2.5.1-py3-none-any.whl", hash = "sha256:bcae7cab893c2d310a711b70b24efb93334febe65f8de776ee320b517471e227"},
     {file = "platformdirs-2.5.1.tar.gz", hash = "sha256:7535e70dfa32e84d4b34996ea99c5e432fa29a708d0f4e394bbcb2a8faa4f16d"},
@@ -2287,34 +2295,12 @@ psycopg2cffi-compat = [
     {file = "psycopg2cffi-compat-1.1.tar.gz", hash = "sha256:d25e921748475522b33d13420aad5c2831c743227dc1f1f2585e0fdb5c914e05"},
 ]
 pyasn1 = [
-    {file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
-    {file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
-    {file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
-    {file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
     {file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
-    {file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
-    {file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
-    {file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
-    {file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
-    {file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
-    {file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
-    {file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
     {file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
 ]
 pyasn1-modules = [
     {file = "pyasn1-modules-0.2.8.tar.gz", hash = "sha256:905f84c712230b2c592c19470d3ca8d552de726050d1d1716282a1f6146be65e"},
-    {file = "pyasn1_modules-0.2.8-py2.4.egg", hash = "sha256:0fe1b68d1e486a1ed5473f1302bd991c1611d319bba158e98b106ff86e1d7199"},
-    {file = "pyasn1_modules-0.2.8-py2.5.egg", hash = "sha256:fe0644d9ab041506b62782e92b06b8c68cca799e1a9636ec398675459e031405"},
-    {file = "pyasn1_modules-0.2.8-py2.6.egg", hash = "sha256:a99324196732f53093a84c4369c996713eb8c89d360a496b599fb1a9c47fc3eb"},
-    {file = "pyasn1_modules-0.2.8-py2.7.egg", hash = "sha256:0845a5582f6a02bb3e1bde9ecfc4bfcae6ec3210dd270522fee602365430c3f8"},
     {file = "pyasn1_modules-0.2.8-py2.py3-none-any.whl", hash = "sha256:a50b808ffeb97cb3601dd25981f6b016cbb3d31fbf57a8b8a87428e6158d0c74"},
-    {file = "pyasn1_modules-0.2.8-py3.1.egg", hash = "sha256:f39edd8c4ecaa4556e989147ebf219227e2cd2e8a43c7e7fcb1f1c18c5fd6a3d"},
-    {file = "pyasn1_modules-0.2.8-py3.2.egg", hash = "sha256:b80486a6c77252ea3a3e9b1e360bc9cf28eaac41263d173c032581ad2f20fe45"},
-    {file = "pyasn1_modules-0.2.8-py3.3.egg", hash = "sha256:65cebbaffc913f4fe9e4808735c95ea22d7a7775646ab690518c056784bc21b4"},
-    {file = "pyasn1_modules-0.2.8-py3.4.egg", hash = "sha256:15b7c67fabc7fc240d87fb9aabf999cf82311a6d6fb2c70d00d3d0604878c811"},
-    {file = "pyasn1_modules-0.2.8-py3.5.egg", hash = "sha256:426edb7a5e8879f1ec54a1864f16b882c2837bfd06eee62f2c982315ee2473ed"},
-    {file = "pyasn1_modules-0.2.8-py3.6.egg", hash = "sha256:cbac4bc38d117f2a49aeedec4407d23e8866ea4ac27ff2cf7fb3e5b570df19e0"},
-    {file = "pyasn1_modules-0.2.8-py3.7.egg", hash = "sha256:c29a5e5cc7a3f05926aff34e097e84f8589cd790ce0ed41b67aed6857b26aafd"},
 ]
 pycodestyle = [
     {file = "pycodestyle-2.8.0-py2.py3-none-any.whl", hash = "sha256:720f8b39dde8b293825e7ff02c475f3077124006db4f440dcbc9a20b76548a20"},
@@ -2452,6 +2438,13 @@ pyyaml = [
     {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"},
     {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"},
     {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"},
+    {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"},
+    {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"},
+    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"},
+    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"},
+    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"},
+    {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"},
+    {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"},
     {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"},
     {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"},
     {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"},
@@ -2755,21 +2748,21 @@ types-psycopg2 = [
     {file = "types-psycopg2-2.9.9.tar.gz", hash = "sha256:4f9d4d52eeb343dc00fd5ed4f1513a8a5c18efba0a072eb82706d15cf4f20a2e"},
     {file = "types_psycopg2-2.9.9-py3-none-any.whl", hash = "sha256:cec9291d4318ad70b407310f8304b3d40f6d0358f09870448f7a65e3027c80af"},
 ]
-types-pyopenssl = [
-    {file = "types-pyOpenSSL-22.0.0.tar.gz", hash = "sha256:d86dde7f6fe2f1ac9fe0b6282e489f649f480364bdaa9d6a4696d52505f4477e"},
-    {file = "types_pyOpenSSL-22.0.0-py3-none-any.whl", hash = "sha256:da685f57b864979f36df0157895139c8244ad4aad19b551f1678206fbad0108a"},
+types-pyOpenSSL = [
+    {file = "types-pyOpenSSL-22.0.10.tar.gz", hash = "sha256:f943b834f5b97e5e808764c2f6e37be1a2e226c46792296f61558196acfcc3a1"},
+    {file = "types_pyOpenSSL-22.0.10-py3-none-any.whl", hash = "sha256:63baea211768bea580a769ac5c0d637ae8cd3150314aadc5726ca22e4c4f241a"},
 ]
-types-pyyaml = [
-    {file = "types-PyYAML-6.0.4.tar.gz", hash = "sha256:6252f62d785e730e454dfa0c9f0fb99d8dae254c5c3c686903cf878ea27c04b7"},
-    {file = "types_PyYAML-6.0.4-py3-none-any.whl", hash = "sha256:693b01c713464a6851f36ff41077f8adbc6e355eda929addfb4a97208aea9b4b"},
+types-PyYAML = [
+    {file = "types-PyYAML-6.0.12.tar.gz", hash = "sha256:f6f350418125872f3f0409d96a62a5a5ceb45231af5cc07ee0034ec48a3c82fa"},
+    {file = "types_PyYAML-6.0.12-py3-none-any.whl", hash = "sha256:29228db9f82df4f1b7febee06bbfb601677882e98a3da98132e31c6874163e15"},
 ]
 types-requests = [
-    {file = "types-requests-2.27.11.tar.gz", hash = "sha256:6a7ed24b21780af4a5b5e24c310b2cd885fb612df5fd95584d03d87e5f2a195a"},
-    {file = "types_requests-2.27.11-py3-none-any.whl", hash = "sha256:506279bad570c7b4b19ac1f22e50146538befbe0c133b2cea66a9b04a533a859"},
+    {file = "types-requests-2.28.11.tar.gz", hash = "sha256:7ee827eb8ce611b02b5117cfec5da6455365b6a575f5e3ff19f655ba603e6b4e"},
+    {file = "types_requests-2.28.11-py3-none-any.whl", hash = "sha256:af5f55e803cabcfb836dad752bd6d8a0fc8ef1cd84243061c0e27dee04ccf4fd"},
 ]
 types-setuptools = [
-    {file = "types-setuptools-57.4.9.tar.gz", hash = "sha256:536ef74744f8e1e4be4fc719887f886e74e4cf3c792b4a06984320be4df450b5"},
-    {file = "types_setuptools-57.4.9-py3-none-any.whl", hash = "sha256:948dc6863373750e2cd0b223a84f1fb608414cde5e55cf38ea657b93aeb411d2"},
+    {file = "types-setuptools-65.4.0.0.tar.gz", hash = "sha256:d9021d6a70690b34e7bd2947e7ab10167c646fbf062508cb56581be2e2a1615e"},
+    {file = "types_setuptools-65.4.0.0-py3-none-any.whl", hash = "sha256:ce178b3f7dbd6c0e67f8eee7ae29c1be280ade7e5188bdd9e620843de4060d85"},
 ]
 types-urllib3 = [
     {file = "types-urllib3-1.26.10.tar.gz", hash = "sha256:a26898f530e6c3f43f25b907f2b884486868ffd56a9faa94cbf9b3eb6e165d6a"},
diff --git a/pyproject.toml b/pyproject.toml
index 622d6a9e89..5e36baf40d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -267,7 +267,7 @@ all = [
 
 [tool.poetry.dev-dependencies]
 ## We pin black so that our tests don't start failing on new releases.
-isort = "==5.7.0"
+isort = "==5.10.1"
 black = "==22.3.0"
 flake8-comprehensions = "*"
 flake8-bugbear = "==21.3.2"
diff --git a/rust/src/push/base_rules.rs b/rust/src/push/base_rules.rs
index bb59676bde..2a09cf99ae 100644
--- a/rust/src/push/base_rules.rs
+++ b/rust/src/push/base_rules.rs
@@ -173,7 +173,7 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
         default_enabled: true,
     },
     PushRule {
-        rule_id: Cow::Borrowed("global/override/.org.matrix.msc3786.rule.room.server_acl"),
+        rule_id: Cow::Borrowed("global/override/.m.rule.room.server_acl"),
         priority_class: 5,
         conditions: Cow::Borrowed(&[
             Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
diff --git a/rust/src/push/mod.rs b/rust/src/push/mod.rs
index 30fffc31ad..208b9c0d73 100644
--- a/rust/src/push/mod.rs
+++ b/rust/src/push/mod.rs
@@ -401,7 +401,6 @@ impl PushRules {
 pub struct FilteredPushRules {
     push_rules: PushRules,
     enabled_map: BTreeMap<String, bool>,
-    msc3786_enabled: bool,
     msc3772_enabled: bool,
 }
 
@@ -411,13 +410,11 @@ impl FilteredPushRules {
     pub fn py_new(
         push_rules: PushRules,
         enabled_map: BTreeMap<String, bool>,
-        msc3786_enabled: bool,
         msc3772_enabled: bool,
     ) -> Self {
         Self {
             push_rules,
             enabled_map,
-            msc3786_enabled,
             msc3772_enabled,
         }
     }
@@ -437,12 +434,6 @@ impl FilteredPushRules {
             .iter()
             .filter(|rule| {
                 // Ignore disabled experimental push rules
-                if !self.msc3786_enabled
-                    && rule.rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
-                {
-                    return false;
-                }
-
                 if !self.msc3772_enabled
                     && rule.rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
                 {
diff --git a/stubs/synapse/synapse_rust/push.pyi b/stubs/synapse/synapse_rust/push.pyi
index fffb8419c6..5900e61450 100644
--- a/stubs/synapse/synapse_rust/push.pyi
+++ b/stubs/synapse/synapse_rust/push.pyi
@@ -26,11 +26,7 @@ class PushRules:
 
 class FilteredPushRules:
     def __init__(
-        self,
-        push_rules: PushRules,
-        enabled_map: Dict[str, bool],
-        msc3786_enabled: bool,
-        msc3772_enabled: bool,
+        self, push_rules: PushRules, enabled_map: Dict[str, bool], msc3772_enabled: bool
     ): ...
     def rules(self) -> Collection[Tuple[PushRule, bool]]: ...
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index c031903b1a..44c5ffc6a5 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -31,6 +31,9 @@ MAX_ALIAS_LENGTH = 255
 # the maximum length for a user id is 255 characters
 MAX_USERID_LENGTH = 255
 
+# Constant value used for the pseudo-thread which is the main timeline.
+MAIN_TIMELINE: Final = "main"
+
 
 class Membership:
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index f7f46f8d80..c6e44dcf82 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -84,6 +84,7 @@ ROOM_EVENT_FILTER_SCHEMA = {
         "contains_url": {"type": "boolean"},
         "lazy_load_members": {"type": "boolean"},
         "include_redundant_members": {"type": "boolean"},
+        "org.matrix.msc3773.unread_thread_notifications": {"type": "boolean"},
         # Include or exclude events with the provided labels.
         # cf https://github.com/matrix-org/matrix-doc/pull/2326
         "org.matrix.labels": {"type": "array", "items": {"type": "string"}},
@@ -240,6 +241,9 @@ class FilterCollection:
     def include_redundant_members(self) -> bool:
         return self._room_state_filter.include_redundant_members
 
+    def unread_thread_notifications(self) -> bool:
+        return self._room_timeline_filter.unread_thread_notifications
+
     async def filter_presence(
         self, events: Iterable[UserPresenceState]
     ) -> List[UserPresenceState]:
@@ -304,6 +308,12 @@ class Filter:
         self.include_redundant_members = filter_json.get(
             "include_redundant_members", False
         )
+        if hs.config.experimental.msc3773_enabled:
+            self.unread_thread_notifications: bool = filter_json.get(
+                "org.matrix.msc3773.unread_thread_notifications", False
+            )
+        else:
+            self.unread_thread_notifications = False
 
         self.types = filter_json.get("types", None)
         self.not_types = filter_json.get("not_types", [])
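
The new field belongs in the room timeline filter, since FilterCollection.unread_thread_notifications (above) reads it from _room_timeline_filter. A minimal sketch of a client filter opting in, assuming the standard Matrix filter shape; only the MSC-prefixed field comes from this change:

    sync_filter = {
        "room": {
            "timeline": {
                "limit": 10,
                # Unstable MSC3773 name; ignored unless msc3773_enabled is set.
                "org.matrix.msc3773.unread_thread_notifications": True,
            }
        }
    }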
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 0963fb3bb4..fbac4375b0 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -120,7 +120,11 @@ class ApplicationServiceApi(SimpleHttpClient):
 
         uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
         try:
-            response = await self.get_json(uri, {"access_token": service.hs_token})
+            response = await self.get_json(
+                uri,
+                {"access_token": service.hs_token},
+                headers={"Authorization": f"Bearer {service.hs_token}"},
+            )
             if response is not None:  # just an empty json object
                 return True
         except CodeMessageException as e:
@@ -140,7 +144,11 @@ class ApplicationServiceApi(SimpleHttpClient):
 
         uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
         try:
-            response = await self.get_json(uri, {"access_token": service.hs_token})
+            response = await self.get_json(
+                uri,
+                {"access_token": service.hs_token},
+                headers={"Authorization": f"Bearer {service.hs_token}"},
+            )
             if response is not None:  # just an empty json object
                 return True
         except CodeMessageException as e:
@@ -181,7 +189,9 @@ class ApplicationServiceApi(SimpleHttpClient):
                 **fields,
                 b"access_token": service.hs_token,
             }
-            response = await self.get_json(uri, args=args)
+            response = await self.get_json(
+                uri, args=args, headers={"Authorization": f"Bearer {service.hs_token}"}
+            )
             if not isinstance(response, list):
                 logger.warning(
                     "query_3pe to %s returned an invalid response %r", uri, response
@@ -217,7 +227,11 @@ class ApplicationServiceApi(SimpleHttpClient):
                 urllib.parse.quote(protocol),
             )
             try:
-                info = await self.get_json(uri, {"access_token": service.hs_token})
+                info = await self.get_json(
+                    uri,
+                    {"access_token": service.hs_token},
+                    headers={"Authorization": f"Bearer {service.hs_token}"},
+                )
 
                 if not _is_valid_3pe_metadata(info):
                     logger.warning(
@@ -313,6 +327,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 uri=uri,
                 json_body=body,
                 args={"access_token": service.hs_token},
+                headers={"Authorization": f"Bearer {service.hs_token}"},
             )
             if logger.isEnabledFor(logging.DEBUG):
                 logger.debug(
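
Each outbound application-service request now carries the hs_token twice: as an Authorization header (MSC2832) and as the legacy access_token query parameter, so older implementations keep working. A rough equivalent of the resulting request, sketched with the requests library (host and token hypothetical):

    import requests

    hs_token = "secret-hs-token"  # hypothetical homeserver-to-AS token
    resp = requests.get(
        "https://bridge.example.com/users/%40alice%3Aexample.com",
        params={"access_token": hs_token},                # legacy query parameter
        headers={"Authorization": f"Bearer {hs_token}"},  # MSC2832 header
    )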
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 31834fb27d..6503ce6e34 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -95,13 +95,12 @@ class ExperimentalConfig(Config):
         # MSC2815 (allow room moderators to view redacted event content)
         self.msc2815_enabled: bool = experimental.get("msc2815_enabled", False)
 
-        # MSC3786 (Add a default push rule to ignore m.room.server_acl events)
-        self.msc3786_enabled: bool = experimental.get("msc3786_enabled", False)
-
         # MSC3771: Thread read receipts
         self.msc3771_enabled: bool = experimental.get("msc3771_enabled", False)
         # MSC3772: A push rule for mutual relations.
         self.msc3772_enabled: bool = experimental.get("msc3772_enabled", False)
+        # MSC3773: Thread notifications
+        self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)
 
         # MSC3715: dir param on /relations.
         self.msc3715_enabled: bool = experimental.get("msc3715_enabled", False)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 778d8869b3..da319943cc 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -2240,8 +2240,8 @@ class FederationEventHandler:
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
-        await self._notifier.on_new_room_event(
-            event, event_pos, max_stream_token, extra_users=extra_users
+        await self._notifier.on_new_room_events(
+            [(event, event_pos)], max_stream_token, extra_users=extra_users
         )
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 00e7645ba5..da1acea275 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1872,6 +1872,7 @@ class EventCreationHandler:
             events_and_context, backfilled=backfilled
         )
 
+        events_and_pos = []
         for event in persisted_events:
             if self._ephemeral_events_enabled:
                 # If there's an expiry timestamp on the event, schedule its expiry.
@@ -1880,25 +1881,23 @@ class EventCreationHandler:
             stream_ordering = event.internal_metadata.stream_ordering
             assert stream_ordering is not None
             pos = PersistedEventPosition(self._instance_name, stream_ordering)
-
-            async def _notify() -> None:
-                try:
-                    await self.notifier.on_new_room_event(
-                        event, pos, max_stream_token, extra_users=extra_users
-                    )
-                except Exception:
-                    logger.exception(
-                        "Error notifying about new room event %s",
-                        event.event_id,
-                    )
-
-            run_in_background(_notify)
+            events_and_pos.append((event, pos))
 
             if event.type == EventTypes.Message:
                 # We don't want to block sending messages on any presence code. This
                 # matters as sometimes presence code can take a while.
                 run_in_background(self._bump_active_time, requester.user)
 
+        async def _notify() -> None:
+            try:
+                await self.notifier.on_new_room_events(
+                    events_and_pos, max_stream_token, extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room events")
+
+        run_in_background(_notify)
+
         return persisted_events[-1]
 
     async def _maybe_kick_guest_users(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4abb9b6127..0f684857ca 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,7 +40,7 @@ from synapse.handlers.relations import BundledAggregations
 from synapse.logging.context import current_context
 from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
 from synapse.push.clientformat import format_push_rules_for_user
-from synapse.storage.databases.main.event_push_actions import NotifCounts
+from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -128,6 +128,7 @@ class JoinedSyncResult:
     ephemeral: List[JsonDict]
     account_data: List[JsonDict]
     unread_notifications: JsonDict
+    unread_thread_notifications: JsonDict
     summary: Optional[JsonDict]
     unread_count: int
 
@@ -278,6 +279,8 @@ class SyncHandler:
 
         self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
 
+        self._msc3773_enabled = hs.config.experimental.msc3773_enabled
+
     async def wait_for_sync_for_user(
         self,
         requester: Requester,
@@ -1288,7 +1291,7 @@ class SyncHandler:
 
     async def unread_notifs_for_room_id(
         self, room_id: str, sync_config: SyncConfig
-    ) -> NotifCounts:
+    ) -> RoomNotifCounts:
         with Measure(self.clock, "unread_notifs_for_room_id"):
 
             return await self.store.get_unread_event_push_actions_by_room_for_user(
@@ -1314,6 +1317,19 @@ class SyncHandler:
         At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
         instance to signify that the sync calculation is complete.
         """
+
+        user_id = sync_config.user.to_string()
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
+
+        # Note: we get the user's room list *before* we get the current token; this
+        # avoids checking back in history if rooms are joined after the token is fetched.
+        token_before_rooms = self.event_sources.get_current_token()
+        mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
+
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
@@ -1321,6 +1337,57 @@ class SyncHandler:
         now_token = self.event_sources.get_current_token()
         log_kv({"now_token": now_token})
 
+        # Since we fetched the user's room list before the token, there's a small window
+        # during which membership events may have been persisted, so we fetch these now
+        # and modify the joined room list for any changes between the get_rooms_for_user
+        # call and the get_current_token call.
+        membership_change_events = []
+        if since_token:
+            membership_change_events = await self.store.get_membership_changes_for_user(
+                user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+            )
+
+            mem_last_change_by_room_id: Dict[str, EventBase] = {}
+            for event in membership_change_events:
+                mem_last_change_by_room_id[event.room_id] = event
+
+            # For the latest membership event in each room found, add/remove the room ID
+            # from the joined room list accordingly. In this case we only care if the
+            # latest change is JOIN.
+
+            for room_id, event in mem_last_change_by_room_id.items():
+                assert event.internal_metadata.stream_ordering
+                if (
+                    event.internal_metadata.stream_ordering
+                    < token_before_rooms.room_key.stream
+                ):
+                    continue
+
+                logger.info(
+                    "User membership change between getting rooms and current token: %s %s %s",
+                    user_id,
+                    event.membership,
+                    room_id,
+                )
+                # User joined a room - we have to then check the room state to ensure we
+                # respect any bans if there's a race between the join and ban events.
+                if event.membership == Membership.JOIN:
+                    user_ids_in_room = await self.store.get_users_in_room(room_id)
+                    if user_id in user_ids_in_room:
+                        mutable_joined_room_ids.add(room_id)
+                # The user left the room, or left and was re-invited but not joined yet
+                else:
+                    mutable_joined_room_ids.discard(room_id)
+
+        # Now we have our list of joined room IDs, exclude as configured and freeze
+        joined_room_ids = frozenset(
+            (
+                room_id
+                for room_id in mutable_joined_room_ids
+                if room_id not in self.rooms_to_exclude
+            )
+        )
+
         logger.debug(
             "Calculating sync response for %r between %s and %s",
             sync_config.user,
@@ -1328,22 +1395,13 @@ class SyncHandler:
             now_token,
         )
 
-        user_id = sync_config.user.to_string()
-        app_service = self.store.get_app_service_by_user_id(user_id)
-        if app_service:
-            # We no longer support AS users using /sync directly.
-            # See https://github.com/matrix-org/matrix-doc/issues/1144
-            raise NotImplementedError()
-        else:
-            joined_room_ids = await self.get_rooms_for_user_at(
-                user_id, now_token.room_key
-            )
         sync_result_builder = SyncResultBuilder(
             sync_config,
             full_state,
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
+            membership_change_events=membership_change_events,
         )
 
         logger.debug("Fetching account data")
@@ -1824,19 +1882,12 @@ class SyncHandler:
 
         Does not modify the `sync_result_builder`.
         """
-        user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
-        now_token = sync_result_builder.now_token
+        membership_change_events = sync_result_builder.membership_change_events
 
         assert since_token
 
-        # Get a list of membership change events that have happened to the user
-        # requesting the sync.
-        membership_changes = await self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key
-        )
-
-        if membership_changes:
+        if membership_change_events:
             return True
 
         stream_id = since_token.room_key.stream
@@ -1875,16 +1926,10 @@ class SyncHandler:
         since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
+        membership_change_events = sync_result_builder.membership_change_events
 
         assert since_token
 
-        # TODO: we've already called this function and ran this query in
-        #       _have_rooms_changed. We could keep the results in memory to avoid a
-        #       second query, at the cost of more complicated source code.
-        membership_change_events = await self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
-        )
-
         mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
         for event in membership_change_events:
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
@@ -2353,6 +2398,7 @@ class SyncHandler:
                     ephemeral=ephemeral,
                     account_data=account_data_events,
                     unread_notifications=unread_notifications,
+                    unread_thread_notifications={},
                     summary=summary,
                     unread_count=0,
                 )
@@ -2360,10 +2406,36 @@ class SyncHandler:
                 if room_sync or always_include:
                     notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
 
-                    unread_notifications["notification_count"] = notifs.notify_count
-                    unread_notifications["highlight_count"] = notifs.highlight_count
+                    # Notifications for the main timeline.
+                    notify_count = notifs.main_timeline.notify_count
+                    highlight_count = notifs.main_timeline.highlight_count
+                    unread_count = notifs.main_timeline.unread_count
 
-                    room_sync.unread_count = notifs.unread_count
+                    # Check the sync configuration.
+                    if (
+                        self._msc3773_enabled
+                        and sync_config.filter_collection.unread_thread_notifications()
+                    ):
+                        # And add info for each thread.
+                        room_sync.unread_thread_notifications = {
+                            thread_id: {
+                                "notification_count": thread_notifs.notify_count,
+                                "highlight_count": thread_notifs.highlight_count,
+                            }
+                            for thread_id, thread_notifs in notifs.threads.items()
+                            if thread_id is not None
+                        }
+
+                    else:
+                        # Combine the unread counts for all threads and main timeline.
+                        for thread_notifs in notifs.threads.values():
+                            notify_count += thread_notifs.notify_count
+                            highlight_count += thread_notifs.highlight_count
+                            unread_count += thread_notifs.unread_count
+
+                    unread_notifications["notification_count"] = notify_count
+                    unread_notifications["highlight_count"] = highlight_count
+                    room_sync.unread_count = unread_count
 
                     sync_result_builder.joined.append(room_sync)
 
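An illustrative value for the per-thread counts built above, keyed by thread root event ID (IDs hypothetical); entries whose thread ID is None are excluded by the comprehension:

    room_sync.unread_thread_notifications = {
        "$thread_root_a": {"notification_count": 2, "highlight_count": 0},
        "$thread_root_b": {"notification_count": 1, "highlight_count": 1},
    }
    # Main-timeline counts continue to flow through unread_notifications.
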
@@ -2385,60 +2457,6 @@ class SyncHandler:
             else:
                 raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
-    async def get_rooms_for_user_at(
-        self,
-        user_id: str,
-        room_key: RoomStreamToken,
-    ) -> FrozenSet[str]:
-        """Get set of joined rooms for a user at the given stream ordering.
-
-        The stream ordering *must* be recent, otherwise this may throw an
-        exception if older than a month. (This function is called with the
-        current token, which should be perfectly fine).
-
-        Args:
-            user_id
-            stream_ordering
-
-        ReturnValue:
-            Set of room_ids the user is in at given stream_ordering.
-        """
-        joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
-
-        joined_room_ids = set()
-
-        # We need to check that the stream ordering of the join for each room
-        # is before the stream_ordering asked for. This might not be the case
-        # if the user joins a room between us getting the current token and
-        # calling `get_rooms_for_user_with_stream_ordering`.
-        # If the membership's stream ordering is after the given stream
-        # ordering, we need to go and work out if the user was in the room
-        # before.
-        # We also need to check whether the room should be excluded from sync
-        # responses as per the homeserver config.
-        for joined_room in joined_rooms:
-            if joined_room.room_id in self.rooms_to_exclude:
-                continue
-
-            if not joined_room.event_pos.persisted_after(room_key):
-                joined_room_ids.add(joined_room.room_id)
-                continue
-
-            logger.info("User joined room after current token: %s", joined_room.room_id)
-
-            extrems = (
-                await self.store.get_forward_extremities_for_room_at_stream_ordering(
-                    joined_room.room_id, joined_room.event_pos.stream
-                )
-            )
-            user_ids_in_room = await self.state.get_current_user_ids_in_room(
-                joined_room.room_id, extrems
-            )
-            if user_id in user_ids_in_room:
-                joined_room_ids.add(joined_room.room_id)
-
-        return frozenset(joined_room_ids)
-
 
 def _action_has_highlight(actions: List[JsonDict]) -> bool:
     for action in actions:
@@ -2535,6 +2553,7 @@ class SyncResultBuilder:
     since_token: Optional[StreamToken]
     now_token: StreamToken
     joined_room_ids: FrozenSet[str]
+    membership_change_events: List[EventBase]
 
     presence: List[UserPresenceState] = attr.Factory(list)
     account_data: List[JsonDict] = attr.Factory(list)
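
Taken together, the sync.py hunks above reorder token and room-list fetching to close a race. A worked timeline of the case the reconciliation handles (user and room IDs hypothetical):

    # t0: token_before_rooms = get_current_token()
    # t1: mutable_joined_room_ids = get_rooms_for_user("@alice:example.com") -> {"!a"}
    # t2: @alice joins "!b" (persisted after the room list was read)
    # t3: now_token = get_current_token() (covers the join at t2)
    #
    # The membership scan from since_token to now_token sees the t2 join; its
    # stream ordering is >= token_before_rooms.room_key.stream, so "!b" is added
    # to the joined set (after re-checking current membership to respect bans).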
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c42bb8266a..26b97cf766 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -294,35 +294,31 @@ class Notifier:
         """
         self._new_join_in_room_callbacks.append(cb)
 
-    async def on_new_room_event(
+    async def on_new_room_events(
         self,
-        event: EventBase,
-        event_pos: PersistedEventPosition,
+        events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
         max_room_stream_token: RoomStreamToken,
         extra_users: Optional[Collection[UserID]] = None,
     ) -> None:
-        """Unwraps event and calls `on_new_room_event_args`."""
-        await self.on_new_room_event_args(
-            event_pos=event_pos,
-            room_id=event.room_id,
-            event_id=event.event_id,
-            event_type=event.type,
-            state_key=event.get("state_key"),
-            membership=event.content.get("membership"),
-            max_room_stream_token=max_room_stream_token,
-            extra_users=extra_users or [],
-        )
+        """Creates a _PendingRoomEventEntry for each of the listed events and calls
+        notify_new_room_events with the results."""
+        event_entries = []
+        for event, pos in events_and_pos:
+            entry = self.create_pending_room_event_entry(
+                pos,
+                extra_users,
+                event.room_id,
+                event.type,
+                event.get("state_key"),
+                event.content.get("membership"),
+            )
+            event_entries.append((entry, event.event_id))
+        await self.notify_new_room_events(event_entries, max_room_stream_token)
 
-    async def on_new_room_event_args(
+    async def notify_new_room_events(
         self,
-        room_id: str,
-        event_id: str,
-        event_type: str,
-        state_key: Optional[str],
-        membership: Optional[str],
-        event_pos: PersistedEventPosition,
+        event_entries: List[Tuple[_PendingRoomEventEntry, str]],
         max_room_stream_token: RoomStreamToken,
-        extra_users: Optional[Collection[UserID]] = None,
     ) -> None:
         """Used by handlers to inform the notifier something has happened
         in the room, room event wise.
@@ -338,22 +334,33 @@ class Notifier:
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append(
-            _PendingRoomEventEntry(
-                event_pos=event_pos,
-                extra_users=extra_users or [],
-                room_id=room_id,
-                type=event_type,
-                state_key=state_key,
-                membership=membership,
-            )
-        )
-        self._notify_pending_new_room_events(max_room_stream_token)
+        for event_entry, event_id in event_entries:
+            self.pending_new_room_events.append(event_entry)
+            await self._third_party_rules.on_new_event(event_id)
 
-        await self._third_party_rules.on_new_event(event_id)
+        self._notify_pending_new_room_events(max_room_stream_token)
 
         self.notify_replication()
 
+    def create_pending_room_event_entry(
+        self,
+        event_pos: PersistedEventPosition,
+        extra_users: Optional[Collection[UserID]],
+        room_id: str,
+        event_type: str,
+        state_key: Optional[str],
+        membership: Optional[str],
+    ) -> _PendingRoomEventEntry:
+        """Creates and returns a _PendingRoomEventEntry"""
+        return _PendingRoomEventEntry(
+            event_pos=event_pos,
+            extra_users=extra_users or [],
+            room_id=room_id,
+            type=event_type,
+            state_key=state_key,
+            membership=membership,
+        )
+
     def _notify_pending_new_room_events(
         self, max_room_stream_token: RoomStreamToken
     ) -> None:
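
For illustration, a minimal sketch of a caller of the batched API, assuming `persisted` is the list of (EventBase, PersistedEventPosition) pairs from a batch write and `max_token` is the room stream token after persistence (both names are invented):

    # Hypothetical persister-side caller of the new batched notifier API.
    async def notify_batch(notifier, persisted, max_token):
        # One call appends a _PendingRoomEventEntry per event and wakes the
        # client streams once, rather than once per event as before.
        await notifier.on_new_room_events(persisted, max_token)
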
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 998354648f..eced182fd5 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -31,7 +31,7 @@ from typing import (
 
 from prometheus_client import Counter
 
-from synapse.api.constants import EventTypes, Membership, RelationTypes
+from synapse.api.constants import MAIN_TIMELINE, EventTypes, Membership, RelationTypes
 from synapse.event_auth import auth_types_for_event, get_user_power_level
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
@@ -280,14 +280,19 @@ class BulkPushRuleEvaluator:
         # If the event does not have a relation, then cannot have any mutual
         # relations or thread ID.
         relations = {}
-        thread_id = "main"
+        thread_id = MAIN_TIMELINE
         if relation:
             relations = await self._get_mutual_relations(
                 relation.parent_id,
                 itertools.chain(*(r.rules() for r in rules_by_user.values())),
             )
+            # Recursively attempt to find the thread this event relates to.
             if relation.rel_type == RelationTypes.THREAD:
                 thread_id = relation.parent_id
+            else:
+                # Since the event has not yet been persisted we check whether
+                # the parent is part of a thread.
+                thread_id = (
+                    await self.store.get_thread_id(relation.parent_id)
+                    or MAIN_TIMELINE
+                )
 
         # It's possible that old room versions have non-integer power levels (floats or
         # strings). Workaround this by explicitly converting to int.
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 658bf373b7..edeba27a45 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -39,7 +39,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
     await concurrently_execute(get_room_unread_count, joins, 10)
 
     for notifs in room_notifs:
-        if notifs.notify_count == 0:
+        # Combine the counts from all the threads.
+        notify_count = notifs.main_timeline.notify_count + sum(
+            n.notify_count for n in notifs.threads.values()
+        )
+
+        if notify_count == 0:
             continue
 
         if group_by_room:
@@ -47,7 +52,7 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
             badge += 1
         else:
             # increment the badge count by the number of unread messages in the room
-            badge += notifs.notify_count
+            badge += notify_count
     return badge
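
A worked example of the combined badge arithmetic, using the RoomNotifCounts and NotifCounts shapes introduced in event_push_actions.py below (thread IDs and counts are invented):

    notifs = RoomNotifCounts(
        main_timeline=NotifCounts(notify_count=2),
        threads={
            "$thread_a": NotifCounts(notify_count=1),
            "$thread_b": NotifCounts(notify_count=3),
        },
    )
    # 2 main-timeline notifications + 1 + 3 threaded ones.
    total = notifs.main_timeline.notify_count + sum(
        n.notify_count for n in notifs.threads.values()
    )
    assert total == 6  # or 1 when group_by_room is set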
 
 
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b2522f98ca..18252a2958 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -210,15 +210,16 @@ class ReplicationDataHandler:
 
                 max_token = self.store.get_room_max_token()
                 event_pos = PersistedEventPosition(instance_name, token)
-                await self.notifier.on_new_room_event_args(
-                    event_pos=event_pos,
-                    max_room_stream_token=max_token,
-                    extra_users=extra_users,
-                    room_id=row.data.room_id,
-                    event_id=row.data.event_id,
-                    event_type=row.data.type,
-                    state_key=row.data.state_key,
-                    membership=row.data.membership,
+                event_entry = self.notifier.create_pending_room_event_entry(
+                    event_pos,
+                    extra_users,
+                    row.data.room_id,
+                    row.data.type,
+                    row.data.state_key,
+                    row.data.membership,
+                )
+                await self.notifier.notify_new_room_events(
+                    [(event_entry, row.data.event_id)], max_token
                 )
 
                 # If this event is a join, make a note of it so we have an accurate
diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py
index f3ff156abe..287dfdd69e 100644
--- a/synapse/rest/client/receipts.py
+++ b/synapse/rest/client/receipts.py
@@ -16,7 +16,7 @@ import logging
 from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.constants import ReceiptTypes
-from synapse.api.errors import SynapseError
+from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseRequest
@@ -43,6 +43,7 @@ class ReceiptRestServlet(RestServlet):
         self.receipts_handler = hs.get_receipts_handler()
         self.read_marker_handler = hs.get_read_marker_handler()
         self.presence_handler = hs.get_presence_handler()
+        self._main_store = hs.get_datastores().main
 
         self._known_receipt_types = {
             ReceiptTypes.READ,
@@ -71,7 +72,24 @@ class ReceiptRestServlet(RestServlet):
                 thread_id = body.get("thread_id")
                 if not thread_id or not isinstance(thread_id, str):
                     raise SynapseError(
-                        400, "thread_id field must be a non-empty string"
+                        400,
+                        "thread_id field must be a non-empty string",
+                        Codes.INVALID_PARAM,
+                    )
+
+                if receipt_type == ReceiptTypes.FULLY_READ:
+                    raise SynapseError(
+                        400,
+                        f"thread_id is not compatible with {ReceiptTypes.FULLY_READ} receipts.",
+                        Codes.INVALID_PARAM,
+                    )
+
+                # Ensure the given event actually belongs to the given thread.
+                if thread_id != await self._main_store.get_thread_id(event_id):
+                    raise SynapseError(
+                        400,
+                        f"event_id {event_id} is not related to thread {thread_id}",
+                        Codes.INVALID_PARAM,
                     )
 
         await self.presence_handler.bump_presence_active_time(requester.user)
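
For illustration, a threaded read receipt as a client would now send it (the endpoint shape follows MSC3771; the room, event and thread IDs are invented):

    # POST /_matrix/client/v3/rooms/!room:example.org/receipt/m.read/$event_id
    # {"thread_id": "$thread_root"}
    #
    # The servlet rejects with M_INVALID_PARAM: a missing, empty or
    # non-string thread_id; a thread_id on an m.fully_read receipt; and a
    # thread_id that does not match get_thread_id(event_id).
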
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index c2989765ce..f1c23d68e5 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -509,6 +509,10 @@ class SyncRestServlet(RestServlet):
             ephemeral_events = room.ephemeral
             result["ephemeral"] = {"events": ephemeral_events}
             result["unread_notifications"] = room.unread_notifications
+            if room.unread_thread_notifications:
+                result[
+                    "org.matrix.msc3773.unread_thread_notifications"
+                ] = room.unread_thread_notifications
             result["summary"] = room.summary
             if self._msc2654_enabled:
                 result["org.matrix.msc2654.unread_count"] = room.unread_count
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index c95b0d6f19..18ed313b5c 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -75,6 +75,7 @@ class VersionsRestServlet(RestServlet):
                     "r0.6.1",
                     "v1.1",
                     "v1.2",
+                    "v1.3",
                 ],
                 # as per MSC1497:
                 "unstable_features": {
@@ -103,8 +104,9 @@ class VersionsRestServlet(RestServlet):
                     "org.matrix.msc3030": self.config.experimental.msc3030_enabled,
                     # Adds support for thread relations, per MSC3440.
                     "org.matrix.msc3440.stable": True,  # TODO: remove when "v1.3" is added above
-                    # Support for thread read receipts.
+                    # Support for thread read receipts & notification counts.
                     "org.matrix.msc3771": self.config.experimental.msc3771_enabled,
+                    "org.matrix.msc3773": self.config.experimental.msc3773_enabled,
                     # Allows moderators to fetch redacted event content as described in MSC2815
                     "fi.mau.msc2815": self.config.experimental.msc2815_enabled,
                     # Adds support for login token requests as per MSC3882
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b4469eb964..7bb21f8f81 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -94,7 +94,7 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
     "event_search": "event_search_event_id_idx",
     "local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx",
     "remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx",
-    "event_push_summary": "event_push_summary_unique_index",
+    "event_push_summary": "event_push_summary_unique_index2",
     "receipts_linearized": "receipts_linearized_unique_index",
     "receipts_graph": "receipts_graph_unique_index",
 }
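
The renamed entry matters because native upserts on a table are disabled until the listed background index build finishes; pointing at the new index (which includes thread_id) keeps that guard accurate. A hedged sketch of the check, assuming Synapse's existing has_completed_background_update helper:

    async def can_native_upsert(db_pool, table: str) -> bool:
        update = UNIQUE_INDEX_BACKGROUND_UPDATES.get(table)
        if update is None:
            return True
        return await db_pool.updates.has_completed_background_update(update)
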
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index c9724d7345..332e13d1c9 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -88,7 +88,7 @@ from typing import (
 
 import attr
 
-from synapse.api.constants import ReceiptTypes
+from synapse.api.constants import MAIN_TIMELINE, ReceiptTypes
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -119,6 +119,32 @@ DEFAULT_HIGHLIGHT_ACTION: List[Union[dict, str]] = [
 ]
 
 
+@attr.s(slots=True, auto_attribs=True)
+class _RoomReceipt:
+    """
+    The latest receipt stream orderings for a user in a room: one unthreaded
+    receipt and one per thread.
+    """
+
+    unthreaded_stream_ordering: int = 0
+    # threaded_stream_ordering includes the main pseudo-thread.
+    threaded_stream_ordering: Dict[str, int] = attr.Factory(dict)
+
+    def is_unread(self, thread_id: str, stream_ordering: int) -> bool:
+        """Returns True if the stream ordering is unread according to the receipt information."""
+
+        # Only include push actions with a stream ordering after both the unthreaded
+        # and threaded receipt. Properly handles a user without any receipts present.
+        return (
+            self.unthreaded_stream_ordering < stream_ordering
+            and self.threaded_stream_ordering.get(thread_id, 0) < stream_ordering
+        )
+
+
+# A _RoomReceipt with no receipts in it.
+MISSING_ROOM_RECEIPT = _RoomReceipt()
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class HttpPushAction:
     """
@@ -157,7 +183,7 @@ class UserPushAction(EmailPushAction):
 @attr.s(slots=True, auto_attribs=True)
 class NotifCounts:
     """
-    The per-user, per-room count of notifications. Used by sync and push.
+    The per-user, per-room, per-thread count of notifications. Used by sync and push.
     """
 
     notify_count: int = 0
@@ -165,6 +191,21 @@ class NotifCounts:
     highlight_count: int = 0
 
 
+@attr.s(slots=True, auto_attribs=True)
+class RoomNotifCounts:
+    """
+    The per-user, per-room count of notifications, covering the main timeline
+    and each thread. Used by sync and push.
+    """
+
+    main_timeline: NotifCounts
+    # Map of thread ID to the notification counts.
+    threads: Dict[str, NotifCounts]
+
+    def __len__(self) -> int:
+        # Used by the cache to estimate the space taken up: one entry per
+        # thread, plus one for the main timeline.
+        return len(self.threads) + 1
+
+
 def _serialize_action(
     actions: Collection[Union[Mapping, str]], is_highlight: bool
 ) -> str:
@@ -338,12 +379,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         return result
 
-    @cached(tree=True, max_entries=5000)
+    @cached(tree=True, max_entries=5000, iterable=True)
     async def get_unread_event_push_actions_by_room_for_user(
         self,
         room_id: str,
         user_id: str,
-    ) -> NotifCounts:
+    ) -> RoomNotifCounts:
         """Get the notification count, the highlight count and the unread message count
         for a given user in a given room after their latest read receipt.
 
@@ -356,8 +397,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             user_id: The user to retrieve the counts for.
 
         Returns
-            A NotifCounts object containing the notification count, the highlight count
-            and the unread message count.
+            A RoomNotifCounts object containing the notification count, the
+            highlight count and the unread message count for both the main timeline
+            and threads.
         """
         return await self.db_pool.runInteraction(
             "get_unread_event_push_actions_by_room",
@@ -371,7 +413,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         txn: LoggingTransaction,
         room_id: str,
         user_id: str,
-    ) -> NotifCounts:
+    ) -> RoomNotifCounts:
         # Get the stream ordering of the user's latest receipt in the room.
         result = self.get_last_unthreaded_receipt_for_user_txn(
             txn,
@@ -405,8 +447,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         txn: LoggingTransaction,
         room_id: str,
         user_id: str,
-        receipt_stream_ordering: int,
-    ) -> NotifCounts:
+        unthreaded_receipt_stream_ordering: int,
+    ) -> RoomNotifCounts:
         """Get the number of unread messages for a user/room that have happened
         since the given stream ordering.
 
@@ -414,78 +456,204 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             txn: The database transaction.
             room_id: The room ID to get unread counts for.
             user_id: The user ID to get unread counts for.
-            receipt_stream_ordering: The stream ordering of the user's latest
-                receipt in the room. If there are no receipts, the stream ordering
-                of the user's join event.
+            unthreaded_receipt_stream_ordering: The stream ordering of the user's latest
+                unthreaded receipt in the room. If there are no unthreaded receipts,
+                the stream ordering of the user's join event.
 
-        Returns
-            A NotifCounts object containing the notification count, the highlight count
-            and the unread message count.
+        Returns:
+            A RoomNotifCounts object containing the notification count, the
+            highlight count and the unread message count for both the main timeline
+            and threads.
         """
 
-        counts = NotifCounts()
+        main_counts = NotifCounts()
+        thread_counts: Dict[str, NotifCounts] = {}
+
+        def _get_thread(thread_id: str) -> NotifCounts:
+            if thread_id == MAIN_TIMELINE:
+                return main_counts
+            return thread_counts.setdefault(thread_id, NotifCounts())
+
+        receipt_types_clause, receipts_args = make_in_list_sql_clause(
+            self.database_engine,
+            "receipt_type",
+            (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+        )
 
         # First we pull the counts from the summary table.
         #
-        # We check that `last_receipt_stream_ordering` matches the stream
-        # ordering given. If it doesn't match then a new read receipt has arrived and
-        # we haven't yet updated the counts in `event_push_summary` to reflect
-        # that; in that case we simply ignore `event_push_summary` counts
-        # and do a manual count of all of the rows in the `event_push_actions` table
-        # for this user/room.
+        # We check that `last_receipt_stream_ordering` matches the stream ordering of the
+        # latest receipt for the thread (which may be either the unthreaded read receipt
+        # or the threaded read receipt).
         #
-        # If `last_receipt_stream_ordering` is null then that means it's up to
-        # date (as the row was written by an older version of Synapse that
+        # If it doesn't match then a new read receipt has arrived and we haven't yet
+        # updated the counts in `event_push_summary` to reflect that; in that case we
+        # simply ignore `event_push_summary` counts.
+        #
+        # We then do a manual count of all the rows in the `event_push_actions` table
+        # for any user/room/thread which did not have a valid summary found.
+        #
+        # If `last_receipt_stream_ordering` is null then that means it's up-to-date
+        # (as the row was written by an older version of Synapse that
         # updated `event_push_summary` synchronously when persisting a new read
         # receipt).
         txn.execute(
-            """
-                SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
+            f"""
+                SELECT notif_count, COALESCE(unread_count, 0), thread_id
                 FROM event_push_summary
+                LEFT JOIN (
+                    SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+                    FROM receipts_linearized
+                    LEFT JOIN events USING (room_id, event_id)
+                    WHERE
+                        user_id = ?
+                        AND room_id = ?
+                        AND stream_ordering > ?
+                        AND {receipt_types_clause}
+                    GROUP BY thread_id
+                ) AS receipts USING (thread_id)
                 WHERE room_id = ? AND user_id = ?
                 AND (
-                    (last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
-                    OR last_receipt_stream_ordering = ?
-                )
+                    (last_receipt_stream_ordering IS NULL AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?))
+                    OR last_receipt_stream_ordering = COALESCE(threaded_receipt_stream_ordering, ?)
+                ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
             """,
-            (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
+            (
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *receipts_args,
+                room_id,
+                user_id,
+                unthreaded_receipt_stream_ordering,
+                unthreaded_receipt_stream_ordering,
+            ),
         )
-        row = txn.fetchone()
-
-        summary_stream_ordering = 0
-        if row:
-            summary_stream_ordering = row[0]
-            counts.notify_count += row[1]
-            counts.unread_count += row[2]
+        summarised_threads = set()
+        for notif_count, unread_count, thread_id in txn:
+            summarised_threads.add(thread_id)
+            counts = _get_thread(thread_id)
+            counts.notify_count += notif_count
+            counts.unread_count += unread_count
 
         # Next we need to count highlights, which aren't summarised
-        sql = """
-            SELECT COUNT(*) FROM event_push_actions
+        sql = f"""
+            SELECT COUNT(*), thread_id FROM event_push_actions
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND stream_ordering > ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
             WHERE user_id = ?
                 AND room_id = ?
-                AND stream_ordering > ?
+                AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
                 AND highlight = 1
+            GROUP BY thread_id
         """
-        txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
-        row = txn.fetchone()
-        if row:
-            counts.highlight_count += row[0]
+        txn.execute(
+            sql,
+            (
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *receipts_args,
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+            ),
+        )
+        for highlight_count, thread_id in txn:
+            _get_thread(thread_id).highlight_count += highlight_count
+
+        # For threads which were summarised we need to count actions since the last
+        # rotation.
+        thread_id_clause, thread_id_args = make_in_list_sql_clause(
+            self.database_engine, "thread_id", summarised_threads
+        )
+
+        # The (inclusive) event stream ordering that was previously summarised.
+        rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
+        unread_counts = self._get_notif_unread_count_for_user_room(
+            txn, room_id, user_id, rotated_upto_stream_ordering
+        )
+        for notif_count, unread_count, thread_id in unread_counts:
+            if thread_id not in summarised_threads:
+                continue
+
+            if thread_id == MAIN_TIMELINE:
+                main_counts.notify_count += notif_count
+                main_counts.unread_count += unread_count
+            elif thread_id in thread_counts:
+                thread_counts[thread_id].notify_count += notif_count
+                thread_counts[thread_id].unread_count += unread_count
+            else:
+                # Previous thread summaries of 0 are discarded above.
+                #
+                # TODO If empty summaries are deleted this can be removed.
+                thread_counts[thread_id] = NotifCounts(
+                    notify_count=notif_count,
+                    unread_count=unread_count,
+                    highlight_count=0,
+                )
 
         # Finally we need to count push actions that aren't included in the
         # summary returned above. This might be due to recent events that haven't
         # been summarised yet or the summary is out of date due to a recent read
         # receipt.
-        start_unread_stream_ordering = max(
-            receipt_stream_ordering, summary_stream_ordering
-        )
-        notify_count, unread_count = self._get_notif_unread_count_for_user_room(
-            txn, room_id, user_id, start_unread_stream_ordering
+        sql = f"""
+            SELECT
+                COUNT(CASE WHEN notif = 1 THEN 1 END),
+                COUNT(CASE WHEN unread = 1 THEN 1 END),
+                thread_id
+            FROM event_push_actions
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND stream_ordering > ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
+            WHERE user_id = ?
+                AND room_id = ?
+                AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
+                AND NOT {thread_id_clause}
+            GROUP BY thread_id
+        """
+        txn.execute(
+            sql,
+            (
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *receipts_args,
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *thread_id_args,
+            ),
         )
+        for notif_count, unread_count, thread_id in txn:
+            counts = _get_thread(thread_id)
+            counts.notify_count += notif_count
+            counts.unread_count += unread_count
 
-        counts.notify_count += notify_count
-        counts.unread_count += unread_count
-
-        return counts
+        return RoomNotifCounts(main_counts, thread_counts)
 
     def _get_notif_unread_count_for_user_room(
         self,
@@ -494,7 +662,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         user_id: str,
         stream_ordering: int,
         max_stream_ordering: Optional[int] = None,
-    ) -> Tuple[int, int]:
+        thread_id: Optional[str] = None,
+    ) -> List[Tuple[int, int, str]]:
         """Returns the notify and unread counts from `event_push_actions` for
         the given user/room in the given range.
 
@@ -508,45 +677,55 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             stream_ordering: The (exclusive) minimum stream ordering to consider.
             max_stream_ordering: The (inclusive) maximum stream ordering to consider.
                 If this is not given, then no maximum is applied.
+            thread_id: The thread ID to fetch unread counts for. If this is not provided
+                then the results for *all* threads are returned.
+
+                Note that if this is provided the resulting list will contain at
+                most one tuple.
 
         Return:
-            A tuple of the notif count and unread count in the given range.
+            A list of tuples of the notif count, unread count and thread ID for
+            each thread with push actions in the given range.
         """
 
         # If there have been no events in the room since the stream ordering,
         # there can't be any push actions either.
         if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
-            return 0, 0
+            return []
 
-        clause = ""
+        stream_ordering_clause = ""
         args = [user_id, room_id, stream_ordering]
         if max_stream_ordering is not None:
-            clause = "AND ea.stream_ordering <= ?"
+            stream_ordering_clause = "AND ea.stream_ordering <= ?"
             args.append(max_stream_ordering)
 
             # If the max stream ordering is less than the min stream ordering,
             # then obviously there are zero push actions in that range.
             if max_stream_ordering <= stream_ordering:
-                return 0, 0
+                return []
+
+        # Either limit the results to a specific thread or fetch all threads.
+        thread_id_clause = ""
+        if thread_id is not None:
+            thread_id_clause = "AND thread_id = ?"
+            args.append(thread_id)
 
         sql = f"""
             SELECT
                COUNT(CASE WHEN notif = 1 THEN 1 END),
-               COUNT(CASE WHEN unread = 1 THEN 1 END)
-             FROM event_push_actions ea
-             WHERE user_id = ?
+               COUNT(CASE WHEN unread = 1 THEN 1 END),
+               thread_id
+            FROM event_push_actions ea
+            WHERE user_id = ?
                AND room_id = ?
                AND ea.stream_ordering > ?
-               {clause}
+               {stream_ordering_clause}
+               {thread_id_clause}
+            GROUP BY thread_id
         """
 
         txn.execute(sql, args)
-        row = txn.fetchone()
-
-        if row:
-            return cast(Tuple[int, int], row)
-
-        return 0, 0
+        return cast(List[Tuple[int, int, str]], txn.fetchall())
 
     async def get_push_action_users_in_range(
         self, min_stream_ordering: int, max_stream_ordering: int
@@ -563,7 +742,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
     def _get_receipts_by_room_txn(
         self, txn: LoggingTransaction, user_id: str
-    ) -> Dict[str, int]:
+    ) -> Dict[str, _RoomReceipt]:
         """
         Generate a map of room ID to the latest stream ordering that has been
         read by the given user.
@@ -573,7 +752,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             user_id: The user to fetch receipts for.
 
         Returns:
-            A map of room ID to stream ordering for all rooms the user has a receipt in.
+            A map of room ID to _RoomReceipt for every room in which the user
+            has a receipt.
         """
         receipt_types_clause, args = make_in_list_sql_clause(
             self.database_engine,
@@ -582,20 +762,26 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
         sql = f"""
-            SELECT room_id, MAX(stream_ordering)
+            SELECT room_id, thread_id, MAX(stream_ordering)
             FROM receipts_linearized
             INNER JOIN events USING (room_id, event_id)
             WHERE {receipt_types_clause}
             AND user_id = ?
-            GROUP BY room_id
+            GROUP BY room_id, thread_id
         """
 
         args.extend((user_id,))
         txn.execute(sql, args)
-        return {
-            room_id: latest_stream_ordering
-            for room_id, latest_stream_ordering in txn.fetchall()
-        }
+
+        result: Dict[str, _RoomReceipt] = {}
+        for room_id, thread_id, stream_ordering in txn:
+            room_receipt = result.setdefault(room_id, _RoomReceipt())
+            if thread_id is None:
+                room_receipt.unthreaded_stream_ordering = stream_ordering
+            else:
+                room_receipt.threaded_stream_ordering[thread_id] = stream_ordering
+
+        return result
 
     async def get_unread_push_actions_for_user_in_range_for_http(
         self,
@@ -628,9 +814,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         def get_push_actions_txn(
             txn: LoggingTransaction,
-        ) -> List[Tuple[str, str, int, str, bool]]:
+        ) -> List[Tuple[str, str, str, int, str, bool]]:
             sql = """
-                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
+                SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
+                    ep.actions, ep.highlight
                 FROM event_push_actions AS ep
                 WHERE
                     ep.user_id = ?
@@ -640,7 +827,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 ORDER BY ep.stream_ordering ASC LIMIT ?
             """
             txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
-            return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
+            return cast(List[Tuple[str, str, str, int, str, bool]], txn.fetchall())
 
         push_actions = await self.db_pool.runInteraction(
             "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
@@ -653,10 +840,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 stream_ordering=stream_ordering,
                 actions=_deserialize_action(actions, highlight),
             )
-            for event_id, room_id, stream_ordering, actions, highlight in push_actions
-            # Only include push actions with a stream ordering after any receipt, or without any
-            # receipt present (invited to but never read rooms).
-            if stream_ordering > receipts_by_room.get(room_id, 0)
+            for event_id, room_id, thread_id, stream_ordering, actions, highlight in push_actions
+            if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
+                thread_id, stream_ordering
+            )
         ]
 
         # Now sort it so it's ordered correctly, since currently it will
@@ -700,10 +887,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         def get_push_actions_txn(
             txn: LoggingTransaction,
-        ) -> List[Tuple[str, str, int, str, bool, int]]:
+        ) -> List[Tuple[str, str, str, int, str, bool, int]]:
             sql = """
-                SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
-                    ep.highlight, e.received_ts
+                SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
+                    ep.actions, ep.highlight, e.received_ts
                 FROM event_push_actions AS ep
                 INNER JOIN events AS e USING (room_id, event_id)
                 WHERE
@@ -714,7 +901,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 ORDER BY ep.stream_ordering DESC LIMIT ?
             """
             txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
-            return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
+            return cast(List[Tuple[str, str, str, int, str, bool, int]], txn.fetchall())
 
         push_actions = await self.db_pool.runInteraction(
             "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
@@ -729,10 +916,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 actions=_deserialize_action(actions, highlight),
                 received_ts=received_ts,
             )
-            for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions
-            # Only include push actions with a stream ordering after any receipt, or without any
-            # receipt present (invited to but never read rooms).
-            if stream_ordering > receipts_by_room.get(room_id, 0)
+            for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions
+            if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
+                thread_id, stream_ordering
+            )
         ]
 
         # Now sort it so it's ordered correctly, since currently it will
@@ -1056,7 +1243,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
         sql = """
-            SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
+            SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
             FROM receipts_linearized AS r
             INNER JOIN events AS e USING (event_id)
             WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@@ -1077,53 +1264,86 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 limit,
             ),
         )
-        rows = cast(List[Tuple[int, str, str, int]], txn.fetchall())
+        rows = cast(List[Tuple[int, str, str, Optional[str], int]], txn.fetchall())
 
         # For each new read receipt we delete push actions from before it and
         # recalculate the summary.
-        for _, room_id, user_id, stream_ordering in rows:
+        #
+        # Care must be taken over whether each receipt is threaded or unthreaded.
+        for _, room_id, user_id, thread_id, stream_ordering in rows:
             # Only handle our own read receipts.
             if not self.hs.is_mine_id(user_id):
                 continue
 
+            thread_clause = ""
+            thread_args: Tuple = ()
+            if thread_id is not None:
+                thread_clause = "AND thread_id = ?"
+                thread_args = (thread_id,)
+
+            # Delete the push actions which are now covered by this receipt.
             txn.execute(
-                """
+                f"""
                 DELETE FROM event_push_actions
                 WHERE room_id = ?
                     AND user_id = ?
                     AND stream_ordering <= ?
                     AND highlight = 0
+                    {thread_clause}
                 """,
-                (room_id, user_id, stream_ordering),
+                (room_id, user_id, stream_ordering, *thread_args),
             )
 
             # Fetch the notification counts between the stream ordering of the
             # latest receipt and what was previously summarised.
-            notif_count, unread_count = self._get_notif_unread_count_for_user_room(
-                txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
-            )
-
-            # First ensure that the existing rows have an updated thread_id field.
-            self.db_pool.simple_update_txn(
+            unread_counts = self._get_notif_unread_count_for_user_room(
                 txn,
-                table="event_push_summary",
-                keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": None},
-                updatevalues={"thread_id": "main"},
+                room_id,
+                user_id,
+                stream_ordering,
+                old_rotate_stream_ordering,
+                thread_id,
             )
 
-            # Replace the previous summary with the new counts.
-            #
-            # TODO(threads): Upsert per-thread instead of setting them all to main.
-            self.db_pool.simple_upsert_txn(
+            # For an unthreaded receipt, mark the summary for all threads in the room
+            # as cleared.
+            if thread_id is None:
+                self.db_pool.simple_update_txn(
+                    txn,
+                    table="event_push_summary",
+                    keyvalues={"user_id": user_id, "room_id": room_id},
+                    updatevalues={
+                        "notif_count": 0,
+                        "unread_count": 0,
+                        "stream_ordering": old_rotate_stream_ordering,
+                        "last_receipt_stream_ordering": stream_ordering,
+                    },
+                )
+
+            # For a threaded receipt, we *always* want to update that thread's
+            # summary, even if there are no new notifications in the thread. This
+            # ensures the stream_ordering & last_receipt_stream_ordering are updated.
+            elif not unread_counts:
+                unread_counts = [(0, 0, thread_id)]
+
+            # Then any updated threads get their notification count and unread
+            # count updated.
+            self.db_pool.simple_update_many_txn(
                 txn,
                 table="event_push_summary",
-                keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": "main"},
-                values={
-                    "notif_count": notif_count,
-                    "unread_count": unread_count,
-                    "stream_ordering": old_rotate_stream_ordering,
-                    "last_receipt_stream_ordering": stream_ordering,
-                },
+                key_names=("room_id", "user_id", "thread_id"),
+                key_values=[(room_id, user_id, row[2]) for row in unread_counts],
+                value_names=(
+                    "notif_count",
+                    "unread_count",
+                    "stream_ordering",
+                    "last_receipt_stream_ordering",
+                ),
+                value_values=[
+                    (row[0], row[1], old_rotate_stream_ordering, stream_ordering)
+                    for row in unread_counts
+                ],
             )
 
         # We always update `event_push_summary_last_receipt_stream_id` to
@@ -1211,23 +1431,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         # Calculate the new counts that should be upserted into event_push_summary
         sql = """
-            SELECT user_id, room_id,
+            SELECT user_id, room_id, thread_id,
                 coalesce(old.%s, 0) + upd.cnt,
                 upd.stream_ordering
             FROM (
-                SELECT user_id, room_id, count(*) as cnt,
+                SELECT user_id, room_id, thread_id, count(*) as cnt,
                     max(ea.stream_ordering) as stream_ordering
                 FROM event_push_actions AS ea
-                LEFT JOIN event_push_summary AS old USING (user_id, room_id)
+                LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id)
                 WHERE ? < ea.stream_ordering AND ea.stream_ordering <= ?
                     AND (
                         old.last_receipt_stream_ordering IS NULL
                         OR old.last_receipt_stream_ordering < ea.stream_ordering
                     )
                     AND %s = 1
-                GROUP BY user_id, room_id
+                GROUP BY user_id, room_id, thread_id
             ) AS upd
-            LEFT JOIN event_push_summary AS old USING (user_id, room_id)
+            LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id)
         """
 
         # First get the count of unread messages.
@@ -1241,11 +1461,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         # object because we might not have the same amount of rows in each of them. To do
         # this, we use a dict indexed on the user ID and room ID to make it easier to
         # populate.
-        summaries: Dict[Tuple[str, str], _EventPushSummary] = {}
+        summaries: Dict[Tuple[str, str, str], _EventPushSummary] = {}
         for row in txn:
-            summaries[(row[0], row[1])] = _EventPushSummary(
-                unread_count=row[2],
-                stream_ordering=row[3],
+            summaries[(row[0], row[1], row[2])] = _EventPushSummary(
+                unread_count=row[3],
+                stream_ordering=row[4],
                 notif_count=0,
             )
 
@@ -1256,40 +1476,36 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
         for row in txn:
-            if (row[0], row[1]) in summaries:
-                summaries[(row[0], row[1])].notif_count = row[2]
+            if (row[0], row[1], row[2]) in summaries:
+                summaries[(row[0], row[1], row[2])].notif_count = row[3]
             else:
                 # Because the rules on notifying are different than the rules on marking
                 # a message unread, we might end up with messages that notify but aren't
                 # marked unread, so we might not have a summary for this (user, room)
                 # tuple to complete.
-                summaries[(row[0], row[1])] = _EventPushSummary(
+                summaries[(row[0], row[1], row[2])] = _EventPushSummary(
                     unread_count=0,
-                    stream_ordering=row[3],
-                    notif_count=row[2],
+                    stream_ordering=row[4],
+                    notif_count=row[3],
                 )
 
         logger.info("Rotating notifications, handling %d rows", len(summaries))
 
-        # Ensure that any updated threads have an updated thread_id.
-        self.db_pool.simple_update_many_txn(
-            txn,
-            table="event_push_summary",
-            key_names=("user_id", "room_id", "thread_id"),
-            key_values=[(user_id, room_id, None) for user_id, room_id in summaries],
-            value_names=("thread_id",),
-            value_values=[("main",) for _ in summaries],
-        )
-
-        # TODO(threads): Update on a per-thread basis.
         self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_push_summary",
             key_names=("user_id", "room_id", "thread_id"),
-            key_values=[(user_id, room_id, "main") for user_id, room_id in summaries],
+            key_values=[
+                (user_id, room_id, thread_id)
+                for user_id, room_id, thread_id in summaries
+            ],
             value_names=("notif_count", "unread_count", "stream_ordering"),
             value_values=[
-                (summary.notif_count, summary.unread_count, summary.stream_ordering)
+                (
+                    summary.notif_count,
+                    summary.unread_count,
+                    summary.stream_ordering,
+                )
                 for summary in summaries.values()
             ],
         )
@@ -1300,7 +1516,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
     async def _remove_old_push_actions_that_have_rotated(self) -> None:
-        """Clear out old push actions that have been summarised."""
+        """
+        Clear out old push actions that have been summarised (and are more than
+        a day old).
+        """
 
         # We want to clear out anything that is older than a day that *has* already
         # been rotated.
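
The _RoomReceipt.is_unread gate introduced above requires a push action to be newer than both the room's unthreaded receipt and the receipt for its own thread. A few worked cases (stream orderings and thread IDs are invented):

    receipt = _RoomReceipt(
        unthreaded_stream_ordering=10,
        threaded_stream_ordering={"$thread_a": 15},
    )
    assert receipt.is_unread("$thread_a", 16)      # newer than both receipts
    assert not receipt.is_unread("$thread_a", 12)  # covered by the threaded receipt
    assert receipt.is_unread("$thread_b", 11)      # only the unthreaded receipt applies
    assert MISSING_ROOM_RECEIPT.is_unread("$t", 1) # no receipts: everything is unread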
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index ed17b2e70c..8295322b0e 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -81,15 +81,10 @@ def _load_rules(
         for rawrule in rawrules
     ]
 
-    push_rules = PushRules(
-        ruleslist,
-    )
+    push_rules = PushRules(ruleslist)
 
     filtered_rules = FilteredPushRules(
-        push_rules,
-        enabled_map,
-        msc3786_enabled=experimental_config.msc3786_enabled,
-        msc3772_enabled=experimental_config.msc3772_enabled,
+        push_rules, enabled_map, msc3772_enabled=experimental_config.msc3772_enabled
     )
 
     return filtered_rules
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 898947af95..154385b1e8 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -832,6 +832,42 @@ class RelationsWorkerStore(SQLBaseStore):
             "get_event_relations", _get_event_relations
         )
 
+    @cached()
+    async def get_thread_id(self, event_id: str) -> Optional[str]:
+        """
+        Get the thread ID for an event. This considers multi-level relations,
+        e.g. an annotation to an event which is part of a thread.
+
+        Args:
+            event_id: The event ID to fetch the thread ID for.
+
+        Returns:
+            The event ID of the root event in the thread, if this event is part
+            of a thread. None, otherwise.
+        """
+        # Since event relations form a tree, we should only ever find 0 or 1
+        # results from the below query.
+        sql = """
+            WITH RECURSIVE related_events AS (
+                SELECT event_id, relates_to_id, relation_type
+                FROM event_relations
+                WHERE event_id = ?
+                UNION SELECT e.event_id, e.relates_to_id, e.relation_type
+                FROM event_relations e
+                INNER JOIN related_events r ON r.relates_to_id = e.event_id
+            ) SELECT relates_to_id FROM related_events WHERE relation_type = 'm.thread';
+        """
+
+        def _get_thread_id(txn: LoggingTransaction) -> Optional[str]:
+            txn.execute(sql, (event_id,))
+            # TODO Should we ensure there's only a single result here?
+            row = txn.fetchone()
+            if row:
+                return row[0]
+            return None
+
+        return await self.db_pool.runInteraction("get_thread_id", _get_thread_id)
+
 
 class RelationsStore(RelationsWorkerStore):
     pass
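
A usage sketch for get_thread_id (event IDs are invented): the recursive CTE walks multi-level relations, so an annotation of a threaded message still resolves to the thread root.

    #   $reaction --[m.annotation]--> $message --[m.thread]--> $root
    thread_id = await store.get_thread_id("$reaction")
    assert thread_id == "$root"
    # An event with no m.thread ancestor returns None; callers fall back to "main".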
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 7412bce255..e41c99027a 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -207,21 +207,30 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
     def _construct_room_type_where_clause(
         self, room_types: Union[List[Union[str, None]], None]
-    ) -> Tuple[Union[str, None], List[str]]:
+    ) -> Tuple[Union[str, None], list]:
         if not room_types:
             return None, []
-        else:
-            # We use None when we want get rooms without a type
-            is_null_clause = ""
-            if None in room_types:
-                is_null_clause = "OR room_type IS NULL"
-                room_types = [value for value in room_types if value is not None]
 
+        # Since None is used to represent a room without a type, care needs to
+        # be taken when constructing the where clause.
+        clauses = []
+        args: list = []
+
+        room_types_set = set(room_types)
+
+        # We use None to represent a room without a type.
+        if None in room_types_set:
+            clauses.append("room_type IS NULL")
+            room_types_set.remove(None)
+
+        # If there are other room types, generate the proper clause.
+        if room_types_set:
             list_clause, args = make_in_list_sql_clause(
-                self.database_engine, "room_type", room_types
+                self.database_engine, "room_type", room_types_set
             )
+            clauses.append(list_clause)
 
-            return f"({list_clause} {is_null_clause})", args
+        return f"({' OR '.join(clauses)})", args
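
Worked outputs of the rewritten clause builder (SQLite-style placeholders shown; on Postgres make_in_list_sql_clause emits room_type = ANY(?) instead):

    #   [None]             -> ("(room_type IS NULL)", [])
    #   ["m.space"]        -> ("(room_type IN (?))", ["m.space"])
    #   [None, "m.space"]  -> ("(room_type IS NULL OR room_type IN (?))", ["m.space"])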
 
     async def count_public_rooms(
         self,
@@ -241,14 +250,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         def _count_public_rooms_txn(txn: LoggingTransaction) -> int:
             query_args = []
 
-            room_type_clause, args = self._construct_room_type_where_clause(
-                search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None)
-                if search_filter
-                else None
-            )
-            room_type_clause = f" AND {room_type_clause}" if room_type_clause else ""
-            query_args += args
-
             if network_tuple:
                 if network_tuple.appservice_id:
                     published_sql = """
@@ -268,6 +269,14 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
                     UNION SELECT room_id from appservice_room_list
             """
 
+            room_type_clause, args = self._construct_room_type_where_clause(
+                search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None)
+                if search_filter
+                else None
+            )
+            room_type_clause = f" AND {room_type_clause}" if room_type_clause else ""
+            query_args += args
+
             sql = f"""
                 SELECT
                     COUNT(*)
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 4a5c947699..19dbf2da7f 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -90,9 +90,9 @@ Changes in SCHEMA_VERSION = 73;
 
 
 SCHEMA_COMPAT_VERSION = (
-    # The groups tables are no longer accessible, so synapses with SCHEMA_VERSION < 72
-    # could break.
-    72
+    # The thread_id column must exist for event_push_actions, event_push_summary,
+    # receipts_linearized, and receipts_graph.
+    73
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql
new file mode 100644
index 0000000000..0ffde9bbeb
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql
@@ -0,0 +1,29 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Forces the background updates from 06thread_notifications.sql to run in the
+-- foreground as code will now require those to be "done".
+
+DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
+
+-- Overwrite any null thread_id columns.
+UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
+
+-- Do not run the event_push_summary_unique_index job if it is pending; the
+-- thread_id field will be made required.
+DELETE FROM background_updates WHERE update_name = 'event_push_summary_unique_index';
+DROP INDEX IF EXISTS event_push_summary_unique_index;
diff --git a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres
new file mode 100644
index 0000000000..33674f8c62
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres
@@ -0,0 +1,19 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The columns can now be made non-nullable.
+ALTER TABLE event_push_actions_staging ALTER COLUMN thread_id SET NOT NULL;
+ALTER TABLE event_push_actions ALTER COLUMN thread_id SET NOT NULL;
+ALTER TABLE event_push_summary ALTER COLUMN thread_id SET NOT NULL;
diff --git a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite
new file mode 100644
index 0000000000..5322ad77a4
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite
@@ -0,0 +1,101 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- SQLite doesn't support altering the columns of an existing table, so each
+-- table must be recreated.
+
+-- Create the new tables.
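+-- (These are identical to the old tables except that thread_id is now NOT NULL.)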
+CREATE TABLE event_push_actions_staging_new (
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    actions TEXT NOT NULL,
+    notif SMALLINT NOT NULL,
+    highlight SMALLINT NOT NULL,
+    unread SMALLINT,
+    thread_id TEXT NOT NULL,
+    inserted_ts BIGINT
+);
+
+CREATE TABLE event_push_actions_new (
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    profile_tag VARCHAR(32),
+    actions TEXT NOT NULL,
+    topological_ordering BIGINT,
+    stream_ordering BIGINT,
+    notif SMALLINT,
+    highlight SMALLINT,
+    unread SMALLINT,
+    thread_id TEXT NOT NULL,
+    CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag)
+);
+
+CREATE TABLE event_push_summary_new (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    notif_count BIGINT NOT NULL,
+    stream_ordering BIGINT NOT NULL,
+    unread_count BIGINT,
+    last_receipt_stream_ordering BIGINT,
+    thread_id TEXT NOT NULL
+);
+
+-- Swap the indexes.
+DROP INDEX IF EXISTS event_push_actions_staging_id;
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging_new(event_id);
+
+DROP INDEX IF EXISTS event_push_actions_room_id_user_id;
+DROP INDEX IF EXISTS event_push_actions_rm_tokens;
+DROP INDEX IF EXISTS event_push_actions_stream_ordering;
+DROP INDEX IF EXISTS event_push_actions_u_highlight;
+DROP INDEX IF EXISTS event_push_actions_highlights_index;
+CREATE INDEX event_push_actions_room_id_user_id ON event_push_actions_new(room_id, user_id);
+CREATE INDEX event_push_actions_rm_tokens ON event_push_actions_new(user_id, room_id, topological_ordering, stream_ordering);
+CREATE INDEX event_push_actions_stream_ordering ON event_push_actions_new(stream_ordering, user_id);
+CREATE INDEX event_push_actions_u_highlight ON event_push_actions_new (user_id, stream_ordering);
+CREATE INDEX event_push_actions_highlights_index ON event_push_actions_new (user_id, room_id, topological_ordering, stream_ordering);
+
+-- Copy the data.
+INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
+    SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
+    FROM event_push_actions_staging;
+
+INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
+    SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
+    FROM event_push_actions;
+
+INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
+    SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
+    FROM event_push_summary;
+
+-- Drop the old tables.
+DROP TABLE event_push_actions_staging;
+DROP TABLE event_push_actions;
+DROP TABLE event_push_summary;
+
+-- Rename the tables.
+ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
+ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
+ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
+
+-- Re-run background updates from 72/02event_push_actions_index.sql and
+-- 72/06thread_notifications.sql.
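+-- (The indexes they create were lost when the old tables were dropped above.)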
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7307, 'event_push_summary_unique_index2', '{}')
+  ON CONFLICT (update_name) DO NOTHING;
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7307, 'event_push_actions_stream_highlight_index', '{}')
+  ON CONFLICT (update_name) DO NOTHING;
diff --git a/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres
new file mode 100644
index 0000000000..3e0bc9e5eb
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop the unique constraints on (room_id, receipt_type, user_id) so that a
+-- user may have one receipt per thread in a room.
+ALTER TABLE receipts_linearized
+    DROP CONSTRAINT receipts_linearized_uniqueness;
+
+ALTER TABLE receipts_graph
+    DROP CONSTRAINT receipts_graph_uniqueness;
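+
+-- (Uniqueness is restored by the receipts_linearized_unique_index and
+-- receipts_graph_unique_index background updates registered by
+-- 72/08thread_receipts.sql.)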
diff --git a/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite
new file mode 100644
index 0000000000..e664889fbc
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite
@@ -0,0 +1,76 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop the unique constraints on (room_id, receipt_type, user_id) in favour of
+-- ones that also include thread_id.
+--
+-- SQLite doesn't support altering the constraints of an existing table, so the
+-- tables must be recreated.
+
+-- Create the new tables.
+CREATE TABLE receipts_linearized_new (
+    stream_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    receipt_type TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    thread_id TEXT,
+    event_stream_ordering BIGINT,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+CREATE TABLE receipts_graph_new (
+    room_id TEXT NOT NULL,
+    receipt_type TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    event_ids TEXT NOT NULL,
+    thread_id TEXT,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
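+
+-- (thread_id is deliberately nullable here: NULL denotes an un-threaded
+-- receipt, unlike the push-action tables which use the sentinel 'main'.)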
+
+-- Drop the old indexes.
+DROP INDEX IF EXISTS receipts_linearized_id;
+DROP INDEX IF EXISTS receipts_linearized_room_stream;
+DROP INDEX IF EXISTS receipts_linearized_user;
+
+-- Copy the data.
+INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data)
+    SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+    FROM receipts_linearized;
+INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
+    SELECT room_id, receipt_type, user_id, event_ids, data
+    FROM receipts_graph;
+
+-- Drop the old tables.
+DROP TABLE receipts_linearized;
+DROP TABLE receipts_graph;
+
+-- Rename the tables.
+ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
+ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
+
+-- Create the indices.
+CREATE INDEX receipts_linearized_id ON receipts_linearized(stream_id);
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized(room_id, stream_id);
+CREATE INDEX receipts_linearized_user ON receipts_linearized(user_id);
+
+-- Re-run background updates from 72/08thread_receipts.sql.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7308, 'receipts_linearized_unique_index', '{}')
+  ON CONFLICT (update_name) DO NOTHING;
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7308, 'receipts_graph_unique_index', '{}')
+  ON CONFLICT (update_name) DO NOTHING;
diff --git a/tests/appservice/test_api.py b/tests/appservice/test_api.py
index 532b676365..11008ac1fb 100644
--- a/tests/appservice/test_api.py
+++ b/tests/appservice/test_api.py
@@ -69,10 +69,14 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
 
         self.request_url = None
 
-        async def get_json(url: str, args: Mapping[Any, Any]) -> List[JsonDict]:
-            if not args.get(b"access_token"):
+        async def get_json(
+            url: str, args: Mapping[Any, Any], headers: Mapping[Any, Any]
+        ) -> List[JsonDict]:
+            # Ensure the access token is passed as both a header and query arg.
+            if not headers.get("Authorization") or not args.get(b"access_token"):
                 raise RuntimeError("Access token not provided")
 
+            self.assertEqual(headers.get("Authorization"), f"Bearer {TOKEN}")
             self.assertEqual(args.get(b"access_token"), TOKEN)
             self.request_url = url
             if url == URL_USER:
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index efd92793c0..d42e36cdf1 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -22,7 +22,10 @@ from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
 from synapse.handlers.room import RoomEventSource
 from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.storage.databases.main.event_push_actions import NotifCounts
+from synapse.storage.databases.main.event_push_actions import (
+    NotifCounts,
+    RoomNotifCounts,
+)
 from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
 from synapse.types import PersistedEventPosition
 
@@ -178,7 +181,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2],
-            NotifCounts(highlight_count=0, unread_count=0, notify_count=0),
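+            # RoomNotifCounts pairs the room's main-timeline counts with a dict
+            # of per-thread counts (empty here, as no threads exist yet).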
+            RoomNotifCounts(
+                NotifCounts(highlight_count=0, unread_count=0, notify_count=0), {}
+            ),
         )
 
         self.persist(
@@ -191,7 +196,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2],
-            NotifCounts(highlight_count=0, unread_count=0, notify_count=1),
+            RoomNotifCounts(
+                NotifCounts(highlight_count=0, unread_count=0, notify_count=1), {}
+            ),
         )
 
         self.persist(
@@ -206,7 +213,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2],
-            NotifCounts(highlight_count=1, unread_count=0, notify_count=2),
+            RoomNotifCounts(
+                NotifCounts(highlight_count=1, unread_count=0, notify_count=2), {}
+            ),
         )
 
     def test_get_rooms_for_user_with_stream_ordering(self):
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 5e66b5b26c..3612ebe7b9 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -2213,14 +2213,17 @@ class PublicRoomsRoomTypeFilterTestCase(unittest.HomeserverTestCase):
         )
 
     def make_public_rooms_request(
-        self, room_types: Union[List[Union[str, None]], None]
+        self,
+        room_types: Optional[List[Union[str, None]]],
+        instance_id: Optional[str] = None,
     ) -> Tuple[List[Dict[str, Any]], int]:
-        channel = self.make_request(
-            "POST",
-            self.url,
-            {"filter": {PublicRoomsFilterFields.ROOM_TYPES: room_types}},
-            self.token,
-        )
+        body: JsonDict = {"filter": {PublicRoomsFilterFields.ROOM_TYPES: room_types}}
+        if instance_id:
+            body["third_party_instance_id"] = instance_id
+
+        channel = self.make_request("POST", self.url, body, self.token)
+        self.assertEqual(channel.code, 200)
+
         chunk = channel.json_body["chunk"]
         count = channel.json_body["total_room_count_estimate"]
 
@@ -2230,31 +2233,49 @@ class PublicRoomsRoomTypeFilterTestCase(unittest.HomeserverTestCase):
 
     def test_returns_both_rooms_and_spaces_if_no_filter(self) -> None:
         chunk, count = self.make_public_rooms_request(None)
-
         self.assertEqual(count, 2)
 
+        # Also check the case where there's no filter property at all in the body.
+        channel = self.make_request("POST", self.url, {}, self.token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(len(channel.json_body["chunk"]), 2)
+        self.assertEqual(channel.json_body["total_room_count_estimate"], 2)
+
+        chunk, count = self.make_public_rooms_request(None, "test|test")
+        self.assertEqual(count, 0)
+
     def test_returns_only_rooms_based_on_filter(self) -> None:
         chunk, count = self.make_public_rooms_request([None])
 
         self.assertEqual(count, 1)
         self.assertEqual(chunk[0].get("room_type", None), None)
 
+        chunk, count = self.make_public_rooms_request([None], "test|test")
+        self.assertEqual(count, 0)
+
     def test_returns_only_space_based_on_filter(self) -> None:
         chunk, count = self.make_public_rooms_request(["m.space"])
 
         self.assertEqual(count, 1)
         self.assertEqual(chunk[0].get("room_type", None), "m.space")
 
+        chunk, count = self.make_public_rooms_request(["m.space"], "test|test")
+        self.assertEqual(count, 0)
+
     def test_returns_both_rooms_and_space_based_on_filter(self) -> None:
         chunk, count = self.make_public_rooms_request(["m.space", None])
-
         self.assertEqual(count, 2)
 
+        chunk, count = self.make_public_rooms_request(["m.space", None], "test|test")
+        self.assertEqual(count, 0)
+
     def test_returns_both_rooms_and_spaces_if_array_is_empty(self) -> None:
         chunk, count = self.make_public_rooms_request([])
-
         self.assertEqual(count, 2)
 
+        chunk, count = self.make_public_rooms_request([], "test|test")
+        self.assertEqual(count, 0)
+
 
 class PublicRoomsTestRemoteSearchFallbackTestCase(unittest.HomeserverTestCase):
     """Test that we correctly fallback to local filtering if a remote server
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index 473c965e19..ee48920f84 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -12,14 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Tuple
+from typing import Optional, Tuple
 
 from twisted.test.proto_helpers import MemoryReactor
 
+from synapse.api.constants import MAIN_TIMELINE, RelationTypes
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
 from synapse.storage.databases.main.event_push_actions import NotifCounts
+from synapse.types import JsonDict
 from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
@@ -64,16 +66,23 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         user_id, token, _, other_token, room_id = self._create_users_and_room()
 
         # Create two events, one of which is a highlight.
-        self.helper.send_event(
+        first_event_id = self.helper.send_event(
             room_id,
             type="m.room.message",
             content={"msgtype": "m.text", "body": "msg"},
             tok=other_token,
-        )
-        event_id = self.helper.send_event(
+        )["event_id"]
+        second_event_id = self.helper.send_event(
             room_id,
             type="m.room.message",
-            content={"msgtype": "m.text", "body": user_id},
+            content={
+                "msgtype": "m.text",
+                "body": user_id,
+                "m.relates_to": {
+                    "rel_type": RelationTypes.THREAD,
+                    "event_id": first_event_id,
+                },
+            },
             tok=other_token,
         )["event_id"]
 
@@ -93,13 +102,13 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         )
         self.assertEqual(2, len(email_actions))
 
-        # Send a receipt, which should clear any actions.
+        # Send a receipt, which should clear the first action.
         self.get_success(
             self.store.insert_receipt(
                 room_id,
                 "m.read",
                 user_id=user_id,
-                event_ids=[event_id],
+                event_ids=[first_event_id],
                 thread_id=None,
                 data={},
             )
@@ -109,6 +118,30 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
                 user_id, 0, 1000, 20
             )
         )
+        self.assertEqual(1, len(http_actions))
+        email_actions = self.get_success(
+            self.store.get_unread_push_actions_for_user_in_range_for_email(
+                user_id, 0, 1000, 20
+            )
+        )
+        self.assertEqual(1, len(email_actions))
+
+        # Send a thread receipt to clear the thread action.
+        self.get_success(
+            self.store.insert_receipt(
+                room_id,
+                "m.read",
+                user_id=user_id,
+                event_ids=[second_event_id],
+                thread_id=first_event_id,
+                data={},
+            )
+        )
+        http_actions = self.get_success(
+            self.store.get_unread_push_actions_for_user_in_range_for_http(
+                user_id, 0, 1000, 20
+            )
+        )
         self.assertEqual([], http_actions)
         email_actions = self.get_success(
             self.store.get_unread_push_actions_for_user_in_range_for_email(
@@ -133,13 +166,14 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
                 )
             )
             self.assertEqual(
-                counts,
+                counts.main_timeline,
                 NotifCounts(
                     notify_count=noitf_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
             )
+            self.assertEqual(counts.threads, {})
 
         def _create_event(highlight: bool = False) -> str:
             result = self.helper.send_event(
@@ -186,6 +220,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         _assert_counts(0, 0)
 
         _create_event()
+        _assert_counts(1, 0)
         _rotate()
         _assert_counts(1, 0)
 
@@ -236,6 +271,444 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         _rotate()
         _assert_counts(0, 0)
 
+    def test_count_aggregation_threads(self) -> None:
+        """
+        This is essentially the same test as test_count_aggregation, but adds
+        events to the main timeline and to a thread.
+        """
+
+        user_id, token, _, other_token, room_id = self._create_users_and_room()
+        thread_id: str
+
+        last_event_id: str
+
+        def _assert_counts(
+            notif_count: int,
+            highlight_count: int,
+            thread_notif_count: int,
+            thread_highlight_count: int,
+        ) -> None:
+            counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-unread-counts",
+                    self.store._get_unread_counts_by_receipt_txn,
+                    room_id,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                counts.main_timeline,
+                NotifCounts(
+                    notify_count=notif_count,
+                    unread_count=0,
+                    highlight_count=highlight_count,
+                ),
+            )
+            if thread_notif_count or thread_highlight_count:
+                self.assertEqual(
+                    counts.threads,
+                    {
+                        thread_id: NotifCounts(
+                            notify_count=thread_notif_count,
+                            unread_count=0,
+                            highlight_count=thread_highlight_count,
+                        ),
+                    },
+                )
+            else:
+                self.assertEqual(counts.threads, {})
+
+        def _create_event(
+            highlight: bool = False, thread_id: Optional[str] = None
+        ) -> str:
+            content: JsonDict = {
+                "msgtype": "m.text",
+                "body": user_id if highlight else "msg",
+            }
+            if thread_id:
+                content["m.relates_to"] = {
+                    "rel_type": "m.thread",
+                    "event_id": thread_id,
+                }
+
+            result = self.helper.send_event(
+                room_id,
+                type="m.room.message",
+                content=content,
+                tok=other_token,
+            )
+            nonlocal last_event_id
+            last_event_id = result["event_id"]
+            return last_event_id
+
+        def _rotate() -> None:
+            self.get_success(self.store._rotate_notifs())
+
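+        # Receipts in this test are always threaded: thread_id defaults to
+        # MAIN_TIMELINE ("main"), which clears only main-timeline notifications,
+        # while passing the thread root's event ID clears that thread's.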
+        def _mark_read(event_id: str, thread_id: str = MAIN_TIMELINE) -> None:
+            self.get_success(
+                self.store.insert_receipt(
+                    room_id,
+                    "m.read",
+                    user_id=user_id,
+                    event_ids=[event_id],
+                    thread_id=thread_id,
+                    data={},
+                )
+            )
+
+        _assert_counts(0, 0, 0, 0)
+        thread_id = _create_event()
+        _assert_counts(1, 0, 0, 0)
+        _rotate()
+        _assert_counts(1, 0, 0, 0)
+
+        _create_event(thread_id=thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _rotate()
+        _assert_counts(1, 0, 1, 0)
+
+        _create_event()
+        _assert_counts(2, 0, 1, 0)
+        _rotate()
+        _assert_counts(2, 0, 1, 0)
+
+        event_id = _create_event(thread_id=thread_id)
+        _assert_counts(2, 0, 2, 0)
+        _rotate()
+        _assert_counts(2, 0, 2, 0)
+
+        _create_event()
+        _create_event(thread_id=thread_id)
+        _mark_read(event_id)
+        _assert_counts(1, 0, 3, 0)
+        _mark_read(event_id, thread_id)
+        _assert_counts(1, 0, 1, 0)
+
+        _mark_read(last_event_id)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event()
+        _create_event(thread_id=thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _rotate()
+        _assert_counts(1, 0, 1, 0)
+
+        # Delete old event push actions; this should not affect the (summarised) count.
+        self.get_success(self.store._remove_old_push_actions_that_have_rotated())
+        _assert_counts(1, 0, 1, 0)
+
+        _mark_read(last_event_id)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event(True)
+        _assert_counts(1, 1, 0, 0)
+        _rotate()
+        _assert_counts(1, 1, 0, 0)
+
+        event_id = _create_event(True, thread_id)
+        _assert_counts(1, 1, 1, 1)
+        _rotate()
+        _assert_counts(1, 1, 1, 1)
+
+        # Check that adding another notification and rotating after highlight
+        # works.
+        _create_event()
+        _rotate()
+        _assert_counts(2, 1, 1, 1)
+
+        _create_event(thread_id=thread_id)
+        _rotate()
+        _assert_counts(2, 1, 2, 1)
+
+        # Check that sending read receipts at different points results in the
+        # right counts.
+        _mark_read(event_id)
+        _assert_counts(1, 0, 2, 1)
+        _mark_read(event_id, thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _mark_read(last_event_id)
+        _assert_counts(0, 0, 1, 0)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event(True)
+        _create_event(True, thread_id)
+        _assert_counts(1, 1, 1, 1)
+        _mark_read(last_event_id)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+        _rotate()
+        _assert_counts(0, 0, 0, 0)
+
+    def test_count_aggregation_mixed(self) -> None:
+        """
+        This is essentially the same test as test_count_aggregation_threads, but
+        sends both unthreaded and threaded receipts.
+        """
+
+        user_id, token, _, other_token, room_id = self._create_users_and_room()
+        thread_id: str
+
+        last_event_id: str
+
+        def _assert_counts(
+            notif_count: int,
+            highlight_count: int,
+            thread_notif_count: int,
+            thread_highlight_count: int,
+        ) -> None:
+            counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-unread-counts",
+                    self.store._get_unread_counts_by_receipt_txn,
+                    room_id,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                counts.main_timeline,
+                NotifCounts(
+                    notify_count=noitf_count,
+                    unread_count=0,
+                    highlight_count=highlight_count,
+                ),
+            )
+            if thread_notif_count or thread_highlight_count:
+                self.assertEqual(
+                    counts.threads,
+                    {
+                        thread_id: NotifCounts(
+                            notify_count=thread_notif_count,
+                            unread_count=0,
+                            highlight_count=thread_highlight_count,
+                        ),
+                    },
+                )
+            else:
+                self.assertEqual(counts.threads, {})
+
+        def _create_event(
+            highlight: bool = False, thread_id: Optional[str] = None
+        ) -> str:
+            content: JsonDict = {
+                "msgtype": "m.text",
+                "body": user_id if highlight else "msg",
+            }
+            if thread_id:
+                content["m.relates_to"] = {
+                    "rel_type": "m.thread",
+                    "event_id": thread_id,
+                }
+
+            result = self.helper.send_event(
+                room_id,
+                type="m.room.message",
+                content=content,
+                tok=other_token,
+            )
+            nonlocal last_event_id
+            last_event_id = result["event_id"]
+            return last_event_id
+
+        def _rotate() -> None:
+            self.get_success(self.store._rotate_notifs())
+
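+        # Unlike test_count_aggregation_threads, thread_id defaults to None
+        # here: an unthreaded receipt, which clears both main-timeline and
+        # thread notifications up to the given event.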
+        def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
+            self.get_success(
+                self.store.insert_receipt(
+                    room_id,
+                    "m.read",
+                    user_id=user_id,
+                    event_ids=[event_id],
+                    thread_id=thread_id,
+                    data={},
+                )
+            )
+
+        _assert_counts(0, 0, 0, 0)
+        thread_id = _create_event()
+        _assert_counts(1, 0, 0, 0)
+        _rotate()
+        _assert_counts(1, 0, 0, 0)
+
+        _create_event(thread_id=thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _rotate()
+        _assert_counts(1, 0, 1, 0)
+
+        _create_event()
+        _assert_counts(2, 0, 1, 0)
+        _rotate()
+        _assert_counts(2, 0, 1, 0)
+
+        event_id = _create_event(thread_id=thread_id)
+        _assert_counts(2, 0, 2, 0)
+        _rotate()
+        _assert_counts(2, 0, 2, 0)
+
+        _create_event()
+        _create_event(thread_id=thread_id)
+        _mark_read(event_id)
+        _assert_counts(1, 0, 1, 0)
+
+        _mark_read(last_event_id, MAIN_TIMELINE)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event()
+        _create_event(thread_id=thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _rotate()
+        _assert_counts(1, 0, 1, 0)
+
+        # Delete old event push actions; this should not affect the (summarised) count.
+        self.get_success(self.store._remove_old_push_actions_that_have_rotated())
+        _assert_counts(1, 0, 1, 0)
+
+        _mark_read(last_event_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event(True)
+        _assert_counts(1, 1, 0, 0)
+        _rotate()
+        _assert_counts(1, 1, 0, 0)
+
+        event_id = _create_event(True, thread_id)
+        _assert_counts(1, 1, 1, 1)
+        _rotate()
+        _assert_counts(1, 1, 1, 1)
+
+        # Check that adding another notification and rotating after highlight
+        # works.
+        _create_event()
+        _rotate()
+        _assert_counts(2, 1, 1, 1)
+
+        _create_event(thread_id=thread_id)
+        _rotate()
+        _assert_counts(2, 1, 2, 1)
+
+        # Check that sending read receipts at different points results in the
+        # right counts.
+        _mark_read(event_id)
+        _assert_counts(1, 0, 1, 0)
+        _mark_read(event_id, MAIN_TIMELINE)
+        _assert_counts(1, 0, 1, 0)
+        _mark_read(last_event_id, MAIN_TIMELINE)
+        _assert_counts(0, 0, 1, 0)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event(True)
+        _create_event(True, thread_id)
+        _assert_counts(1, 1, 1, 1)
+        _mark_read(last_event_id)
+        _assert_counts(0, 0, 0, 0)
+        _rotate()
+        _assert_counts(0, 0, 0, 0)
+
+    def test_recursive_thread(self) -> None:
+        """
+        Events related to events in a thread should still be considered part of
+        that thread.
+        """
+
+        # Create a user to receive notifications and send receipts.
+        user_id = self.register_user("user1235", "pass")
+        token = self.login("user1235", "pass")
+
+        # And another user to send events.
+        other_id = self.register_user("other", "pass")
+        other_token = self.login("other", "pass")
+
+        # Create a room and put both users in it.
+        room_id = self.helper.create_room_as(user_id, tok=token)
+        self.helper.join(room_id, other_id, tok=other_token)
+
+        # Update the user's push rules to care about reaction events.
+        self.get_success(
+            self.store.add_push_rule(
+                user_id,
+                "related_events",
+                priority_class=5,
+                conditions=[
+                    {"kind": "event_match", "key": "type", "pattern": "m.reaction"}
+                ],
+                actions=["notify"],
+            )
+        )
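+        # (The default push rules don't notify for reactions, hence the explicit
+        # rule above.)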
+
+        def _create_event(type: str, content: JsonDict) -> str:
+            result = self.helper.send_event(
+                room_id, type=type, content=content, tok=other_token
+            )
+            return result["event_id"]
+
+        def _assert_counts(notif_count: int, thread_notif_count: int) -> None:
+            counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-unread-counts",
+                    self.store._get_unread_counts_by_receipt_txn,
+                    room_id,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                counts.main_timeline,
+                NotifCounts(
+                    notify_count=notif_count, unread_count=0, highlight_count=0
+                ),
+            )
+            if thread_notif_count:
+                self.assertEqual(
+                    counts.threads,
+                    {
+                        thread_id: NotifCounts(
+                            notify_count=thread_notif_count,
+                            unread_count=0,
+                            highlight_count=0,
+                        ),
+                    },
+                )
+            else:
+                self.assertEqual(counts.threads, {})
+
+        # Create a root event.
+        thread_id = _create_event(
+            "m.room.message", {"msgtype": "m.text", "body": "msg"}
+        )
+        _assert_counts(1, 0)
+
+        # Reply, creating a thread.
+        reply_id = _create_event(
+            "m.room.message",
+            {
+                "msgtype": "m.text",
+                "body": "msg",
+                "m.relates_to": {
+                    "rel_type": "m.thread",
+                    "event_id": thread_id,
+                },
+            },
+        )
+        _assert_counts(1, 1)
+
+        # Create an event related to a thread event; this should still appear in
+        # the thread.
+        _create_event(
+            type="m.reaction",
+            content={
+                "m.relates_to": {
+                    "rel_type": "m.annotation",
+                    "event_id": reply_id,
+                    "key": "A",
+                }
+            },
+        )
+        _assert_counts(1, 2)
+
     def test_find_first_stream_ordering_after_ts(self) -> None:
         def add_event(so: int, ts: int) -> None:
             self.get_success(