summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/8488.misc1
-rw-r--r--changelog.d/8503.misc1
-rw-r--r--changelog.d/8517.bugfix1
-rw-r--r--changelog.d/8526.doc1
-rw-r--r--changelog.d/8527.bugfix1
-rw-r--r--changelog.d/8529.doc1
-rw-r--r--changelog.d/8530.bugfix1
-rw-r--r--changelog.d/8536.bugfix1
-rw-r--r--changelog.d/8537.misc1
-rw-r--r--changelog.d/8542.misc1
-rw-r--r--docs/manhole.md46
-rw-r--r--docs/message_retention_policies.md34
-rw-r--r--synapse/app/generic_worker.py4
-rw-r--r--synapse/events/builder.py21
-rw-r--r--synapse/events/validator.py3
-rw-r--r--synapse/federation/send_queue.py2
-rw-r--r--synapse/federation/sender/__init__.py9
-rw-r--r--synapse/handlers/appservice.py11
-rw-r--r--synapse/handlers/message.py30
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/handlers/room_member.py59
-rw-r--r--synapse/notifier.py6
-rw-r--r--synapse/push/emailpusher.py8
-rw-r--r--synapse/push/httppusher.py8
-rw-r--r--synapse/push/pusherpool.py10
-rw-r--r--synapse/rest/client/v1/profile.py4
-rw-r--r--synapse/storage/database.py99
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/keys.py5
-rw-r--r--synapse/storage/databases/main/metrics.py11
-rw-r--r--synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql18
-rw-r--r--synapse/storage/databases/main/transactions.py76
-rw-r--r--synapse/storage/databases/main/user_directory.py45
-rw-r--r--synapse/storage/util/id_generators.py7
-rw-r--r--tests/handlers/test_appservice.py13
-rw-r--r--tests/handlers/test_message.py1
-rw-r--r--tests/handlers/test_presence.py2
-rw-r--r--tests/replication/test_federation_sender_shard.py2
-rw-r--r--tests/replication/test_sharded_event_persister.py217
-rw-r--r--tests/storage/test_redaction.py4
-rw-r--r--tests/unittest.py32
41 files changed, 615 insertions, 187 deletions
diff --git a/changelog.d/8488.misc b/changelog.d/8488.misc
new file mode 100644
index 0000000000..237cb3b311
--- /dev/null
+++ b/changelog.d/8488.misc
@@ -0,0 +1 @@
+Allow events to be sent to clients sooner when using sharded event persisters.
diff --git a/changelog.d/8503.misc b/changelog.d/8503.misc
new file mode 100644
index 0000000000..edb1be8aa8
--- /dev/null
+++ b/changelog.d/8503.misc
@@ -0,0 +1 @@
+Add user agent to user_daily_visits table.
diff --git a/changelog.d/8517.bugfix b/changelog.d/8517.bugfix
new file mode 100644
index 0000000000..1ab623c59f
--- /dev/null
+++ b/changelog.d/8517.bugfix
@@ -0,0 +1 @@
+Fix error code for `/profile/{userId}/displayname` to be `M_BAD_JSON`.
diff --git a/changelog.d/8526.doc b/changelog.d/8526.doc
new file mode 100644
index 0000000000..cbf48680c1
--- /dev/null
+++ b/changelog.d/8526.doc
@@ -0,0 +1 @@
+Added note about docker in manhole.md regarding which ip address to bind to. Contributed by @Maquis196.
diff --git a/changelog.d/8527.bugfix b/changelog.d/8527.bugfix
new file mode 100644
index 0000000000..727e0ba299
--- /dev/null
+++ b/changelog.d/8527.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.7.0 that could cause Synapse to insert values from non-state `m.room.retention` events into the `room_retention` database table.
diff --git a/changelog.d/8529.doc b/changelog.d/8529.doc
new file mode 100644
index 0000000000..6e710e6527
--- /dev/null
+++ b/changelog.d/8529.doc
@@ -0,0 +1 @@
+Document the new behaviour of the `allowed_lifetime_min` and `allowed_lifetime_max` settings in the room retention configuration.
diff --git a/changelog.d/8530.bugfix b/changelog.d/8530.bugfix
new file mode 100644
index 0000000000..443d88424e
--- /dev/null
+++ b/changelog.d/8530.bugfix
@@ -0,0 +1 @@
+Fix rare bug where sending an event would fail due to a racey assertion.
diff --git a/changelog.d/8536.bugfix b/changelog.d/8536.bugfix
new file mode 100644
index 0000000000..8d238cc008
--- /dev/null
+++ b/changelog.d/8536.bugfix
@@ -0,0 +1 @@
+Fix not sending events over federation when using sharded event writers.
diff --git a/changelog.d/8537.misc b/changelog.d/8537.misc
new file mode 100644
index 0000000000..26309b5b93
--- /dev/null
+++ b/changelog.d/8537.misc
@@ -0,0 +1 @@
+Factor out common code between `RoomMemberHandler._locally_reject_invite` and `EventCreationHandler.create_event`.
diff --git a/changelog.d/8542.misc b/changelog.d/8542.misc
new file mode 100644
index 0000000000..63149fd9b9
--- /dev/null
+++ b/changelog.d/8542.misc
@@ -0,0 +1 @@
+Improve database performance by executing more queries without starting transactions.
diff --git a/docs/manhole.md b/docs/manhole.md
index 75b6ae40e0..37d1d7823c 100644
--- a/docs/manhole.md
+++ b/docs/manhole.md
@@ -5,22 +5,54 @@ The "manhole" allows server administrators to access a Python shell on a running
 Synapse installation. This is a very powerful mechanism for administration and
 debugging.
 
