-rwxr-xr-x  .ci/scripts/test_old_deps.sh  |   8
-rw-r--r--  .github/workflows/docker.yml  |  14
-rw-r--r--  .github/workflows/tests.yml  |   8
-rw-r--r--  .github/workflows/twisted_trunk.yml  |   2
-rw-r--r--  CHANGES.md  |  20
-rw-r--r--  changelog.d/11612.bugfix  |   1
-rw-r--r--  changelog.d/11621.feature  |   1
-rw-r--r--  changelog.d/11639.feature  |   1
-rw-r--r--  changelog.d/11683.removal  |   1
-rw-r--r--  changelog.d/11767.bugfix  |   1
-rw-r--r--  changelog.d/11784.bugfix  |   1
-rw-r--r--  changelog.d/11788.feature  |   1
-rw-r--r--  changelog.d/11789.feature  |   1
-rw-r--r--  changelog.d/11792.misc  |   1
-rw-r--r--  changelog.d/11793.misc  |   1
-rw-r--r--  changelog.d/11794.misc  |   1
-rw-r--r--  changelog.d/11795.misc  |   1
-rw-r--r--  changelog.d/11810.misc  |   1
-rw-r--r--  changelog.d/11817.misc  |   1
-rw-r--r--  docker/Dockerfile-pgtests  |   2
-rwxr-xr-x  docker/run_pg_tests.sh  |   2
-rw-r--r--  docs/admin_api/user_admin_api.md  |   6
-rw-r--r--  docs/admin_api/version_api.md  |   2
-rw-r--r--  docs/usage/administration/admin_api/federation.md  |  40
-rwxr-xr-x  setup.py  |   2
-rw-r--r--  synapse/__init__.py  |   4
-rw-r--r--  synapse/app/_base.py  |  11
-rw-r--r--  synapse/events/__init__.py  |  13
-rw-r--r--  synapse/events/snapshot.py  |   2
-rw-r--r--  synapse/events/validator.py  |   4
-rw-r--r--  synapse/federation/transport/server/__init__.py  |  16
-rw-r--r--  synapse/federation/transport/server/_base.py  |  14
-rw-r--r--  synapse/federation/transport/server/federation.py  |  24
-rw-r--r--  synapse/federation/transport/server/groups_local.py  |   8
-rw-r--r--  synapse/federation/transport/server/groups_server.py  |   8
-rw-r--r--  synapse/handlers/deactivate_account.py  |   3
-rw-r--r--  synapse/handlers/sync.py  |   2
-rw-r--r--  synapse/http/client.py  |  15
-rw-r--r--  synapse/python_dependencies.py  |   4
-rw-r--r--  synapse/replication/slave/storage/events.py  |   2
-rw-r--r--  synapse/rest/admin/__init__.py  |   6
-rw-r--r--  synapse/rest/admin/federation.py  |  44
-rw-r--r--  synapse/rest/client/account_data.py  |   2
-rw-r--r--  synapse/rest/media/v1/preview_html.py  |  31
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py  | 224
-rw-r--r--  synapse/storage/databases/main/account_data.py  |  83
-rw-r--r--  synapse/storage/databases/main/event_federation.py  |   2
-rw-r--r--  synapse/storage/databases/main/events.py  |   7
-rw-r--r--  synapse/storage/databases/main/purge_events.py  |   1
-rw-r--r--  synapse/storage/databases/main/signatures.py  |  54
-rw-r--r--  synapse/storage/engines/postgres.py  |   4
-rw-r--r--  synapse/storage/schema/__init__.py  |  11
-rw-r--r--  synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql  |  18
-rw-r--r--  synapse/storage/schema/main/delta/68/01event_columns.sql  |  26
-rw-r--r--  synapse/visibility.py  |   2
-rw-r--r--  tests/handlers/test_deactivate_account.py  | 219
-rw-r--r--  tests/replication/slave/storage/test_account_data.py  |   4
-rw-r--r--  tests/rest/admin/test_federation.py  |  55
-rw-r--r--  tests/rest/admin/test_room.py  |   1
-rw-r--r--  tests/rest/media/v1/test_html_preview.py (renamed from tests/test_preview.py)  |  34
-rw-r--r--  tests/rest/media/v1/test_url_preview.py  |  81
-rw-r--r--  tests/server.py  |   2
-rw-r--r--  tests/storage/test_event_chain.py  |   5
-rw-r--r--  tox.ini  |   3
64 files changed, 953 insertions, 216 deletions
diff --git a/.ci/scripts/test_old_deps.sh b/.ci/scripts/test_old_deps.sh
index 8b473936f8..a54aa86fbc 100755
--- a/.ci/scripts/test_old_deps.sh
+++ b/.ci/scripts/test_old_deps.sh
@@ -1,12 +1,14 @@
 #!/usr/bin/env bash
-
-# this script is run by GitHub Actions in a plain `bionic` container; it installs the
+# this script is run by GitHub Actions in a plain `focal` container; it installs the
 # minimal requirements for tox and hands over to the py3-old tox environment.
 
+# Prevent tzdata from asking for user input
+export DEBIAN_FRONTEND=noninteractive
+
 set -ex
 
 apt-get update
-apt-get install -y python3 python3-dev python3-pip libxml2-dev libxslt-dev xmlsec1 zlib1g-dev tox
+apt-get install -y python3 python3-dev python3-pip libxml2-dev libxslt-dev xmlsec1 zlib1g-dev tox libjpeg-dev libwebp-dev
 
 export LANG="C.UTF-8"
 
diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 3276d1e122..124b17458f 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -34,6 +34,8 @@ jobs:
           username: ${{ secrets.DOCKERHUB_USERNAME }}
           password: ${{ secrets.DOCKERHUB_TOKEN }}
 
+      # TODO: consider using https://github.com/docker/metadata-action instead of this
+      # custom magic
       - name: Calculate docker image tag
         id: set-tag
         run: |
@@ -53,18 +55,6 @@ jobs:
           esac
           echo "::set-output name=tag::$tag"
 
-        # for release builds, we want to get the amd64 image out asap, so first
-        # we do an amd64-only build, before following up with a multiarch build.
-      - name: Build and push amd64
-        uses: docker/build-push-action@v2
-        if: "${{ startsWith(github.ref, 'refs/tags/v') }}"
-        with:
-          push: true
-          labels: "gitsha1=${{ github.sha }}"
-          tags: "matrixdotorg/synapse:${{ steps.set-tag.outputs.tag }}"
-          file: "docker/Dockerfile"
-          platforms: linux/amd64
-
       - name: Build and push all platforms
         uses: docker/build-push-action@v2
         with:
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 4f58069702..e47671102e 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -141,7 +141,7 @@ jobs:
     steps:
       - uses: actions/checkout@v2
       - name: Test with old deps
-        uses: docker://ubuntu:bionic # For old python and sqlite
+        uses: docker://ubuntu:focal # For old python and sqlite
         with:
           workdir: /github/workspace
           entrypoint: .ci/scripts/test_old_deps.sh
@@ -213,15 +213,15 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - sytest-tag: bionic
+          - sytest-tag: focal
 
-          - sytest-tag: bionic
+          - sytest-tag: focal
             postgres: postgres
 
           - sytest-tag: testing
             postgres: postgres
 
-          - sytest-tag: bionic
+          - sytest-tag: focal
             postgres: multi-postgres
             workers: workers
 
diff --git a/.github/workflows/twisted_trunk.yml b/.github/workflows/twisted_trunk.yml
index e974ac7aba..fb9d46b7bf 100644
--- a/.github/workflows/twisted_trunk.yml
+++ b/.github/workflows/twisted_trunk.yml
@@ -25,7 +25,7 @@ jobs:
       - run: sudo apt-get -qq install xmlsec1
       - uses: actions/setup-python@v2
         with:
-          python-version: 3.6
+          python-version: 3.7
       - run: .ci/patch_for_twisted_trunk.sh
       - run: pip install tox
       - run: tox -e py
diff --git a/CHANGES.md b/CHANGES.md
index ba76b5017e..37b9e6bb96 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,17 @@ Bugfixes
 - Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. ([\#11806](https://github.com/matrix-org/synapse/issues/11806))
 
 
+Synapse 1.50.2 (2022-01-24)
+===========================
+
+This release includes the same bugfix as Synapse 1.51.0rc2.
+
+Bugfixes
+--------
+
+- Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. ([\#11806](https://github.com/matrix-org/synapse/issues/11806))
+
+
 Synapse 1.51.0rc1 (2022-01-21)
 ==============================
 
@@ -89,15 +100,6 @@ Internal Changes
 - Add some comments and type annotations for `_update_outliers_txn`. ([\#11776](https://github.com/matrix-org/synapse/issues/11776))
 
 
-Synapse 1.50.2 (2022-01-24)
-===========================
-
-Bugfixes
---------
-
-- Backport the sole fix from v1.51.0rc2. This fixes a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. ([\#11806](https://github.com/matrix-org/synapse/issues/11806))
-
-
 Synapse 1.50.1 (2022-01-18)
 ===========================
 
diff --git a/changelog.d/11612.bugfix b/changelog.d/11612.bugfix
new file mode 100644
index 0000000000..842f6892fd
--- /dev/null
+++ b/changelog.d/11612.bugfix
@@ -0,0 +1 @@
+Include the bundled aggregations in the `/sync` response, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675).
diff --git a/changelog.d/11621.feature b/changelog.d/11621.feature
new file mode 100644
index 0000000000..dc426fb658
--- /dev/null
+++ b/changelog.d/11621.feature
@@ -0,0 +1 @@
+Remove account data (including client config, push rules and ignored users) upon user deactivation.
\ No newline at end of file
diff --git a/changelog.d/11639.feature b/changelog.d/11639.feature
new file mode 100644
index 0000000000..e9f6704f7a
--- /dev/null
+++ b/changelog.d/11639.feature
@@ -0,0 +1 @@
+Add an admin API to reset connection timeouts for a remote server.
\ No newline at end of file
diff --git a/changelog.d/11683.removal b/changelog.d/11683.removal
new file mode 100644
index 0000000000..b1f048f7f5
--- /dev/null
+++ b/changelog.d/11683.removal
@@ -0,0 +1 @@
+Drop support for Python 3.6, which is EOL.
\ No newline at end of file
diff --git a/changelog.d/11767.bugfix b/changelog.d/11767.bugfix
new file mode 100644
index 0000000000..3e344747f4
--- /dev/null
+++ b/changelog.d/11767.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug when previewing Reddit URLs which do not contain an image.
diff --git a/changelog.d/11784.bugfix b/changelog.d/11784.bugfix
new file mode 100644
index 0000000000..6569a8c299
--- /dev/null
+++ b/changelog.d/11784.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where media streams could cause long-lived connections when generating URL previews.
diff --git a/changelog.d/11788.feature b/changelog.d/11788.feature
new file mode 100644
index 0000000000..dc426fb658
--- /dev/null
+++ b/changelog.d/11788.feature
@@ -0,0 +1 @@
+Remove account data (including client config, push rules and ignored users) upon user deactivation.
\ No newline at end of file
diff --git a/changelog.d/11789.feature b/changelog.d/11789.feature
new file mode 100644
index 0000000000..dc426fb658
--- /dev/null
+++ b/changelog.d/11789.feature
@@ -0,0 +1 @@
+Remove account data (including client config, push rules and ignored users) upon user deactivation.
\ No newline at end of file
diff --git a/changelog.d/11792.misc b/changelog.d/11792.misc
new file mode 100644
index 0000000000..6aa1cd61c3
--- /dev/null
+++ b/changelog.d/11792.misc
@@ -0,0 +1 @@
+Preparation for database schema simplifications: add `state_key` and `rejection_reason` columns to `events` table.
diff --git a/changelog.d/11793.misc b/changelog.d/11793.misc
new file mode 100644
index 0000000000..fc0530bf2c
--- /dev/null
+++ b/changelog.d/11793.misc
@@ -0,0 +1 @@
+Add `FrozenEvent.get_state_key` and use it in a couple of places.
diff --git a/changelog.d/11794.misc b/changelog.d/11794.misc
new file mode 100644
index 0000000000..29826bc0e5
--- /dev/null
+++ b/changelog.d/11794.misc
@@ -0,0 +1 @@
+Preparation for database schema simplifications: stop reading from `event_reference_hashes`.
diff --git a/changelog.d/11795.misc b/changelog.d/11795.misc
new file mode 100644
index 0000000000..aeba317670
--- /dev/null
+++ b/changelog.d/11795.misc
@@ -0,0 +1 @@
+Drop unused table `public_room_list_stream`.
diff --git a/changelog.d/11810.misc b/changelog.d/11810.misc
new file mode 100644
index 0000000000..5579b85979
--- /dev/null
+++ b/changelog.d/11810.misc
@@ -0,0 +1 @@
+Docker: skip the initial amd64-only build and go straight to multiarch.
diff --git a/changelog.d/11817.misc b/changelog.d/11817.misc
new file mode 100644
index 0000000000..bd29d8d6eb
--- /dev/null
+++ b/changelog.d/11817.misc
@@ -0,0 +1 @@
+Compatibility with updated type hints for jsonschema 4.4.0.
diff --git a/docker/Dockerfile-pgtests b/docker/Dockerfile-pgtests
index 92b804d193..b94484ea7f 100644
--- a/docker/Dockerfile-pgtests
+++ b/docker/Dockerfile-pgtests
@@ -1,6 +1,6 @@
 # Use the Sytest image that comes with a lot of the build dependencies
 # pre-installed
-FROM matrixdotorg/sytest:bionic
+FROM matrixdotorg/sytest:focal
 
 # The Sytest image doesn't come with python, so install that
 RUN apt-get update && apt-get -qq install -y python3 python3-dev python3-pip
diff --git a/docker/run_pg_tests.sh b/docker/run_pg_tests.sh
index 58e2177d34..b22b6ef16b 100755
--- a/docker/run_pg_tests.sh
+++ b/docker/run_pg_tests.sh
@@ -16,4 +16,4 @@ sudo -u postgres /usr/lib/postgresql/10/bin/pg_ctl -w -D /var/lib/postgresql/dat
 # Run the tests
 cd /src
 export TRIAL_FLAGS="-j 4"
-tox --workdir=./.tox-pg-container -e py36-postgres "$@"
+tox --workdir=./.tox-pg-container -e py37-postgres "$@"
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index c514cadb9d..fdc1f2d1cf 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -353,6 +353,11 @@ The following actions are performed when deactivating a user:
 - Remove the user from the user directory
 - Reject all pending invites
 - Remove all account validity information related to the user
+- Remove the arbitrary data store known as *account data*. For example, this includes:
+    - list of ignored users;
+    - push rules;
+    - secret storage keys; and
+    - cross-signing keys.
 
 The following additional actions are performed during deactivation if `erase`
 is set to `true`:
@@ -366,7 +371,6 @@ The following actions are **NOT** performed. The list may be incomplete.
 - Remove mappings of SSO IDs
 - [Delete media uploaded](#delete-media-uploaded-by-a-user) by user (including avatar images)
 - Delete sent and received messages
-- Delete E2E cross-signing keys
 - Remove the user's creation (registration) timestamp
 - [Remove rate limit overrides](#override-ratelimiting-for-users)
 - Remove from monthly active users
diff --git a/docs/admin_api/version_api.md b/docs/admin_api/version_api.md
index efb4a0c0f7..27977de0d3 100644
--- a/docs/admin_api/version_api.md
+++ b/docs/admin_api/version_api.md
@@ -16,6 +16,6 @@ It returns a JSON body like the following:
 ```json
 {
     "server_version": "0.99.2rc1 (b=develop, abcdef123)",
-    "python_version": "3.6.8"
+    "python_version": "3.7.8"
 }
 ```
diff --git a/docs/usage/administration/admin_api/federation.md b/docs/usage/administration/admin_api/federation.md
index 8f9535f57b..5e609561a6 100644
--- a/docs/usage/administration/admin_api/federation.md
+++ b/docs/usage/administration/admin_api/federation.md
@@ -86,7 +86,7 @@ The following fields are returned in the JSON response body:
 - `next_token`: string representing a positive integer - Indication for pagination. See above.
 - `total` - integer - Total number of destinations.
 
-# Destination Details API
+## Destination Details API
 
 This API gets the retry timing info for a specific remote server.
 
@@ -108,7 +108,45 @@ A response body like the following is returned:
 }
 ```
 
+**Parameters**
+
+The following parameters should be set in the URL:
+
+- `destination` - Name of the remote server.
+
 **Response**
 
 The response fields are the same as those in the `destinations` array of the
 [List of destinations](#list-of-destinations) response.
+
+## Reset connection timeout
+
+Synapse makes federation requests to other homeservers. If a federation request fails,
+Synapse will mark the destination homeserver as offline, preventing any future requests
+to that server for a "cooldown" period. This period grows over time if the server
+continues to fail to respond
+([exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff)).
+
+Admins can cancel the cooldown period with this API.
+
+This API resets the retry timing for a specific remote server and tries to connect to
+the remote server again. It does not wait for the next `retry_interval`.
+The connection must have previously run into an error and `retry_last_ts`
+([Destination Details API](#destination-details-api)) must not be equal to `0`.
+
+The connection attempt is carried out in the background and can take some time
+even though the API has already returned an HTTP 200 status.
+
+The API is:
+
+```
+POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection
+
+{}
+```
+
+**Parameters**
+
+The following parameters should be set in the URL:
+
+- `destination` - Name of the remote server.
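
As a quick illustration of the endpoint documented above, here is a minimal Python sketch (not part of this commit); the homeserver URL, admin access token and destination name are placeholders.

```python
import requests

BASE = "https://synapse.example.com"  # placeholder homeserver URL
TOKEN = "ADMIN_ACCESS_TOKEN"          # placeholder admin access token
DEST = "remote.example.org"           # placeholder destination name

# POST an empty JSON body, as documented above.
resp = requests.post(
    f"{BASE}/_synapse/admin/v1/federation/destinations/{DEST}/reset_connection",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={},
)
resp.raise_for_status()  # expect HTTP 200 with an empty JSON object
```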
diff --git a/setup.py b/setup.py
index e618ff898b..d0511c767f 100755
--- a/setup.py
+++ b/setup.py
@@ -150,7 +150,7 @@ setup(
     zip_safe=False,
     long_description=long_description,
     long_description_content_type="text/x-rst",
-    python_requires="~=3.6",
+    python_requires="~=3.7",
     entry_points={
         "console_scripts": [
             "synapse_homeserver = synapse.app.homeserver:main",
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 26bdfec33a..603dbb27e1 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -21,8 +21,8 @@ import os
 import sys
 
 # Check that we're not running on an unsupported Python version.
-if sys.version_info < (3, 6):
-    print("Synapse requires Python 3.6 or above.")
+if sys.version_info < (3, 7):
+    print("Synapse requires Python 3.7 or above.")
     sys.exit(1)
 
 # Twisted and canonicaljson will fail to import when this file is executed to
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 579adbbca0..e5ee03b79f 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -16,7 +16,6 @@ import atexit
 import gc
 import logging
 import os
-import platform
 import signal
 import socket
 import sys
@@ -468,16 +467,12 @@ async def start(hs: "HomeServer") -> None:
     # everything currently allocated are things that will be used for the
     # rest of time. Doing so means less work each GC (hopefully).
     #
-    # This only works on Python 3.7
-    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
-        gc.collect()
-        gc.freeze()
+    gc.collect()
+    gc.freeze()
 
     # Speed up shutdowns by freezing all allocated objects. This moves everything
     # into the permanent generation and excludes them from the final GC.
-    # Unfortunately only works on Python 3.7
-    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
-        atexit.register(gc.freeze)
+    atexit.register(gc.freeze)
 
 
 def setup_sentry(hs: "HomeServer") -> None:
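
A standalone sketch of the now-unconditional freeze pattern (illustrative only): `gc.freeze()` and `gc.get_freeze_count()` are CPython 3.7+ APIs, which is why the version guard above could be dropped.

```python
import gc

# Collect startup garbage first, then move every surviving object into the
# permanent generation so later collections skip scanning them.
gc.collect()
gc.freeze()

print(gc.get_freeze_count())  # number of objects now excluded from GC
```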
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 38f3cf4d33..9acb3c0cc4 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -315,10 +315,11 @@ class EventBase(metaclass=abc.ABCMeta):
     redacts: DefaultDictProperty[Optional[str]] = DefaultDictProperty("redacts", None)
     room_id: DictProperty[str] = DictProperty("room_id")
     sender: DictProperty[str] = DictProperty("sender")
-    # TODO state_key should be Optional[str], this is generally asserted in Synapse
-    # by calling is_state() first (which ensures this), but it is hard (not possible?)
+    # TODO state_key should be Optional[str]. This is generally asserted in Synapse
+    # by calling is_state() first (which ensures it is not None), but it is hard (not possible?)
     # to properly annotate that calling is_state() asserts that state_key exists
-    # and is non-None.
+    # and is non-None. It would be better to replace such direct references with
+    # get_state_key() (and a check for None).
     state_key: DictProperty[str] = DictProperty("state_key")
     type: DictProperty[str] = DictProperty("type")
     user_id: DictProperty[str] = DictProperty("sender")
@@ -332,7 +333,11 @@ class EventBase(metaclass=abc.ABCMeta):
         return self.content["membership"]
 
     def is_state(self) -> bool:
-        return hasattr(self, "state_key") and self.state_key is not None
+        return self.get_state_key() is not None
+
+    def get_state_key(self) -> Optional[str]:
+        """Get the state key of this event, or None if it's not a state event"""
+        return self._dict.get("state_key")
 
     def get_dict(self) -> JsonDict:
         d = dict(self._dict)
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 0eab1aefd6..5833fee25f 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -163,7 +163,7 @@ class EventContext:
         return {
             "prev_state_id": prev_state_id,
             "event_type": event.type,
-            "event_state_key": event.state_key if event.is_state() else None,
+            "event_state_key": event.get_state_key(),
             "state_group": self._state_group,
             "state_group_before_event": self.state_group_before_event,
             "rejected": self.rejected,
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index cf86934968..4245573017 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -246,7 +246,9 @@ POWER_LEVELS_SCHEMA = {
 
 # This could return something newer than Draft 7, but that's the current "latest"
 # validator.
-def _create_power_level_validator() -> jsonschema.Draft7Validator:
+#
+# See https://github.com/python/typeshed/issues/7028 for the ignored return type.
+def _create_power_level_validator() -> jsonschema.Draft7Validator:  # type: ignore[valid-type]
     validator = jsonschema.validators.validator_for(POWER_LEVELS_SCHEMA)
 
     # by default jsonschema does not consider a frozendict to be an object so
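
For background on `validator_for` (a sketch, not this module's code): it selects the validator class matching a schema's declared draft, falling back to the given default when no `$schema` key is present.

```python
import jsonschema

SCHEMA = {"type": "object", "properties": {"ban": {"type": "integer"}}}

# With no "$schema" key, validator_for falls back to the supplied default
# (Draft7Validator here, matching the annotation above).
cls = jsonschema.validators.validator_for(SCHEMA, default=jsonschema.Draft7Validator)
cls.check_schema(SCHEMA)           # raises SchemaError if the schema is invalid
cls(SCHEMA).validate({"ban": 50})  # raises ValidationError on bad instances
```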
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 77b936361a..db4fe2c798 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Dict, Iterable, List, Optional, Tuple, Type
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type
 
 from typing_extensions import Literal
 
@@ -36,17 +36,19 @@ from synapse.http.servlet import (
     parse_integer_from_args,
     parse_string_from_args,
 )
-from synapse.server import HomeServer
 from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util.ratelimitutils import FederationRateLimiter
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class TransportLayerServer(JsonResource):
     """Handles incoming federation HTTP requests"""
 
-    def __init__(self, hs: HomeServer, servlet_groups: Optional[List[str]] = None):
+    def __init__(self, hs: "HomeServer", servlet_groups: Optional[List[str]] = None):
         """Initialize the TransportLayerServer
 
         Will by default register all servlets. For custom behaviour, pass in
@@ -113,7 +115,7 @@ class PublicRoomList(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
@@ -203,7 +205,7 @@ class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
@@ -251,7 +253,7 @@ class OpenIdUserInfo(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
@@ -297,7 +299,7 @@ DEFAULT_SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
 
 
 def register_servlets(
-    hs: HomeServer,
+    hs: "HomeServer",
     resource: HttpServer,
     authenticator: Authenticator,
     ratelimiter: FederationRateLimiter,
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index da1fbf8b63..2ca7c05835 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -15,7 +15,7 @@
 import functools
 import logging
 import re
-from typing import Any, Awaitable, Callable, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple, cast
 
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.urls import FEDERATION_V1_PREFIX
@@ -29,11 +29,13 @@ from synapse.logging.opentracing import (
     start_active_span_follows_from,
     whitelisted_homeserver,
 )
-from synapse.server import HomeServer
 from synapse.types import JsonDict
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.stringutils import parse_and_validate_server_name
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -46,7 +48,7 @@ class NoAuthenticationError(AuthenticationError):
 
 
 class Authenticator:
-    def __init__(self, hs: HomeServer):
+    def __init__(self, hs: "HomeServer"):
         self._clock = hs.get_clock()
         self.keyring = hs.get_keyring()
         self.server_name = hs.hostname
@@ -114,11 +116,11 @@ class Authenticator:
         # alive
         retry_timings = await self.store.get_destination_retry_timings(origin)
         if retry_timings and retry_timings.retry_last_ts:
-            run_in_background(self._reset_retry_timings, origin)
+            run_in_background(self.reset_retry_timings, origin)
 
         return origin
 
-    async def _reset_retry_timings(self, origin: str) -> None:
+    async def reset_retry_timings(self, origin: str) -> None:
         try:
             logger.info("Marking origin %r as up", origin)
             await self.store.set_destination_retry_timings(origin, None, 0, 0)
@@ -227,7 +229,7 @@ class BaseFederationServlet:
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index beadfa422b..9c1ad5851f 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -12,7 +12,17 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 import logging
-from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Type, Union
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    Union,
+)
 
 from typing_extensions import Literal
 
@@ -30,11 +40,13 @@ from synapse.http.servlet import (
     parse_string_from_args,
     parse_strings_from_args,
 )
-from synapse.server import HomeServer
 from synapse.types import JsonDict
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 issue_8631_logger = logging.getLogger("synapse.8631_debug")
 
@@ -47,7 +59,7 @@ class BaseFederationServerServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
@@ -596,7 +608,7 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
@@ -670,7 +682,7 @@ class FederationRoomHierarchyServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
@@ -706,7 +718,7 @@ class RoomComplexityServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
diff --git a/synapse/federation/transport/server/groups_local.py b/synapse/federation/transport/server/groups_local.py
index a12cd18d58..496472e1dc 100644
--- a/synapse/federation/transport/server/groups_local.py
+++ b/synapse/federation/transport/server/groups_local.py
@@ -11,7 +11,7 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-from typing import Dict, List, Tuple, Type
+from typing import TYPE_CHECKING, Dict, List, Tuple, Type
 
 from synapse.api.errors import SynapseError
 from synapse.federation.transport.server._base import (
@@ -19,10 +19,12 @@ from synapse.federation.transport.server._base import (
     BaseFederationServlet,
 )
 from synapse.handlers.groups_local import GroupsLocalHandler
-from synapse.server import HomeServer
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.ratelimitutils import FederationRateLimiter
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class BaseGroupsLocalServlet(BaseFederationServlet):
     """Abstract base class for federation servlet classes which provides a groups local handler.
@@ -32,7 +34,7 @@ class BaseGroupsLocalServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
diff --git a/synapse/federation/transport/server/groups_server.py b/synapse/federation/transport/server/groups_server.py
index b30e92a5eb..851b50152e 100644
--- a/synapse/federation/transport/server/groups_server.py
+++ b/synapse/federation/transport/server/groups_server.py
@@ -11,7 +11,7 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-from typing import Dict, List, Tuple, Type
+from typing import TYPE_CHECKING, Dict, List, Tuple, Type
 
 from typing_extensions import Literal
 
@@ -22,10 +22,12 @@ from synapse.federation.transport.server._base import (
     BaseFederationServlet,
 )
 from synapse.http.servlet import parse_string_from_args
-from synapse.server import HomeServer
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.ratelimitutils import FederationRateLimiter
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class BaseGroupsServerServlet(BaseFederationServlet):
     """Abstract base class for federation servlet classes which provides a groups server handler.
@@ -35,7 +37,7 @@ class BaseGroupsServerServlet(BaseFederationServlet):
 
     def __init__(
         self,
-        hs: HomeServer,
+        hs: "HomeServer",
         authenticator: Authenticator,
         ratelimiter: FederationRateLimiter,
         server_name: str,
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index bee62cf360..7a13d76a68 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -157,6 +157,9 @@ class DeactivateAccountHandler:
         # Mark the user as deactivated.
         await self.store.set_user_deactivated_status(user_id, True)
 
+        # Remove account data (including ignored users and push rules).
+        await self.store.purge_account_data_for_user(user_id)
+
         return identity_server_supports_unbinding
 
     async def _reject_pending_invites_for_user(self, user_id: str) -> None:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ffc6b748e8..7e2a892b63 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1619,7 +1619,7 @@ class SyncHandler:
         # TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
         ignored_account_data = (
             await self.store.get_global_account_data_by_type_for_user(
-                AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
+                user_id=user_id, data_type=AccountDataTypes.IGNORED_USER_LIST
             )
         )
 
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ca33b45cb2..743a7ffcb1 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -731,15 +731,24 @@ class SimpleHttpClient:
         # straight back in again
 
         try:
-            length = await make_deferred_yieldable(
-                read_body_with_max_size(response, output_stream, max_size)
-            )
+            d = read_body_with_max_size(response, output_stream, max_size)
+
+            # Ensure that the body is not read forever.
+            d = timeout_deferred(d, 30, self.hs.get_reactor())
+
+            length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
             raise SynapseError(
                 HTTPStatus.BAD_GATEWAY,
                 "Requested file is too large > %r bytes" % (max_size,),
                 Codes.TOO_LARGE,
             )
+        except defer.TimeoutError:
+            raise SynapseError(
+                HTTPStatus.BAD_GATEWAY,
+                "Requested file took too long to download",
+                Codes.TOO_LARGE,
+            )
         except Exception as e:
             raise SynapseError(
                 HTTPStatus.BAD_GATEWAY, ("Failed to download remote body: %s" % e)
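
The `timeout_deferred` helper used above wraps Twisted's deferred-timeout machinery; a rough standalone equivalent (an illustration, not Synapse's implementation) uses `Deferred.addTimeout`.

```python
from twisted.internet import defer, reactor

def with_timeout(d: defer.Deferred, seconds: float) -> defer.Deferred:
    # addTimeout cancels the deferred and errbacks it with
    # defer.TimeoutError if it has not fired within `seconds`.
    d.addTimeout(seconds, reactor)
    return d
```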
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index d844fbb3b3..22b4606ae0 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -70,7 +70,7 @@ REQUIREMENTS = [
     "pyasn1>=0.1.9",
     "pyasn1-modules>=0.0.7",
     "bcrypt>=3.1.0",
-    "pillow>=4.3.0",
+    "pillow>=5.4.0",
     "sortedcontainers>=1.4.4",
     "pymacaroons>=0.13.0",
     "msgpack>=0.5.2",
@@ -107,7 +107,7 @@ CONDITIONAL_REQUIREMENTS = {
     # `systemd.journal.JournalHandler`, as is documented in
     # `contrib/systemd/log_config.yaml`.
     "systemd": ["systemd-python>=231"],
-    "url_preview": ["lxml>=3.5.0"],
+    "url_preview": ["lxml>=4.2.0"],
     "sentry": ["sentry-sdk>=0.7.2"],
     "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
     "jwt": ["pyjwt>=1.6.4"],
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 0f08372694..a72dad7464 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -52,8 +52,8 @@ class SlavedEventStore(
     EventPushActionsWorkerStore,
     StreamWorkerStore,
     StateGroupWorkerStore,
-    EventsWorkerStore,
     SignatureWorkerStore,
+    EventsWorkerStore,
     UserErasureWorkerStore,
     RelationsWorkerStore,
     BaseSlavedStore,
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 465e06772b..b1e49d51b7 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -41,7 +41,8 @@ from synapse.rest.admin.event_reports import (
     EventReportsRestServlet,
 )
 from synapse.rest.admin.federation import (
-    DestinationsRestServlet,
+    DestinationResetConnectionRestServlet,
+    DestinationRestServlet,
     ListDestinationsRestServlet,
 )
 from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
@@ -267,7 +268,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListRegistrationTokensRestServlet(hs).register(http_server)
     NewRegistrationTokenRestServlet(hs).register(http_server)
     RegistrationTokenRestServlet(hs).register(http_server)
-    DestinationsRestServlet(hs).register(http_server)
+    DestinationResetConnectionRestServlet(hs).register(http_server)
+    DestinationRestServlet(hs).register(http_server)
     ListDestinationsRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py
index 8cd3fa189e..0f33f9e4da 100644
--- a/synapse/rest/admin/federation.py
+++ b/synapse/rest/admin/federation.py
@@ -16,6 +16,7 @@ from http import HTTPStatus
 from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.errors import Codes, NotFoundError, SynapseError
+from synapse.federation.transport.server import Authenticator
 from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
 from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
@@ -90,7 +91,7 @@ class ListDestinationsRestServlet(RestServlet):
         return HTTPStatus.OK, response
 
 
-class DestinationsRestServlet(RestServlet):
+class DestinationRestServlet(RestServlet):
     """Get details of a destination.
     This requires the user to have administrator access in Synapse.
 
@@ -145,3 +146,44 @@ class DestinationsRestServlet(RestServlet):
             }
 
         return HTTPStatus.OK, response
+
+
+class DestinationResetConnectionRestServlet(RestServlet):
+    """Reset destinations' connection timeouts and wake it up.
+    This needs user to have administrator access in Synapse.
+
+    POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection
+    {}
+
+    Returns:
+        200 OK, otherwise an error.
+    """
+
+    PATTERNS = admin_patterns(
+        "/federation/destinations/(?P<destination>[^/]+)/reset_connection$"
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastore()
+        self._authenticator = Authenticator(hs)
+
+    async def on_POST(
+        self, request: SynapseRequest, destination: str
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        if not await self._store.is_destination_known(destination):
+            raise NotFoundError("Unknown destination")
+
+        retry_timings = await self._store.get_destination_retry_timings(destination)
+        if not (retry_timings and retry_timings.retry_last_ts):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "The retry timing does not need to be reset for this destination.",
+            )
+
+        # reset timings and wake up
+        await self._authenticator.reset_retry_timings(destination)
+
+        return HTTPStatus.OK, {}
diff --git a/synapse/rest/client/account_data.py b/synapse/rest/client/account_data.py
index d1badbdf3b..58b8adbd32 100644
--- a/synapse/rest/client/account_data.py
+++ b/synapse/rest/client/account_data.py
@@ -66,7 +66,7 @@ class AccountDataServlet(RestServlet):
             raise AuthError(403, "Cannot get account data for other users.")
 
         event = await self.store.get_global_account_data_by_type_for_user(
-            account_data_type, user_id
+            user_id, account_data_type
         )
 
         if event is None:
diff --git a/synapse/rest/media/v1/preview_html.py b/synapse/rest/media/v1/preview_html.py
index 30b067dd42..872a9e72e8 100644
--- a/synapse/rest/media/v1/preview_html.py
+++ b/synapse/rest/media/v1/preview_html.py
@@ -321,14 +321,33 @@ def _iterate_over_text(
 
 
 def rebase_url(url: str, base: str) -> str:
-    base_parts = list(urlparse.urlparse(base))
+    """
+    Resolves a potentially relative `url` against an absolute `base` URL.
+
+    For example:
+
+        >>> rebase_url("subpage", "https://example.com/foo/")
+        'https://example.com/foo/subpage'
+        >>> rebase_url("sibling", "https://example.com/foo")
+        'https://example.com/sibling'
+        >>> rebase_url("/bar", "https://example.com/foo/")
+        'https://example.com/bar'
+        >>> rebase_url("https://alice.com/a/", "https://example.com/foo/")
+        'https://alice.com/a/'
+    """
+    base_parts = urlparse.urlparse(base)
+    # Convert the parsed URL to a list for (potential) modification.
     url_parts = list(urlparse.urlparse(url))
-    if not url_parts[0]:  # fix up schema
-        url_parts[0] = base_parts[0] or "http"
-    if not url_parts[1]:  # fix up hostname
-        url_parts[1] = base_parts[1]
+    # Add a scheme, if one does not exist.
+    if not url_parts[0]:
+        url_parts[0] = base_parts.scheme or "http"
+    # Fix up the hostname, if this is not a data URL.
+    if url_parts[0] != "data" and not url_parts[1]:
+        url_parts[1] = base_parts.netloc
+        # If the path does not start with a /, nest it under the base path's last
+        # directory.
         if not url_parts[2].startswith("/"):
-            url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts[2]) + url_parts[2]
+            url_parts[2] = re.sub(r"/[^/]+$", "/", base_parts.path) + url_parts[2]
     return urlparse.urlunparse(url_parts)
 
 
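For reference (not part of the commit): the stdlib's `urljoin` agrees with the doctests above for ordinary URLs; `rebase_url` differs in defaulting a missing scheme to "http" and in not rebasing the hostname of `data:` URLs.

```python
from urllib.parse import urljoin

assert urljoin("https://example.com/foo/", "subpage") == "https://example.com/foo/subpage"
assert urljoin("https://example.com/foo", "sibling") == "https://example.com/sibling"
assert urljoin("https://example.com/foo/", "/bar") == "https://example.com/bar"
```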
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index e8881bc870..efd84ced8f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -21,8 +21,9 @@ import re
 import shutil
 import sys
 import traceback
-from typing import TYPE_CHECKING, Iterable, Optional, Tuple
+from typing import TYPE_CHECKING, BinaryIO, Iterable, Optional, Tuple
 from urllib import parse as urlparse
+from urllib.request import urlopen
 
 import attr
 
@@ -71,6 +72,17 @@ IMAGE_CACHE_EXPIRY_MS = 2 * ONE_DAY
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
+class DownloadResult:
+    length: int
+    uri: str
+    response_code: int
+    media_type: str
+    download_name: Optional[str]
+    expires: int
+    etag: Optional[str]
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class MediaInfo:
     """
     Information parsed from downloading media being previewed.
@@ -256,7 +268,7 @@ class PreviewUrlResource(DirectServeJsonResource):
         if oembed_url:
             url_to_download = oembed_url
 
-        media_info = await self._download_url(url_to_download, user)
+        media_info = await self._handle_url(url_to_download, user)
 
         logger.debug("got media_info of '%s'", media_info)
 
@@ -297,7 +309,9 @@ class PreviewUrlResource(DirectServeJsonResource):
                 oembed_url = self._oembed.autodiscover_from_html(tree)
                 og_from_oembed: JsonDict = {}
                 if oembed_url:
-                    oembed_info = await self._download_url(oembed_url, user)
+                    oembed_info = await self._handle_url(
+                        oembed_url, user, allow_data_urls=True
+                    )
                     (
                         og_from_oembed,
                         author_name,
@@ -367,7 +381,135 @@ class PreviewUrlResource(DirectServeJsonResource):
 
         return jsonog.encode("utf8")
 
-    async def _download_url(self, url: str, user: UserID) -> MediaInfo:
+    async def _download_url(self, url: str, output_stream: BinaryIO) -> DownloadResult:
+        """
+        Fetches a remote URL and parses the headers.
+
+        Args:
+             url: The URL to fetch.
+             output_stream: The stream to write the content to.
+
+        Returns:
+            A DownloadResult with:
+                the media length, the URL downloaded, the HTTP response code,
+                the media type, the downloaded file name, the number of
+                milliseconds the result is valid for, and the etag header.
+        """
+
+        try:
+            logger.debug("Trying to get preview for url '%s'", url)
+            length, headers, uri, code = await self.client.get_file(
+                url,
+                output_stream=output_stream,
+                max_size=self.max_spider_size,
+                headers={"Accept-Language": self.url_preview_accept_language},
+            )
+        except SynapseError:
+            # Pass SynapseErrors through directly, so that the servlet
+            # handler will return a SynapseError to the client instead of
+            # blank data or a 500.
+            raise
+        except DNSLookupError:
+            # DNS lookup returned no results
+            # Note: This will also be the case if one of the resolved IP
+            # addresses is blacklisted
+            raise SynapseError(
+                502,
+                "DNS resolution failure during URL preview generation",
+                Codes.UNKNOWN,
+            )
+        except Exception as e:
+            # FIXME: pass through 404s and other error messages nicely
+            logger.warning("Error downloading %s: %r", url, e)
+
+            raise SynapseError(
+                500,
+                "Failed to download content: %s"
+                % (traceback.format_exception_only(sys.exc_info()[0], e),),
+                Codes.UNKNOWN,
+            )
+
+        if b"Content-Type" in headers:
+            media_type = headers[b"Content-Type"][0].decode("ascii")
+        else:
+            media_type = "application/octet-stream"
+
+        download_name = get_filename_from_headers(headers)
+
+        # FIXME: we should calculate a proper expiration based on the
+        # Cache-Control and Expire headers.  But for now, assume 1 hour.
+        expires = ONE_HOUR
+        etag = headers[b"ETag"][0].decode("ascii") if b"ETag" in headers else None
+
+        return DownloadResult(
+            length, uri, code, media_type, download_name, expires, etag
+        )
+
+    async def _parse_data_url(
+        self, url: str, output_stream: BinaryIO
+    ) -> DownloadResult:
+        """
+        Parses a data: URL.
+
+        Args:
+             url: The URL to parse.
+             output_stream: The stream to write the content to.
+
+        Returns:
+            A DownloadResult with:
+                the media length, the URL downloaded, the HTTP response code,
+                the media type, the downloaded file name, the number of
+                milliseconds the result is valid for, and the etag header.
+        """
+
+        try:
+            logger.debug("Trying to parse data url '%s'", url)
+            with urlopen(url) as url_info:
+                # TODO Can this be more efficient?
+                output_stream.write(url_info.read())
+        except Exception as e:
+            logger.warning("Error parsing data: URL %s: %r", url, e)
+
+            raise SynapseError(
+                500,
+                "Failed to parse data URL: %s"
+                % (traceback.format_exception_only(sys.exc_info()[0], e),),
+                Codes.UNKNOWN,
+            )
+
+        return DownloadResult(
+            # Read back the length that has been written.
+            length=output_stream.tell(),
+            uri=url,
+            # If it was parsed, consider this a 200 OK.
+            response_code=200,
+            # urlopen shoves the media-type from the data URL into the content type
+            # header object.
+            media_type=url_info.headers.get_content_type(),
+            # Some features are not supported by data: URLs.
+            download_name=None,
+            expires=ONE_HOUR,
+            etag=None,
+        )
+
+    async def _handle_url(
+        self, url: str, user: UserID, allow_data_urls: bool = False
+    ) -> MediaInfo:
+        """
+        Fetches content from a URL and parses the result to generate a MediaInfo.
+
+        It uses the media storage provider to persist the fetched content and
+        stores the mapping into the database.
+
+        Args:
+             url: The URL to fetch.
+             user: The user who has requested this URL.
+             allow_data_urls: True if data URLs should be allowed.
+
+        Returns:
+            A MediaInfo object describing the fetched content.
+        """
+
         # TODO: we should probably honour robots.txt... except in practice
         # we're most likely being explicitly triggered by a human rather than a
         # bot, so are we really a robot?
@@ -377,61 +519,27 @@ class PreviewUrlResource(DirectServeJsonResource):
         file_info = FileInfo(server_name=None, file_id=file_id, url_cache=True)
 
         with self.media_storage.store_into_file(file_info) as (f, fname, finish):
-            try:
-                logger.debug("Trying to get preview for url '%s'", url)
-                length, headers, uri, code = await self.client.get_file(
-                    url,
-                    output_stream=f,
-                    max_size=self.max_spider_size,
-                    headers={"Accept-Language": self.url_preview_accept_language},
-                )
-            except SynapseError:
-                # Pass SynapseErrors through directly, so that the servlet
-                # handler will return a SynapseError to the client instead of
-                # blank data or a 500.
-                raise
-            except DNSLookupError:
-                # DNS lookup returned no results
-                # Note: This will also be the case if one of the resolved IP
-                # addresses is blacklisted
-                raise SynapseError(
-                    502,
-                    "DNS resolution failure during URL preview generation",
-                    Codes.UNKNOWN,
-                )
-            except Exception as e:
-                # FIXME: pass through 404s and other error messages nicely
-                logger.warning("Error downloading %s: %r", url, e)
-
-                raise SynapseError(
-                    500,
-                    "Failed to download content: %s"
-                    % (traceback.format_exception_only(sys.exc_info()[0], e),),
-                    Codes.UNKNOWN,
-                )
-            await finish()
+            if url.startswith("data:"):
+                if not allow_data_urls:
+                    raise SynapseError(
+                        500, "Previewing of data: URLs is forbidden", Codes.UNKNOWN
+                    )
 
-            if b"Content-Type" in headers:
-                media_type = headers[b"Content-Type"][0].decode("ascii")
+                download_result = await self._parse_data_url(url, f)
             else:
-                media_type = "application/octet-stream"
+                download_result = await self._download_url(url, f)
 
-            download_name = get_filename_from_headers(headers)
-
-            # FIXME: we should calculate a proper expiration based on the
-            # Cache-Control and Expire headers.  But for now, assume 1 hour.
-            expires = ONE_HOUR
-            etag = headers[b"ETag"][0].decode("ascii") if b"ETag" in headers else None
+            await finish()
 
         try:
             time_now_ms = self.clock.time_msec()
 
             await self.store.store_local_media(
                 media_id=file_id,
-                media_type=media_type,
+                media_type=download_result.media_type,
                 time_now_ms=time_now_ms,
-                upload_name=download_name,
-                media_length=length,
+                upload_name=download_result.download_name,
+                media_length=download_result.length,
                 user_id=user,
                 url_cache=url,
             )
@@ -444,16 +552,16 @@ class PreviewUrlResource(DirectServeJsonResource):
             raise
 
         return MediaInfo(
-            media_type=media_type,
-            media_length=length,
-            download_name=download_name,
+            media_type=download_result.media_type,
+            media_length=download_result.length,
+            download_name=download_result.download_name,
             created_ts_ms=time_now_ms,
             filesystem_id=file_id,
             filename=fname,
-            uri=uri,
-            response_code=code,
-            expires=expires,
-            etag=etag,
+            uri=download_result.uri,
+            response_code=download_result.response_code,
+            expires=download_result.expires,
+            etag=download_result.etag,
         )
 
     async def _precache_image_url(
@@ -474,8 +582,8 @@ class PreviewUrlResource(DirectServeJsonResource):
         # FIXME: it might be cleaner to use the same flow as the main /preview_url
         # request itself and benefit from the same caching etc.  But for now we
         # just rely on the caching on the master request to speed things up.
-        image_info = await self._download_url(
-            rebase_url(og["og:image"], media_info.uri), user
+        image_info = await self._handle_url(
+            rebase_url(og["og:image"], media_info.uri), user, allow_data_urls=True
         )
 
         if _is_media(image_info.media_type):
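
A standalone illustration of the `data:` URL behaviour that `_parse_data_url` relies on: `urlopen` handles the `data:` scheme directly and exposes the embedded media type via the response headers.

```python
from urllib.request import urlopen

# "aGVsbG8=" is base64 for b"hello".
with urlopen("data:text/plain;base64,aGVsbG8=") as url_info:
    body = url_info.read()                            # b'hello'
    media_type = url_info.headers.get_content_type()  # 'text/plain'

print(media_type, body)
```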
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index ef475e18c7..5bfa408f74 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -26,6 +26,7 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
     AbstractStreamIdGenerator,
@@ -44,7 +45,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class AccountDataWorkerStore(CacheInvalidationWorkerStore):
+class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -158,9 +159,9 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
             "get_account_data_for_user", get_account_data_for_user_txn
         )
 
-    @cached(num_args=2, max_entries=5000)
+    @cached(num_args=2, max_entries=5000, tree=True)
     async def get_global_account_data_by_type_for_user(
-        self, data_type: str, user_id: str
+        self, user_id: str, data_type: str
     ) -> Optional[JsonDict]:
         """
         Returns:
@@ -179,7 +180,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
         else:
             return None
 
-    @cached(num_args=2)
+    @cached(num_args=2, tree=True)
     async def get_account_data_for_room(
         self, user_id: str, room_id: str
     ) -> Dict[str, JsonDict]:
@@ -210,7 +211,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
             "get_account_data_for_room", get_account_data_for_room_txn
         )
 
-    @cached(num_args=3, max_entries=5000)
+    @cached(num_args=3, max_entries=5000, tree=True)
     async def get_account_data_for_room_and_type(
         self, user_id: str, room_id: str, account_data_type: str
     ) -> Optional[JsonDict]:
@@ -392,7 +393,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
             for row in rows:
                 if not row.room_id:
                     self.get_global_account_data_by_type_for_user.invalidate(
-                        (row.data_type, row.user_id)
+                        (row.user_id, row.data_type)
                     )
                 self.get_account_data_for_user.invalidate((row.user_id,))
                 self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
@@ -476,7 +477,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
             self.get_account_data_for_user.invalidate((user_id,))
             self.get_global_account_data_by_type_for_user.invalidate(
-                (account_data_type, user_id)
+                (user_id, account_data_type)
             )
 
         return self._account_data_id_gen.get_current_token()
@@ -546,6 +547,74 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore):
         for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
             self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
 
+    async def purge_account_data_for_user(self, user_id: str) -> None:
+        """
+        Removes the account data for a user.
+
+        This is intended to be used upon user deactivation and also removes any
+        derived information from account data (e.g. push rules and ignored users).
+
+        Args:
+            user_id: The user ID to remove data for.
+        """
+
+        def purge_account_data_for_user_txn(txn: LoggingTransaction) -> None:
+            # Purge from the primary account_data tables.
+            self.db_pool.simple_delete_txn(
+                txn, table="account_data", keyvalues={"user_id": user_id}
+            )
+
+            self.db_pool.simple_delete_txn(
+                txn, table="room_account_data", keyvalues={"user_id": user_id}
+            )
+
+            # Purge from ignored_users where this user is the ignorer.
+            # N.B. We don't purge where this user is the ignoree, because that
+            #      interferes with other users' account data.
+            #      It's also not this user's data to delete!
+            self.db_pool.simple_delete_txn(
+                txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id}
+            )
+
+            # Remove the push rules
+            self.db_pool.simple_delete_txn(
+                txn, table="push_rules", keyvalues={"user_name": user_id}
+            )
+            self.db_pool.simple_delete_txn(
+                txn, table="push_rules_enable", keyvalues={"user_name": user_id}
+            )
+            self.db_pool.simple_delete_txn(
+                txn, table="push_rules_stream", keyvalues={"user_id": user_id}
+            )
+
+            # Invalidate caches as appropriate
+            self._invalidate_cache_and_stream(
+                txn, self.get_account_data_for_room_and_type, (user_id,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_account_data_for_user, (user_id,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_global_account_data_by_type_for_user, (user_id,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_account_data_for_room, (user_id,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_push_rules_for_user, (user_id,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_push_rules_enabled_for_user, (user_id,)
+            )
+            # This user might appear in the cached ignored_by entries of other
+            # users, so we have to invalidate the whole cache.
+            self._invalidate_all_cache_and_stream(txn, self.ignored_by)
+
+        await self.db_pool.runInteraction(
+            "purge_account_data_for_user_txn",
+            purge_account_data_for_user_txn,
+        )
+
 
 class AccountDataStore(AccountDataWorkerStore):
     pass
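
A rough sketch of how the new purge is meant to be driven (the handler wiring here is illustrative; only `purge_account_data_for_user` itself comes from this patch):

    # Illustrative only: the store method is real, the surrounding flow is assumed.
    async def deactivate_account(store, user_id: str) -> None:
        # ... other deactivation steps (leaving rooms, removing devices, ...) elided ...
        await store.purge_account_data_for_user(user_id)
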
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index a556f17dac..ca71f073fc 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -65,7 +65,7 @@ class _NoChainCoverIndex(Exception):
         super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
 
 
-class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
+class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
     def __init__(
         self,
         database: DatabasePool,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1ae1ebe108..b7554154ac 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1389,6 +1389,8 @@ class PersistEventsStore:
                 "received_ts",
                 "sender",
                 "contains_url",
+                "state_key",
+                "rejection_reason",
             ),
             values=(
                 (
@@ -1405,8 +1407,10 @@ class PersistEventsStore:
                     self._clock.time_msec(),
                     event.sender,
                     "url" in event.content and isinstance(event.content["url"], str),
+                    event.get_state_key(),
+                    context.rejected or None,
                 )
-                for event, _ in events_and_contexts
+                for event, context in events_and_contexts
             ),
         )
 
@@ -1456,6 +1460,7 @@ class PersistEventsStore:
         for event, context in events_and_contexts:
             if context.rejected:
                 # Insert the event_id into the rejections table
+                # (events.rejection_reason has already been populated above)
                 self._store_rejections_txn(txn, event.event_id, context.rejected)
                 to_remove.add(event)
 
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 91b0576b85..e87a8fb85d 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -390,7 +390,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "event_search",
             "events",
             "group_rooms",
-            "public_room_list_stream",
             "receipts_graph",
             "receipts_linearized",
             "room_aliases",
diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py
index 3201623fe4..0518b8b910 100644
--- a/synapse/storage/databases/main/signatures.py
+++ b/synapse/storage/databases/main/signatures.py
@@ -12,16 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Dict, Iterable, List, Tuple
+from typing import Collection, Dict, List, Tuple
 
 from unpaddedbase64 import encode_base64
 
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.types import Cursor
+from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.storage.databases.main.events_worker import (
+    EventRedactBehaviour,
+    EventsWorkerStore,
+)
 from synapse.util.caches.descriptors import cached, cachedList
 
 
-class SignatureWorkerStore(SQLBaseStore):
+class SignatureWorkerStore(EventsWorkerStore):
     @cached()
     def get_event_reference_hash(self, event_id):
         # This is a dummy function to allow get_event_reference_hashes
@@ -32,7 +35,7 @@ class SignatureWorkerStore(SQLBaseStore):
         cached_method_name="get_event_reference_hash", list_name="event_ids", num_args=1
     )
     async def get_event_reference_hashes(
-        self, event_ids: Iterable[str]
+        self, event_ids: Collection[str]
     ) -> Dict[str, Dict[str, bytes]]:
         """Get all hashes for given events.
 
@@ -41,18 +44,27 @@ class SignatureWorkerStore(SQLBaseStore):
 
         Returns:
              A mapping of event ID to a mapping of algorithm to hash.
+             Returns an empty dict for a given event id if that event is unknown.
         """
+        events = await self.get_events(
+            event_ids,
+            redact_behaviour=EventRedactBehaviour.AS_IS,
+            allow_rejected=True,
+        )
 
-        def f(txn):
-            return {
-                event_id: self._get_event_reference_hashes_txn(txn, event_id)
-                for event_id in event_ids
-            }
+        hashes: Dict[str, Dict[str, bytes]] = {}
+        for event_id in event_ids:
+            event = events.get(event_id)
+            if event is None:
+                hashes[event_id] = {}
+            else:
+                ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
+                hashes[event_id] = {ref_alg: ref_hash_bytes}
 
-        return await self.db_pool.runInteraction("get_event_reference_hashes", f)
+        return hashes
 
     async def add_event_hashes(
-        self, event_ids: Iterable[str]
+        self, event_ids: Collection[str]
     ) -> List[Tuple[str, Dict[str, str]]]:
         """
 
@@ -70,24 +82,6 @@ class SignatureWorkerStore(SQLBaseStore):
 
         return list(encoded_hashes.items())
 
-    def _get_event_reference_hashes_txn(
-        self, txn: Cursor, event_id: str
-    ) -> Dict[str, bytes]:
-        """Get all the hashes for a given PDU.
-        Args:
-            txn:
-            event_id: Id for the Event.
-        Returns:
-            A mapping of algorithm -> hash.
-        """
-        query = (
-            "SELECT algorithm, hash"
-            " FROM event_reference_hashes"
-            " WHERE event_id = ?"
-        )
-        txn.execute(query, (event_id,))
-        return {k: v for k, v in txn}
-
 
 class SignatureStore(SignatureWorkerStore):
     """Persistence for event signatures and hashes"""
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 30f948a0f7..b3d71f661c 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -46,8 +46,8 @@ class PostgresEngine(BaseDatabaseEngine):
         self._version = db_conn.server_version
 
         # Are we on a supported PostgreSQL version?
-        if not allow_outdated_version and self._version < 90600:
-            raise RuntimeError("Synapse requires PostgreSQL 9.6 or above.")
+        if not allow_outdated_version and self._version < 100000:
+            raise RuntimeError("Synapse requires PostgreSQL 10 or above.")
 
         with db_conn.cursor() as txn:
             txn.execute("SHOW SERVER_ENCODING")
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 2a3d47185a..7b21c1b96d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 67  # remember to update the list below when updating
+SCHEMA_VERSION = 68  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -53,11 +53,18 @@ Changes in SCHEMA_VERSION = 66:
 
 Changes in SCHEMA_VERSION = 67:
     - state_events.prev_state is no longer written to.
+
+Changes in SCHEMA_VERSION = 68:
+    - event_reference_hashes is no longer read.
+    - `events` has `state_key` and `rejection_reason` columns, which are populated for
+      new events.
 """
 
 
 SCHEMA_COMPAT_VERSION = (
-    61  # 61: Remove unused tables `user_stats_historical` and `room_stats_historical`
+    # we now have `state_key` columns in both `events` and `state_events`, so we
+    # are now incompatible with Synapse instances with SCHEMA_VERSION < 66.
+    66
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql b/synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql
new file mode 100644
index 0000000000..1eb8de9907
--- /dev/null
+++ b/synapse/storage/schema/main/delta/67/01drop_public_room_list_stream.sql
@@ -0,0 +1,18 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this table is unused as of Synapse 1.41
+DROP TABLE public_room_list_stream;
+
diff --git a/synapse/storage/schema/main/delta/68/01event_columns.sql b/synapse/storage/schema/main/delta/68/01event_columns.sql
new file mode 100644
index 0000000000..7c072f972e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/68/01event_columns.sql
@@ -0,0 +1,26 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add new columns to the `events` table which will (one day) make the `state_events`
+-- and `rejections` tables redundant.
+
+ALTER TABLE events
+  -- if this event is a state event, its state key
+  ADD COLUMN state_key TEXT DEFAULT NULL;
+
+
+ALTER TABLE events
+  -- if this event was rejected, the reason it was rejected.
+  ADD COLUMN rejection_reason TEXT DEFAULT NULL;
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 17532059e9..1b970ce479 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -87,7 +87,7 @@ async def filter_events_for_client(
     )
 
     ignore_dict_content = await storage.main.get_global_account_data_by_type_for_user(
-        AccountDataTypes.IGNORED_USER_LIST, user_id
+        user_id, AccountDataTypes.IGNORED_USER_LIST
     )
 
     ignore_list: FrozenSet[str] = frozenset()
diff --git a/tests/handlers/test_deactivate_account.py b/tests/handlers/test_deactivate_account.py
new file mode 100644
index 0000000000..3da597768c
--- /dev/null
+++ b/tests/handlers/test_deactivate_account.py
@@ -0,0 +1,219 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from http import HTTPStatus
+from typing import Any, Dict
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.constants import AccountDataTypes
+from synapse.push.rulekinds import PRIORITY_CLASS_MAP
+from synapse.rest import admin
+from synapse.rest.client import account, login
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests.unittest import HomeserverTestCase
+
+
+class DeactivateAccountTestCase(HomeserverTestCase):
+    servlets = [
+        login.register_servlets,
+        admin.register_servlets,
+        account.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self._store = hs.get_datastore()
+
+        self.user = self.register_user("user", "pass")
+        self.token = self.login("user", "pass")
+
+    def _deactivate_my_account(self):
+        """
+        Deactivates the account `self.user` using `self.token` and asserts
+        that it returns a 200 success code.
+        """
+        req = self.get_success(
+            self.make_request(
+                "POST",
+                "account/deactivate",
+                {
+                    "auth": {
+                        "type": "m.login.password",
+                        "user": self.user,
+                        "password": "pass",
+                    },
+                    "erase": True,
+                },
+                access_token=self.token,
+            )
+        )
+        self.assertEqual(req.code, HTTPStatus.OK, req)
+
+    def test_global_account_data_deleted_upon_deactivation(self) -> None:
+        """
+        Tests that global account data is removed upon deactivation.
+        """
+        # Add some account data
+        self.get_success(
+            self._store.add_account_data_for_user(
+                self.user,
+                AccountDataTypes.DIRECT,
+                {"@someone:remote": ["!somewhere:remote"]},
+            )
+        )
+
+        # Check that we actually added some.
+        self.assertIsNotNone(
+            self.get_success(
+                self._store.get_global_account_data_by_type_for_user(
+                    self.user, AccountDataTypes.DIRECT
+                )
+            ),
+        )
+
+        # Request the deactivation of our account
+        self._deactivate_my_account()
+
+        # Check that the account data does not persist.
+        self.assertIsNone(
+            self.get_success(
+                self._store.get_global_account_data_by_type_for_user(
+                    self.user, AccountDataTypes.DIRECT
+                )
+            ),
+        )
+
+    def test_room_account_data_deleted_upon_deactivation(self) -> None:
+        """
+        Tests that room account data is removed upon deactivation.
+        """
+        room_id = "!room:test"
+
+        # Add some room account data
+        self.get_success(
+            self._store.add_account_data_to_room(
+                self.user,
+                room_id,
+                "m.fully_read",
+                {"event_id": "$aaaa:test"},
+            )
+        )
+
+        # Check that we actually added some.
+        self.assertIsNotNone(
+            self.get_success(
+                self._store.get_account_data_for_room_and_type(
+                    self.user, room_id, "m.fully_read"
+                )
+            ),
+        )
+
+        # Request the deactivation of our account
+        self._deactivate_my_account()
+
+        # Check that the account data does not persist.
+        self.assertIsNone(
+            self.get_success(
+                self._store.get_account_data_for_room_and_type(
+                    self.user, room_id, "m.fully_read"
+                )
+            ),
+        )
+
+    def _is_custom_rule(self, push_rule: Dict[str, Any]) -> bool:
+        """
+        Default rule IDs start with a dot (e.g. .m.rule and .im.vector), which
+        appears as `/.` in their fully-qualified rule IDs.
+        This function returns True iff a rule is custom (not default).
+        """
+        return "/." not in push_rule["rule_id"]
+
+    def test_push_rules_deleted_upon_account_deactivation(self) -> None:
+        """
+        Push rules are a special case of account data.
+        They are stored separately but get sent to the client as account data in /sync.
+        This tests that deactivating a user deletes push rules along with the rest
+        of their account data.
+        """
+
+        # Add a push rule
+        self.get_success(
+            self._store.add_push_rule(
+                self.user,
+                "personal.override.rule1",
+                PRIORITY_CLASS_MAP["override"],
+                [],
+                [],
+            )
+        )
+
+        # Test the rule exists
+        push_rules = self.get_success(self._store.get_push_rules_for_user(self.user))
+        # Filter out default rules; we don't care
+        push_rules = list(filter(self._is_custom_rule, push_rules))
+        # Check our rule made it
+        self.assertEqual(
+            push_rules,
+            [
+                {
+                    "user_name": "@user:test",
+                    "rule_id": "personal.override.rule1",
+                    "priority_class": 5,
+                    "priority": 0,
+                    "conditions": [],
+                    "actions": [],
+                    "default": False,
+                }
+            ],
+            push_rules,
+        )
+
+        # Request the deactivation of our account
+        self._deactivate_my_account()
+
+        push_rules = self.get_success(self._store.get_push_rules_for_user(self.user))
+        # Filter out default rules; we don't care
+        push_rules = list(filter(self._is_custom_rule, push_rules))
+        # Check our rule no longer exists
+        self.assertEqual(push_rules, [], push_rules)
+
+    def test_ignored_users_deleted_upon_deactivation(self) -> None:
+        """
+        Ignored users are a special case of account data.
+        They get denormalised into the `ignored_users` table upon being stored as
+        account data.
+        Test that a user's list of ignored users is deleted upon deactivation.
+        """
+
+        # Add an ignored user
+        self.get_success(
+            self._store.add_account_data_for_user(
+                self.user,
+                AccountDataTypes.IGNORED_USER_LIST,
+                {"ignored_users": {"@sheltie:test": {}}},
+            )
+        )
+
+        # Test the user is ignored
+        self.assertEqual(
+            self.get_success(self._store.ignored_by("@sheltie:test")), {self.user}
+        )
+
+        # Request the deactivation of our account
+        self._deactivate_my_account()
+
+        # Test the user is no longer ignored by the user that was deactivated
+        self.assertEqual(
+            self.get_success(self._store.ignored_by("@sheltie:test")), set()
+        )
diff --git a/tests/replication/slave/storage/test_account_data.py b/tests/replication/slave/storage/test_account_data.py
index 43e3248703..1524087c43 100644
--- a/tests/replication/slave/storage/test_account_data.py
+++ b/tests/replication/slave/storage/test_account_data.py
@@ -30,7 +30,7 @@ class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase):
         )
         self.replicate()
         self.check(
-            "get_global_account_data_by_type_for_user", [TYPE, USER_ID], {"a": 1}
+            "get_global_account_data_by_type_for_user", [USER_ID, TYPE], {"a": 1}
         )
 
         self.get_success(
@@ -38,5 +38,5 @@ class SlavedAccountDataStoreTestCase(BaseSlavedStoreTestCase):
         )
         self.replicate()
         self.check(
-            "get_global_account_data_by_type_for_user", [TYPE, USER_ID], {"a": 2}
+            "get_global_account_data_by_type_for_user", [USER_ID, TYPE], {"a": 2}
         )
diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py
index b70350b6f1..e2d3cff2a3 100644
--- a/tests/rest/admin/test_federation.py
+++ b/tests/rest/admin/test_federation.py
@@ -43,11 +43,15 @@ class FederationTestCase(unittest.HomeserverTestCase):
 
     @parameterized.expand(
         [
-            ("/_synapse/admin/v1/federation/destinations",),
-            ("/_synapse/admin/v1/federation/destinations/dummy",),
+            ("GET", "/_synapse/admin/v1/federation/destinations"),
+            ("GET", "/_synapse/admin/v1/federation/destinations/dummy"),
+            (
+                "POST",
+                "/_synapse/admin/v1/federation/destinations/dummy/reset_connection",
+            ),
         ]
     )
-    def test_requester_is_no_admin(self, url: str) -> None:
+    def test_requester_is_no_admin(self, method: str, url: str) -> None:
         """
         If the user is not a server admin, an error 403 is returned.
         """
@@ -56,7 +60,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         other_user_tok = self.login("user", "pass")
 
         channel = self.make_request(
-            "GET",
+            method,
             url,
             content={},
             access_token=other_user_tok,
@@ -120,6 +124,16 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
         self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
 
+        # invalid destination
+        channel = self.make_request(
+            "POST",
+            self.url + "/dummy/reset_connection",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
     def test_limit(self) -> None:
         """
         Testing list of destinations with limit
@@ -444,6 +458,39 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self.assertIsNone(channel.json_body["failure_ts"])
         self.assertIsNone(channel.json_body["last_successful_stream_ordering"])
 
+    def test_destination_reset_connection(self) -> None:
+        """Reset timeouts and wake up destination."""
+        self._create_destination("sub0.example.com", 100, 100, 100)
+
+        channel = self.make_request(
+            "POST",
+            self.url + "/sub0.example.com/reset_connection",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+        retry_timings = self.get_success(
+            self.store.get_destination_retry_timings("sub0.example.com")
+        )
+        self.assertIsNone(retry_timings)
+
+    def test_destination_reset_connection_not_required(self) -> None:
+        """Try to reset timeouts of a destination with no timeouts and get an error."""
+        self._create_destination("sub0.example.com", None, 0, 0)
+
+        channel = self.make_request(
+            "POST",
+            self.url + "/sub0.example.com/reset_connection",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(
+            "The retry timing does not need to be reset for this destination.",
+            channel.json_body["error"],
+        )
+
     def _create_destination(
         self,
         destination: str,
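
For context, the endpoint these tests exercise is `POST /_synapse/admin/v1/federation/destinations/<destination>/reset_connection`. A hedged sketch of calling it (host and token are placeholders; only the path and semantics come from the patch):

    # Illustrative admin API call using the `requests` library.
    import requests

    resp = requests.post(
        "https://homeserver.example/_synapse/admin/v1/federation/destinations/"
        "sub0.example.com/reset_connection",
        headers={"Authorization": "Bearer <admin_access_token>"},
    )
    resp.raise_for_status()  # the server returns 400 if there are no timings to reset
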
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 3495a0366a..23da0ad736 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -2468,7 +2468,6 @@ PURGE_TABLES = [
     "event_search",
     "events",
     "group_rooms",
-    "public_room_list_stream",
     "receipts_graph",
     "receipts_linearized",
     "room_aliases",
diff --git a/tests/test_preview.py b/tests/rest/media/v1/test_html_preview.py
index 46e02f483f..a4b57e3d1f 100644
--- a/tests/test_preview.py
+++ b/tests/rest/media/v1/test_html_preview.py
@@ -16,10 +16,11 @@ from synapse.rest.media.v1.preview_html import (
     _get_html_media_encodings,
     decode_body,
     parse_html_to_open_graph,
+    rebase_url,
     summarize_paragraphs,
 )
 
-from . import unittest
+from tests import unittest
 
 try:
     import lxml
@@ -447,3 +448,34 @@ class MediaEncodingTestCase(unittest.TestCase):
             'text/html; charset="invalid"',
         )
         self.assertEqual(list(encodings), ["utf-8", "cp1252"])
+
+
+class RebaseUrlTestCase(unittest.TestCase):
+    def test_relative(self):
+        """Relative URLs should be resolved based on the context of the base URL."""
+        self.assertEqual(
+            rebase_url("subpage", "https://example.com/foo/"),
+            "https://example.com/foo/subpage",
+        )
+        self.assertEqual(
+            rebase_url("sibling", "https://example.com/foo"),
+            "https://example.com/sibling",
+        )
+        self.assertEqual(
+            rebase_url("/bar", "https://example.com/foo/"),
+            "https://example.com/bar",
+        )
+
+    def test_absolute(self):
+        """Absolute URLs should not be modified."""
+        self.assertEqual(
+            rebase_url("https://alice.com/a/", "https://example.com/foo/"),
+            "https://alice.com/a/",
+        )
+
+    def test_data(self):
+        """Data URLs should not be modified."""
+        self.assertEqual(
+            rebase_url("data:,Hello%2C%20World%21", "https://example.com/foo/"),
+            "data:,Hello%2C%20World%21",
+        )
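
The behaviours pinned down by these tests coincide with standard RFC 3986 reference resolution, so the expected values can be sanity-checked against `urllib.parse.urljoin` (an intuition aid, not a claim about `rebase_url`'s implementation; note the argument order is swapped, `urljoin(base, url)` versus `rebase_url(url, base)`):

    from urllib.parse import urljoin

    assert urljoin("https://example.com/foo/", "subpage") == "https://example.com/foo/subpage"
    assert urljoin("https://example.com/foo", "sibling") == "https://example.com/sibling"
    assert urljoin("https://example.com/foo/", "/bar") == "https://example.com/bar"
    assert urljoin("https://example.com/foo/", "data:,Hello%2C%20World%21") == "data:,Hello%2C%20World%21"
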
diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/v1/test_url_preview.py
index 16e904f15b..53f6186213 100644
--- a/tests/rest/media/v1/test_url_preview.py
+++ b/tests/rest/media/v1/test_url_preview.py
@@ -12,9 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import base64
 import json
 import os
 import re
+from urllib.parse import urlencode
 
 from twisted.internet._resolver import HostResolution
 from twisted.internet.address import IPv4Address, IPv6Address
@@ -23,6 +25,7 @@ from twisted.test.proto_helpers import AccumulatingProtocol
 
 from synapse.config.oembed import OEmbedEndpointConfig
 from synapse.rest.media.v1.preview_url_resource import IMAGE_CACHE_EXPIRY_MS
+from synapse.types import JsonDict
 from synapse.util.stringutils import parse_and_validate_mxc_uri
 
 from tests import unittest
@@ -142,6 +145,14 @@ class URLPreviewTests(unittest.HomeserverTestCase):
     def create_test_resource(self):
         return self.hs.get_media_repository_resource()
 
+    def _assert_small_png(self, json_body: JsonDict) -> None:
+        """Assert properties from the SMALL_PNG test image."""
+        self.assertTrue(json_body["og:image"].startswith("mxc://"))
+        self.assertEqual(json_body["og:image:height"], 1)
+        self.assertEqual(json_body["og:image:width"], 1)
+        self.assertEqual(json_body["og:image:type"], "image/png")
+        self.assertEqual(json_body["matrix:image:size"], 67)
+
     def test_cache_returns_correct_type(self):
         self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")]
 
@@ -569,6 +580,66 @@ class URLPreviewTests(unittest.HomeserverTestCase):
             server.data,
         )
 
+    def test_data_url(self):
+        """
+        Requesting to preview a data URL is not supported.
+        """
+        self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")]
+
+        data = base64.b64encode(SMALL_PNG).decode()
+
+        query_params = urlencode(
+            {
+                "url": f'<html><head><img src="data:image/png;base64,{data}" /></head></html>'
+            }
+        )
+
+        channel = self.make_request(
+            "GET",
+            f"preview_url?{query_params}",
+            shorthand=False,
+        )
+        self.pump()
+
+        self.assertEqual(channel.code, 500)
+
+    def test_inline_data_url(self):
+        """
+        An inline image (as a data URL) should be parsed properly.
+        """
+        self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")]
+
+        data = base64.b64encode(SMALL_PNG)
+
+        end_content = (
+            b"<html><head>" b'<img src="data:image/png;base64,%s" />' b"</head></html>"
+        ) % (data,)
+
+        channel = self.make_request(
+            "GET",
+            "preview_url?url=http://matrix.org",
+            shorthand=False,
+            await_result=False,
+        )
+        self.pump()
+
+        client = self.reactor.tcpClients[0][2].buildProtocol(None)
+        server = AccumulatingProtocol()
+        server.makeConnection(FakeTransport(client, self.reactor))
+        client.makeConnection(FakeTransport(server, self.reactor))
+        client.dataReceived(
+            (
+                b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
+                b'Content-Type: text/html; charset="utf8"\r\n\r\n'
+            )
+            % (len(end_content),)
+            + end_content
+        )
+
+        self.pump()
+        self.assertEqual(channel.code, 200)
+        self._assert_small_png(channel.json_body)
+
     def test_oembed_photo(self):
         """Test an oEmbed endpoint which returns a 'photo' type which redirects the preview to a new URL."""
         self.lookups["publish.twitter.com"] = [(IPv4Address, "10.1.2.3")]
@@ -626,10 +697,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
         self.assertEqual(channel.code, 200)
         body = channel.json_body
         self.assertEqual(body["og:url"], "http://twitter.com/matrixdotorg/status/12345")
-        self.assertTrue(body["og:image"].startswith("mxc://"))
-        self.assertEqual(body["og:image:height"], 1)
-        self.assertEqual(body["og:image:width"], 1)
-        self.assertEqual(body["og:image:type"], "image/png")
+        self._assert_small_png(body)
 
     def test_oembed_rich(self):
         """Test an oEmbed endpoint which returns HTML content via the 'rich' type."""
@@ -820,10 +888,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
         self.assertEqual(
             body["og:url"], "http://www.twitter.com/matrixdotorg/status/12345"
         )
-        self.assertTrue(body["og:image"].startswith("mxc://"))
-        self.assertEqual(body["og:image:height"], 1)
-        self.assertEqual(body["og:image:width"], 1)
-        self.assertEqual(body["og:image:type"], "image/png")
+        self._assert_small_png(body)
 
     def _download_image(self):
         """Downloads an image into the URL cache.
diff --git a/tests/server.py b/tests/server.py
index a0cd14ea45..82990c2eb9 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -313,7 +313,7 @@ def make_request(
     req = request(channel, site)
     req.content = BytesIO(content)
     # Twisted expects to be at the end of the content when parsing the request.
-    req.content.seek(SEEK_END)
+    req.content.seek(0, SEEK_END)
 
     if access_token:
         req.requestHeaders.addRawHeader(
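
The fix above is easy to misread: `seek` treats a single argument as an absolute offset, and `SEEK_END` is just the integer 2, so `seek(SEEK_END)` lands at byte 2 instead of at the end. The two-argument form passes it as the `whence` anchor:

    from io import SEEK_END, BytesIO

    buf = BytesIO(b"payload")
    buf.seek(SEEK_END)       # absolute offset 2 (SEEK_END == 2): not the end
    assert buf.tell() == 2
    buf.seek(0, SEEK_END)    # 0 bytes before the end: the actual end
    assert buf.tell() == len(b"payload")
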
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index 7b7f6c349e..e3273a93f9 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -19,6 +19,7 @@ from twisted.trial import unittest
 from synapse.api.constants import EventTypes
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.storage.databases.main.events import _LinkMap
@@ -391,7 +392,9 @@ class EventChainStoreTestCase(HomeserverTestCase):
         def _persist(txn):
             # We need to persist the events to the events and state_events
             # tables.
-            persist_events_store._store_event_txn(txn, [(e, {}) for e in events])
+            persist_events_store._store_event_txn(
+                txn, [(e, EventContext()) for e in events]
+            )
 
             # Actually call the function that calculates the auth chain stuff.
             persist_events_store._persist_event_auth_chain_txn(txn, events)
diff --git a/tox.ini b/tox.ini
index 2ffca14b22..32679e9106 100644
--- a/tox.ini
+++ b/tox.ini
@@ -117,8 +117,7 @@ usedevelop=true
 skip_install = true
 usedevelop = false
 deps =
-    # Old automat version for Twisted
-    Automat == 0.3.0
+    Automat == 0.8.0
     lxml
     {[base]deps}