+**_Security Warning_**
+
+Note that this will give administrative access to synapse to **all users** with
+shell access to the server. It should therefore **not** be enabled in
+environments where untrusted users have shell access.
+
+***
+
 To enable it, first uncomment the `manhole` listener configuration in
-`homeserver.yaml`:
+`homeserver.yaml`. The configuration is slightly different if you're using docker.
+
+#### Docker config
+
+If you are using Docker, set `bind_addresses` to `['0.0.0.0']` as shown:
 
 ```yaml
 listeners:
   - port: 9000
-    bind_addresses: ['::1', '127.0.0.1']
+    bind_addresses: ['0.0.0.0']
     type: manhole
 ```
 
-(`bind_addresses` in the above is important: it ensures that access to the
-manhole is only possible for local users).
+When using `docker run` to start the server, you will then need to change the command to the following to include the
+`manhole` port forwarding. The `-p 127.0.0.1:9000:9000` below is important: it 
+ensures that access to the `manhole` is only possible for local users.
 
-Note that this will give administrative access to synapse to **all users** with
-shell access to the server. It should therefore **not** be enabled in
-environments where untrusted users have shell access.
+```bash
+docker run -d --name synapse \
+    --mount type=volume,src=synapse-data,dst=/data \
+    -p 8008:8008 \
+    -p 127.0.0.1:9000:9000 \
+    matrixdotorg/synapse:latest
+```
+
+#### Native config
+
+If you are not using docker, set `bind_addresses` to `['::1', '127.0.0.1']` as shown.
+The `bind_addresses` in the example below is important: it ensures that access to the
+`manhole` is only possible for local users).
+
+```yaml
+listeners:
+  - port: 9000
+    bind_addresses: ['::1', '127.0.0.1']
+    type: manhole
+```
+
+#### Accessing synapse manhole
 
 Then restart synapse, and point an ssh client at port 9000 on localhost, using
 the username `matrix`:
diff --git a/docs/message_retention_policies.md b/docs/message_retention_policies.md
index 1dd60bdad9..75d2028e17 100644
--- a/docs/message_retention_policies.md
+++ b/docs/message_retention_policies.md
@@ -136,24 +136,34 @@ the server's database.
 
 ### Lifetime limits
 
-**Note: this feature is mainly useful within a closed federation or on
-servers that don't federate, because there currently is no way to
-enforce these limits in an open federation.**
-
-Server admins can restrict the values their local users are allowed to
-use for both `min_lifetime` and `max_lifetime`. These limits can be
-defined as such in the `retention` section of the configuration file:
+Server admins can set limits on the values of `max_lifetime` to use when
+purging old events in a room. These limits can be defined as such in the
+`retention` section of the configuration file:
 
 ```yaml
   allowed_lifetime_min: 1d
   allowed_lifetime_max: 1y
 ```
 
-Here, `allowed_lifetime_min` is the lowest value a local user can set
-for both `min_lifetime` and `max_lifetime`, and `allowed_lifetime_max`
-is the highest value. Both parameters are optional (e.g. setting
-`allowed_lifetime_min` but not `allowed_lifetime_max` only enforces a
-minimum and no maximum).
+The limits are considered when running purge jobs. If necessary, the
+effective value of `max_lifetime` will be brought between
+`allowed_lifetime_min` and `allowed_lifetime_max` (inclusive).
+This means that, if the value of `max_lifetime` defined in the room's state
+is lower than `allowed_lifetime_min`, the value of `allowed_lifetime_min`
+will be used instead. Likewise, if the value of `max_lifetime` is higher
+than `allowed_lifetime_max`, the value of `allowed_lifetime_max` will be
+used instead.
+
+In the example above, we ensure Synapse never deletes events that are less
+than one day old, and that it always deletes events that are over a year
+old.
+
+If a default policy is set, and its `max_lifetime` value is lower than
+`allowed_lifetime_min` or higher than `allowed_lifetime_max`, the same
+process applies.
+
+Both parameters are optional; if one is omitted Synapse won't use it to
+adjust the effective value of `max_lifetime`.
 
 Like other settings in this section, these parameters can be expressed
 either as a duration or as a number of milliseconds.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d53181deb1..1b511890aa 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -790,10 +790,6 @@ class FederationSenderHandler:
             send_queue.process_rows_for_federation(self.federation_sender, rows)
             await self.update_token(token)
 
-        # We also need to poke the federation sender when new events happen
-        elif stream_name == "events":
-            self.federation_sender.notify_new_events(token)
-
         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
             await self._on_new_receipts(rows)
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index b6c47be646..df4f950fec 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -97,32 +97,37 @@ class EventBuilder:
     def is_state(self):
         return self._state_key is not None
 
-    async def build(self, prev_event_ids: List[str]) -> EventBase:
+    async def build(
+        self, prev_event_ids: List[str], auth_event_ids: Optional[List[str]]
+    ) -> EventBase:
         """Transform into a fully signed and hashed event
 
         Args:
             prev_event_ids: The event IDs to use as the prev events
+            auth_event_ids: The event IDs to use as the auth events.
+                Should normally be set to None, which will cause them to be calculated
+                based on the room state at the prev_events.
 
         Returns:
             The signed and hashed event.
         """
-
-        state_ids = await self._state.get_current_state_ids(
-            self.room_id, prev_event_ids
-        )
-        auth_ids = self._auth.compute_auth_events(self, state_ids)
+        if auth_event_ids is None:
+            state_ids = await self._state.get_current_state_ids(
+                self.room_id, prev_event_ids
+            )
+            auth_event_ids = self._auth.compute_auth_events(self, state_ids)
 
         format_version = self.room_version.event_format
         if format_version == EventFormatVersions.V1:
             # The types of auth/prev events changes between event versions.
             auth_events = await self._store.add_event_hashes(
-                auth_ids
+                auth_event_ids
             )  # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
             prev_events = await self._store.add_event_hashes(
                 prev_event_ids
             )  # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
         else:
-            auth_events = auth_ids
+            auth_events = auth_event_ids
             prev_events = prev_event_ids
 
         old_depth = await self._store.get_max_depth_of(prev_event_ids)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 9df35b54ba..5f9af8529b 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -83,6 +83,9 @@ class EventValidator:
         Args:
             event (FrozenEvent): The event to validate.
         """
+        if not event.is_state():
+            raise SynapseError(code=400, msg="must be a state event")
+
         min_lifetime = event.content.get("min_lifetime")
         max_lifetime = event.content.get("max_lifetime")
 
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 8e46957d15..5f1bf492c1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ class FederationRemoteSendQueue:
             for key in keys[:i]:
                 del self.edus[key]
 
-    def notify_new_events(self, current_id):
+    def notify_new_events(self, max_token):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index e33b29a42c..604cfd1935 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
 logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ class FederationSender:
             self._per_destination_queues[destination] = queue
         return queue
 
-    def notify_new_events(self, current_id: int) -> None:
+    def notify_new_events(self, max_token: RoomStreamToken) -> None:
         """This gets called when we have some new events we might want to
         send out to other servers.
         """
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        current_id = max_token.stream
+
         self._last_poked_id = max(current_id, self._last_poked_id)
 
         if self._is_processing:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9d4e87dad6..c8d5e58035 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -27,6 +27,7 @@ from synapse.metrics import (
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -47,15 +48,17 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
-    async def notify_interested_services(self, current_id):
+    async def notify_interested_services(self, max_token: RoomStreamToken):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
-
-        Args:
-            current_id(int): The current maximum ID.
         """
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        current_id = max_token.stream
+
         services = self.store.get_app_services()
         if not services or not self.notify_appservices:
             return
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c52e6824d3..f18f882596 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -437,9 +437,9 @@ class EventCreationHandler:
         self,
         requester: Requester,
         event_dict: dict,
-        token_id: Optional[str] = None,
         txn_id: Optional[str] = None,
         prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
     ) -> Tuple[EventBase, EventContext]:
         """
@@ -453,13 +453,18 @@ class EventCreationHandler:
         Args:
             requester
             event_dict: An entire event
-            token_id
             txn_id
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
 
                 If None, they will be requested from the database.
+
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
+
             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
         Raises:
@@ -511,14 +516,17 @@ class EventCreationHandler:
         if require_consent and not is_exempt:
             await self.assert_accepted_privacy_policy(requester)
 
-        if token_id is not None:
-            builder.internal_metadata.token_id = token_id
+        if requester.access_token_id is not None:
+            builder.internal_metadata.token_id = requester.access_token_id
 
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id
 
         event, context = await self.create_new_client_event(
-            builder=builder, requester=requester, prev_event_ids=prev_event_ids,
+            builder=builder,
+            requester=requester,
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -726,7 +734,7 @@ class EventCreationHandler:
                     return event, event.internal_metadata.stream_ordering
 
             event, context = await self.create_event(
-                requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
+                requester, event_dict, txn_id=txn_id
             )
 
             assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -757,6 +765,7 @@ class EventCreationHandler:
         builder: EventBuilder,
         requester: Optional[Requester] = None,
         prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
@@ -769,6 +778,11 @@ class EventCreationHandler:
 
                 If None, they will be requested from the database.
 
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
+
         Returns:
             Tuple of created event, context
         """
@@ -790,7 +804,9 @@ class EventCreationHandler:
             builder.type == EventTypes.Create or len(prev_event_ids) > 0
         ), "Attempting to create an event with no prev_events"
 
-        event = await builder.build(prev_event_ids=prev_event_ids)
+        event = await builder.build(
+            prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+        )
         context = await self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 93ed51063a..ec300d8877 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -214,7 +214,6 @@ class RoomCreationHandler(BaseHandler):
                     "replacement_room": new_room_id,
                 },
             },
-            token_id=requester.access_token_id,
         )
         old_room_version = await self.store.get_room_version_id(old_room_id)
         await self.auth.check_from_context(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0080eeaf8d..ec784030e9 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,12 +17,10 @@ import abc
 import logging
 import random
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-
-from unpaddedbase64 import encode_base64
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 
 from synapse import types
-from synapse.api.constants import MAX_DEPTH, AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -31,12 +29,8 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import EventFormatVersions
-from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase
-from synapse.events.builder import create_local_event_from_event_dict
 from synapse.events.snapshot import EventContext
-from synapse.events.validator import EventValidator
 from synapse.storage.roommember import RoomsForUser
 from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
 from synapse.util.async_helpers import Linearizer
@@ -193,7 +187,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 # For backwards compatibility:
                 "membership": membership,
             },
-            token_id=requester.access_token_id,
             txn_id=txn_id,
             prev_event_ids=prev_event_ids,
             require_consent=require_consent,
@@ -1133,31 +1126,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         room_id = invite_event.room_id
         target_user = invite_event.state_key
-        room_version = await self.store.get_room_version(room_id)
 
         content["membership"] = Membership.LEAVE
 
-        # the auth events for the new event are the same as that of the invite, plus
-        # the invite itself.
-        #
-        # the prev_events are just the invite.
-        invite_hash = invite_event.event_id  # type: Union[str, Tuple]
-        if room_version.event_format == EventFormatVersions.V1:
-            alg, h = compute_event_reference_hash(invite_event)
-            invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
-
-        auth_events = tuple(invite_event.auth_events) + (invite_hash,)
-        prev_events = (invite_hash,)
-
-        # we cap depth of generated events, to ensure that they are not
-        # rejected by other servers (and so that they can be persisted in
-        # the db)
-        depth = min(invite_event.depth + 1, MAX_DEPTH)
-
         event_dict = {
-            "depth": depth,
-            "auth_events": auth_events,
-            "prev_events": prev_events,
             "type": EventTypes.Member,
             "room_id": room_id,
             "sender": target_user,
@@ -1165,24 +1137,23 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             "state_key": target_user,
         }
 
-        event = create_local_event_from_event_dict(
-            clock=self.clock,
-            hostname=self.hs.hostname,
-            signing_key=self.hs.signing_key,
-            room_version=room_version,
-            event_dict=event_dict,
+        # the auth events for the new event are the same as that of the invite, plus
+        # the invite itself.
+        #
+        # the prev_events are just the invite.
+        prev_event_ids = [invite_event.event_id]
+        auth_event_ids = invite_event.auth_event_ids() + prev_event_ids
+
+        event, context = await self.event_creation_handler.create_event(
+            requester,
+            event_dict,
+            txn_id=txn_id,
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
         )
         event.internal_metadata.outlier = True
         event.internal_metadata.out_of_band_membership = True
-        if txn_id is not None:
-            event.internal_metadata.txn_id = txn_id
-        if requester.access_token_id is not None:
-            event.internal_metadata.token_id = requester.access_token_id
-
-        EventValidator().validate_new(event, self.config)
 
-        context = await self.state_handler.compute_event_context(event)
-        context.app_service = requester.app_service
         result_event = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[UserID.from_string(target_user)],
         )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 13adeed01e..51c830c91e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -319,19 +319,19 @@ class Notifier:
         )
 
         if self.federation_sender:
-            self.federation_sender.notify_new_events(max_room_stream_token.stream)
+            self.federation_sender.notify_new_events(max_room_stream_token)
 
     async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
         try:
             await self.appservice_handler.notify_interested_services(
-                max_room_stream_token.stream
+                max_room_stream_token
             )
         except Exception:
             logger.exception("Error notifying application services of event")
 
     async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
+            await self._pusher_pool.on_new_notifications(max_room_stream_token)
         except Exception:
             logger.exception("Error pusher pool of event")
 
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 28bd8ab748..c6763971ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,6 +18,7 @@ import logging
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
 
@@ -91,7 +92,12 @@ class EmailPusher:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_stream_ordering):
+    def on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
         if self.max_stream_ordering:
             self.max_stream_ordering = max(
                 max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 26706bf3e1..793d0db2d9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
+from synapse.types import RoomStreamToken
 
 from . import push_rule_evaluator, push_tools
 
@@ -114,7 +115,12 @@ class HttpPusher:
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_stream_ordering):
+    def on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
         self.max_stream_ordering = max(
             max_stream_ordering, self.max_stream_ordering or 0
         )
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 76150e117b..0080c68ce2 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -24,6 +24,7 @@ from synapse.push import PusherConfigException
 from synapse.push.emailpusher import EmailPusher
 from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
 if TYPE_CHECKING:
@@ -186,11 +187,16 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, max_stream_id: int):
+    async def on_new_notifications(self, max_token: RoomStreamToken):
         if not self.pushers:
             # nothing to do here.
             return
 
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         if max_stream_id < self._last_room_stream_id_seen:
             # Nothing to do
             return
@@ -214,7 +220,7 @@ class PusherPool:
 
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(max_stream_id)
+                        p.on_new_notifications(max_token)
 
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index b686cd671f..e7fcd2b1ff 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -59,7 +59,9 @@ class ProfileDisplaynameRestServlet(RestServlet):
         try:
             new_name = content["displayname"]
         except Exception:
-            return 400, "Unable to parse name"
+            raise SynapseError(
+                code=400, msg="Unable to parse name", errcode=Codes.BAD_JSON,
+            )
 
         await self.profile_handler.set_displayname(user, requester, new_name, is_admin)
 
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0ba3a025cf..763722d6bc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -893,6 +893,12 @@ class DatabasePool:
         attempts = 0
         while True:
             try:
+                # We can autocommit if we are going to use native upserts
+                autocommit = (
+                    self.engine.can_native_upsert
+                    and table not in self._unsafe_to_upsert_tables
+                )
+
                 return await self.runInteraction(
                     desc,
                     self.simple_upsert_txn,
@@ -901,6 +907,7 @@ class DatabasePool:
                     values,
                     insertion_values,
                     lock=lock,
+                    db_autocommit=autocommit,
                 )
             except self.engine.module.IntegrityError as e:
                 attempts += 1
@@ -1063,6 +1070,43 @@ class DatabasePool:
         )
         txn.execute(sql, list(allvalues.values()))
 
+    async def simple_upsert_many(
+        self,
+        table: str,
+        key_names: Collection[str],
+        key_values: Collection[Iterable[Any]],
+        value_names: Collection[str],
+        value_values: Iterable[Iterable[Any]],
+        desc: str,
+    ) -> None:
+        """
+        Upsert, many times.
+
+        Args:
+            table: The table to upsert into
+            key_names: The key column names.
+            key_values: A list of each row's key column values.
+            value_names: The value column names
+            value_values: A list of each row's value column values.
+                Ignored if value_names is empty.
+        """
+
+        # We can autocommit if we are going to use native upserts
+        autocommit = (
+            self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
+        )
+
+        return await self.runInteraction(
+            desc,
+            self.simple_upsert_many_txn,
+            table,
+            key_names,
+            key_values,
+            value_names,
+            value_values,
+            db_autocommit=autocommit,
+        )
+
     def simple_upsert_many_txn(
         self,
         txn: LoggingTransaction,
@@ -1214,7 +1258,13 @@ class DatabasePool:
             desc: description of the transaction, for logging and metrics
         """
         return await self.runInteraction(
-            desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
+            desc,
+            self.simple_select_one_txn,
+            table,
+            keyvalues,
+            retcols,
+            allow_none,
+            db_autocommit=True,
         )
 
     @overload
@@ -1265,6 +1315,7 @@ class DatabasePool:
             keyvalues,
             retcol,
             allow_none=allow_none,
+            db_autocommit=True,
         )
 
     @overload
@@ -1346,7 +1397,12 @@ class DatabasePool:
             Results in a list
         """
         return await self.runInteraction(
-            desc, self.simple_select_onecol_txn, table, keyvalues, retcol
+            desc,
+            self.simple_select_onecol_txn,
+            table,
+            keyvalues,
+            retcol,
+            db_autocommit=True,
         )
 
     async def simple_select_list(
@@ -1371,7 +1427,12 @@ class DatabasePool:
             A list of dictionaries.
         """
         return await self.runInteraction(
-            desc, self.simple_select_list_txn, table, keyvalues, retcols
+            desc,
+            self.simple_select_list_txn,
+            table,
+            keyvalues,
+            retcols,
+            db_autocommit=True,
         )
 
     @classmethod
@@ -1450,6 +1511,7 @@ class DatabasePool:
                 chunk,
                 keyvalues,
                 retcols,
+                db_autocommit=True,
             )
 
             results.extend(rows)
@@ -1548,7 +1610,12 @@ class DatabasePool:
             desc: description of the transaction, for logging and metrics
         """
         await self.runInteraction(
-            desc, self.simple_update_one_txn, table, keyvalues, updatevalues
+            desc,
+            self.simple_update_one_txn,
+            table,
+            keyvalues,
+            updatevalues,
+            db_autocommit=True,
         )
 
     @classmethod
@@ -1607,7 +1674,9 @@ class DatabasePool:
             keyvalues: dict of column names and values to select the row with
             desc: description of the transaction, for logging and metrics
         """
-        await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
+        await self.runInteraction(
+            desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
+        )
 
     @staticmethod
     def simple_delete_one_txn(
@@ -1646,7 +1715,9 @@ class DatabasePool:
         Returns:
             The number of deleted rows.
         """
-        return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
+        return await self.runInteraction(
+            desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
+        )
 
     @staticmethod
     def simple_delete_txn(
@@ -1694,7 +1765,13 @@ class DatabasePool:
             Number rows deleted
         """
         return await self.runInteraction(
-            desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
+            desc,
+            self.simple_delete_many_txn,
+            table,
+            column,
+            iterable,
+            keyvalues,
+            db_autocommit=True,
         )
 
     @staticmethod
@@ -1860,7 +1937,13 @@ class DatabasePool:
         """
 
         return await self.runInteraction(
-            desc, self.simple_search_list_txn, table, term, col, retcols
+            desc,
+            self.simple_search_list_txn,
+            table,
+            term,
+            col,
+            retcols,
+            db_autocommit=True,
         )
 
     @classmethod
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fdb17745f6..ba3b1769b0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1270,6 +1270,10 @@ class PersistEventsStore:
             )
 
     def _store_retention_policy_for_room_txn(self, txn, event):
+        if not event.is_state():
+            logger.debug("Ignoring non-state m.room.retention event")
+            return
+
         if hasattr(event, "content") and (
             "min_lifetime" in event.content or "max_lifetime" in event.content
         ):
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ad43bb05ab..f8f4bb9b3f 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore):
             # param, which is itself the 2-tuple (server_name, key_id).
             invalidations.append((server_name, key_id))
 
-        await self.db_pool.runInteraction(
-            "store_server_verify_keys",
-            self.db_pool.simple_upsert_many_txn,
+        await self.db_pool.simple_upsert_many(
             table="server_signature_keys",
             key_names=("server_name", "key_id"),
             key_values=key_values,
@@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore):
                 "verify_key",
             ),
             value_values=value_values,
+            desc="store_server_verify_keys",
         )
 
         invalidate = self._get_server_verify_key.invalidate
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 0acf0617ca..79b01d16f9 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -281,9 +281,14 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
             a_day_in_milliseconds = 24 * 60 * 60 * 1000
             now = self._clock.time_msec()
 
+            # A note on user_agent. Technically a given device can have multiple
+            # user agents, so we need to decide which one to pick. We could have handled this
+            # in number of ways, but given that we don't _that_ much have gone for MAX()
+            # For more details of the other options considered see
+            # https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111
             sql = """
-                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
-                    SELECT u.user_id, u.device_id, ?
+                INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent)
+                    SELECT u.user_id, u.device_id, ?, MAX(u.user_agent)
                     FROM user_ips AS u
                     LEFT JOIN (
                       SELECT user_id, device_id, timestamp FROM user_daily_visits
@@ -294,7 +299,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
                     WHERE last_seen > ? AND last_seen <= ?
                     AND udv.timestamp IS NULL AND users.is_guest=0
                     AND users.appservice_id IS NULL
-                    GROUP BY u.user_id, u.device_id
+                    GROUP BY u.user_id, u.device_id, u.user_agent
             """
 
             # This means that the day has rolled over but there could still
diff --git a/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
new file mode 100644
index 0000000000..b0b5dcddce
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ -- Add new column to user_daily_visits to track user agent
+ALTER TABLE user_daily_visits
+    ADD COLUMN user_agent TEXT;
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 7d46090267..59207cadd4 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore):
         """
 
         self._destination_retry_cache.pop(destination, None)
-        return await self.db_pool.runInteraction(
-            "set_destination_retry_timings",
-            self._set_destination_retry_timings,
-            destination,
-            failure_ts,
-            retry_last_ts,
-            retry_interval,
-        )
+        if self.database_engine.can_native_upsert:
+            return await self.db_pool.runInteraction(
+                "set_destination_retry_timings",
+                self._set_destination_retry_timings_native,
+                destination,
+                failure_ts,
+                retry_last_ts,
+                retry_interval,
+                db_autocommit=True,  # Safe as its a single upsert
+            )
+        else:
+            return await self.db_pool.runInteraction(
+                "set_destination_retry_timings",
+                self._set_destination_retry_timings_emulated,
+                destination,
+                failure_ts,
+                retry_last_ts,
+                retry_interval,
+            )
 
-    def _set_destination_retry_timings(
+    def _set_destination_retry_timings_native(
         self, txn, destination, failure_ts, retry_last_ts, retry_interval
     ):
+        assert self.database_engine.can_native_upsert
+
+        # Upsert retry time interval if retry_interval is zero (i.e. we're
+        # resetting it) or greater than the existing retry interval.
+        #
+        # WARNING: This is executed in autocommit, so we shouldn't add any more
+        # SQL calls in here (without being very careful).
+        sql = """
+            INSERT INTO destinations (
+                destination, failure_ts, retry_last_ts, retry_interval
+            )
+                VALUES (?, ?, ?, ?)
+            ON CONFLICT (destination) DO UPDATE SET
+                    failure_ts = EXCLUDED.failure_ts,
+                    retry_last_ts = EXCLUDED.retry_last_ts,
+                    retry_interval = EXCLUDED.retry_interval
+                WHERE
+                    EXCLUDED.retry_interval = 0
+                    OR destinations.retry_interval IS NULL
+                    OR destinations.retry_interval < EXCLUDED.retry_interval
+        """
 
-        if self.database_engine.can_native_upsert:
-            # Upsert retry time interval if retry_interval is zero (i.e. we're
-            # resetting it) or greater than the existing retry interval.
-
-            sql = """
-                INSERT INTO destinations (
-                    destination, failure_ts, retry_last_ts, retry_interval
-                )
-                    VALUES (?, ?, ?, ?)
-                ON CONFLICT (destination) DO UPDATE SET
-                        failure_ts = EXCLUDED.failure_ts,
-                        retry_last_ts = EXCLUDED.retry_last_ts,
-                        retry_interval = EXCLUDED.retry_interval
-                    WHERE
-                        EXCLUDED.retry_interval = 0
-                        OR destinations.retry_interval IS NULL
-                        OR destinations.retry_interval < EXCLUDED.retry_interval
-            """
-
-            txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
-
-            return
+        txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
 
+    def _set_destination_retry_timings_emulated(
+        self, txn, destination, failure_ts, retry_last_ts, retry_interval
+    ):
         self.database_engine.lock_table(txn, "destinations")
 
         # We need to be careful here as the data may have changed from under us
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 5a390ff2f6..d87ceec6da 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             user_id_tuples: iterable of 2-tuple of user IDs.
         """
 
-        def _add_users_who_share_room_txn(txn):
-            self.db_pool.simple_upsert_many_txn(
-                txn,
-                table="users_who_share_private_rooms",
-                key_names=["user_id", "other_user_id", "room_id"],
-                key_values=[
-                    (user_id, other_user_id, room_id)
-                    for user_id, other_user_id in user_id_tuples
-                ],
-                value_names=(),
-                value_values=None,
-            )
-
-        await self.db_pool.runInteraction(
-            "add_users_who_share_room", _add_users_who_share_room_txn
+        await self.db_pool.simple_upsert_many(
+            table="users_who_share_private_rooms",
+            key_names=["user_id", "other_user_id", "room_id"],
+            key_values=[
+                (user_id, other_user_id, room_id)
+                for user_id, other_user_id in user_id_tuples
+            ],
+            value_names=(),
+            value_values=None,
+            desc="add_users_who_share_room",
         )
 
     async def add_users_in_public_rooms(
@@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             user_ids
         """
 
-        def _add_users_in_public_rooms_txn(txn):
-
-            self.db_pool.simple_upsert_many_txn(
-                txn,
-                table="users_in_public_rooms",
-                key_names=["user_id", "room_id"],
-                key_values=[(user_id, room_id) for user_id in user_ids],
-                value_names=(),
-                value_values=None,
-            )
-
-        await self.db_pool.runInteraction(
-            "add_users_in_public_rooms", _add_users_in_public_rooms_txn
+        await self.db_pool.simple_upsert_many(
+            table="users_in_public_rooms",
+            key_names=["user_id", "room_id"],
+            key_values=[(user_id, room_id) for user_id in user_ids],
+            value_names=(),
+            value_values=None,
+            desc="add_users_in_public_rooms",
         )
 
     async def delete_all_from_user_dir(self) -> None:
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 3d8da48f2d..02d71302ea 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -618,14 +618,7 @@ class _MultiWriterCtxManager:
             db_autocommit=True,
         )
 
-        # Assert the fetched ID is actually greater than any ID we've already
-        # seen. If not, then the sequence and table have got out of sync
-        # somehow.
         with self.id_gen._lock:
-            assert max(self.id_gen._current_positions.values(), default=0) < min(
-                self.stream_ids
-            )
-
             self.id_gen._unfinished_ids.update(self.stream_ids)
 
         if self.multiple_ids is None:
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 2a0b7c1b56..ee4f3da31c 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -18,6 +18,7 @@ from mock import Mock
 from twisted.internet import defer
 
 from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.types import RoomStreamToken
 
 from tests.test_utils import make_awaitable
 from tests.utils import MockClock
@@ -61,7 +62,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             defer.succeed((0, [event])),
             defer.succeed((0, [])),
         ]
-        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+        yield defer.ensureDeferred(
+            self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        )
         self.mock_scheduler.submit_event_for_as.assert_called_once_with(
             interested_service, event
         )
@@ -80,7 +83,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             defer.succeed((0, [event])),
             defer.succeed((0, [])),
         ]
-        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+        yield defer.ensureDeferred(
+            self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        )
         self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
 
     @defer.inlineCallbacks
@@ -97,7 +102,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             defer.succeed((0, [event])),
             defer.succeed((0, [])),
         ]
-        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+        yield defer.ensureDeferred(
+            self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        )
         self.assertFalse(
             self.mock_as_api.query_user.called,
             "query_user called when it shouldn't have been.",
diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py
index 64e28bc639..9f6f21a6e2 100644
--- a/tests/handlers/test_message.py
+++ b/tests/handlers/test_message.py
@@ -66,7 +66,6 @@ class EventCreationTestCase(unittest.HomeserverTestCase):
                     "sender": self.requester.user.to_string(),
                     "content": {"msgtype": "m.text", "body": random_string(5)},
                 },
-                token_id=self.token_id,
                 txn_id=txn_id,
             )
         )
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 914c82e7a8..8ed67640f8 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -615,7 +615,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
             self.store.get_latest_event_ids_in_room(room_id)
         )
 
-        event = self.get_success(builder.build(prev_event_ids))
+        event = self.get_success(builder.build(prev_event_ids, None))
 
         self.get_success(self.federation_handler.on_receive_pdu(hostname, event))
 
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 9c4a9c3563..779745ae9d 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -226,7 +226,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         }
 
         builder = factory.for_room_version(room_version, event_dict)
-        join_event = self.get_success(builder.build(prev_event_ids))
+        join_event = self.get_success(builder.build(prev_event_ids, None))
 
         self.get_success(federation.on_send_join_request(remote_server, join_event))
         self.replicate()
diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
index 6068d14905..82cf033d4e 100644
--- a/tests/replication/test_sharded_event_persister.py
+++ b/tests/replication/test_sharded_event_persister.py
@@ -14,8 +14,12 @@
 # limitations under the License.
 import logging
 
+from mock import patch
+
+from synapse.api.room_versions import RoomVersion
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v2_alpha import sync
 
 from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.utils import USE_POSTGRES_FOR_TESTS
@@ -36,6 +40,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
         admin.register_servlets_for_client_rest_resource,
         room.register_servlets,
         login.register_servlets,
+        sync.register_servlets,
     ]
 
     def prepare(self, reactor, clock, hs):
@@ -43,6 +48,9 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
         self.other_user_id = self.register_user("otheruser", "pass")
         self.other_access_token = self.login("otheruser", "pass")
 
+        self.room_creator = self.hs.get_room_creation_handler()
+        self.store = hs.get_datastore()
+
     def default_config(self):
         conf = super().default_config()
         conf["redis"] = {"enabled": "true"}
@@ -53,6 +61,29 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
         }
         return conf
 
+    def _create_room(self, room_id: str, user_id: str, tok: str):
+        """Create a room with given room_id
+        """
+
+        # We control the room ID generation by patching out the
+        # `_generate_room_id` method
+        async def generate_room(
+            creator_id: str, is_public: bool, room_version: RoomVersion
+        ):
+            await self.store.store_room(
+                room_id=room_id,
+                room_creator_user_id=creator_id,
+                is_public=is_public,
+                room_version=room_version,
+            )
+            return room_id
+
+        with patch(
+            "synapse.handlers.room.RoomCreationHandler._generate_room_id"
+        ) as mock:
+            mock.side_effect = generate_room
+            self.helper.create_room_as(user_id, tok=tok)
+
     def test_basic(self):
         """Simple test to ensure that multiple rooms can be created and joined,
         and that different rooms get handled by different instances.
@@ -100,3 +131,189 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
 
         self.assertTrue(persisted_on_1)
         self.assertTrue(persisted_on_2)
+
+    def test_vector_clock_token(self):
+        """Tests that using a stream token with a vector clock component works
+        correctly with basic /sync and /messages usage.
+        """
+
+        self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "worker1"},
+        )
+
+        worker_hs2 = self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "worker2"},
+        )
+
+        sync_hs = self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "sync"},
+        )
+
+        # Specially selected room IDs that get persisted on different workers.
+        room_id1 = "!foo:test"
+        room_id2 = "!baz:test"
+
+        self.assertEqual(
+            self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
+        )
+        self.assertEqual(
+            self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
+        )
+
+        user_id = self.register_user("user", "pass")
+        access_token = self.login("user", "pass")
+
+        store = self.hs.get_datastore()
+
+        # Create two room on the different workers.
+        self._create_room(room_id1, user_id, access_token)
+        self._create_room(room_id2, user_id, access_token)
+
+        # The other user joins
+        self.helper.join(
+            room=room_id1, user=self.other_user_id, tok=self.other_access_token
+        )
+        self.helper.join(
+            room=room_id2, user=self.other_user_id, tok=self.other_access_token
+        )
+
+        # Do an initial sync so that we're up to date.
+        request, channel = self.make_request("GET", "/sync", access_token=access_token)
+        self.render_on_worker(sync_hs, request)
+        next_batch = channel.json_body["next_batch"]
+
+        # We now gut wrench into the events stream MultiWriterIdGenerator on
+        # worker2 to mimic it getting stuck persisting an event. This ensures
+        # that when we send an event on worker1 we end up in a state where
+        # worker2 events stream position lags that on worker1, resulting in a
+        # RoomStreamToken with a non-empty instance map component.
+        #
+        # Worker2's event stream position will not advance until we call
+        # __aexit__ again.
+        actx = worker_hs2.get_datastore()._stream_id_gen.get_next()
+        self.get_success(actx.__aenter__())
+
+        response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
+        first_event_in_room1 = response["event_id"]
+
+        # Assert that the current stream token has an instance map component, as
+        # we are trying to test vector clock tokens.
+        room_stream_token = store.get_room_max_token()
+        self.assertNotEqual(len(room_stream_token.instance_map), 0)
+
+        # Check that syncing still gets the new event, despite the gap in the
+        # stream IDs.
+        request, channel = self.make_request(
+            "GET", "/sync?since={}".format(next_batch), access_token=access_token
+        )
+        self.render_on_worker(sync_hs, request)
+
+        # We should only see the new event and nothing else
+        self.assertIn(room_id1, channel.json_body["rooms"]["join"])
+        self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
+
+        events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
+        self.assertListEqual(
+            [first_event_in_room1], [event["event_id"] for event in events]
+        )
+
+        # Get the next batch and makes sure its a vector clock style token.
+        vector_clock_token = channel.json_body["next_batch"]
+        self.assertTrue(vector_clock_token.startswith("m"))
+
+        # Now that we've got a vector clock token we finish the fake persisting
+        # an event we started above.
+        self.get_success(actx.__aexit__(None, None, None))
+
+        # Now try and send an event to the other rooom so that we can test that
+        # the vector clock style token works as a `since` token.
+        response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
+        first_event_in_room2 = response["event_id"]
+
+        request, channel = self.make_request(
+            "GET",
+            "/sync?since={}".format(vector_clock_token),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+
+        self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
+        self.assertIn(room_id2, channel.json_body["rooms"]["join"])
+
+        events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
+        self.assertListEqual(
+            [first_event_in_room2], [event["event_id"] for event in events]
+        )
+
+        next_batch = channel.json_body["next_batch"]
+
+        # We also want to test that the vector clock style token works with
+        # pagination. We do this by sending a couple of new events into the room
+        # and syncing again to get a prev_batch token for each room, then
+        # paginating from there back to the vector clock token.
+        self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
+        self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
+
+        request, channel = self.make_request(
+            "GET", "/sync?since={}".format(next_batch), access_token=access_token
+        )
+        self.render_on_worker(sync_hs, request)
+
+        prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
+            "prev_batch"
+        ]
+        prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
+            "prev_batch"
+        ]
+
+        # Paginating back in the first room should not produce any results, as
+        # no events have happened in it. This tests that we are correctly
+        # filtering results based on the vector clock portion.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=b".format(
+                room_id1, prev_batch1, vector_clock_token
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertListEqual([], channel.json_body["chunk"])
+
+        # Paginating back on the second room should produce the first event
+        # again. This tests that pagination isn't completely broken.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=b".format(
+                room_id2, prev_batch2, vector_clock_token
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertEqual(len(channel.json_body["chunk"]), 1)
+        self.assertEqual(
+            channel.json_body["chunk"][0]["event_id"], first_event_in_room2
+        )
+
+        # Paginating forwards should give the same results
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=f".format(
+                room_id1, vector_clock_token, prev_batch1
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertListEqual([], channel.json_body["chunk"])
+
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/{}/messages?from={}&to={}&dir=f".format(
+                room_id2, vector_clock_token, prev_batch2,
+            ),
+            access_token=access_token,
+        )
+        self.render_on_worker(sync_hs, request)
+        self.assertEqual(len(channel.json_body["chunk"]), 1)
+        self.assertEqual(
+            channel.json_body["chunk"][0]["event_id"], first_event_in_room2
+        )
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index 1ea35d60c1..d4f9e809db 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -236,9 +236,9 @@ class RedactionTestCase(unittest.HomeserverTestCase):
                 self._event_id = event_id
 
             @defer.inlineCallbacks
-            def build(self, prev_event_ids):
+            def build(self, prev_event_ids, auth_event_ids):
                 built_event = yield defer.ensureDeferred(
-                    self._base_builder.build(prev_event_ids)
+                    self._base_builder.build(prev_event_ids, auth_event_ids)
                 )
 
                 built_event._event_id = self._event_id
diff --git a/tests/unittest.py b/tests/unittest.py
index 6c1661c92c..040b126a27 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -20,7 +20,7 @@ import hmac
 import inspect
 import logging
 import time
-from typing import Optional, Tuple, Type, TypeVar, Union
+from typing import Optional, Tuple, Type, TypeVar, Union, overload
 
 from mock import Mock, patch
 
@@ -364,6 +364,36 @@ class HomeserverTestCase(TestCase):
         Function to optionally be overridden in subclasses.
         """
 
+    # Annoyingly mypy doesn't seem to pick up the fact that T is SynapseRequest
+    # when the `request` arg isn't given, so we define an explicit override to
+    # cover that case.
+    @overload
+    def make_request(
+        self,
+        method: Union[bytes, str],
+        path: Union[bytes, str],
+        content: Union[bytes, dict] = b"",
+        access_token: Optional[str] = None,
+        shorthand: bool = True,
+        federation_auth_origin: str = None,
+        content_is_form: bool = False,
+    ) -> Tuple[SynapseRequest, FakeChannel]:
+        ...
+
+    @overload
+    def make_request(
+        self,
+        method: Union[bytes, str],
+        path: Union[bytes, str],
+        content: Union[bytes, dict] = b"",
+        access_token: Optional[str] = None,
+        request: Type[T] = SynapseRequest,
+        shorthand: bool = True,
+        federation_auth_origin: str = None,
+        content_is_form: bool = False,
+    ) -> Tuple[T, FakeChannel]:
+        ...
+
     def make_request(
         self,
         method: Union[bytes, str],