summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/cache.py13
-rw-r--r--synapse/config/emailconfig.py14
-rw-r--r--synapse/config/repository.py24
-rw-r--r--synapse/config/server.py2
-rw-r--r--synapse/federation/federation_client.py26
-rw-r--r--synapse/federation/federation_server.py50
-rw-r--r--synapse/federation/persistence.py4
-rw-r--r--synapse/federation/sender/transaction_manager.py9
-rw-r--r--synapse/federation/transport/client.py2
-rw-r--r--synapse/federation/transport/server.py11
-rw-r--r--synapse/federation/units.py90
-rw-r--r--synapse/handlers/appservice.py7
-rw-r--r--synapse/handlers/federation.py237
-rw-r--r--synapse/handlers/identity.py6
-rw-r--r--synapse/handlers/receipts.py3
-rw-r--r--synapse/handlers/room_list.py46
-rw-r--r--synapse/handlers/room_member.py13
-rw-r--r--synapse/handlers/send_email.py94
-rw-r--r--synapse/handlers/space_summary.py182
-rw-r--r--synapse/handlers/sync.py14
-rw-r--r--synapse/handlers/typing.py3
-rw-r--r--synapse/module_api/__init__.py12
-rw-r--r--synapse/rest/client/v1/room.py45
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/database.py14
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py188
-rw-r--r--synapse/storage/databases/main/event_federation.py114
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/databases/main/events_worker.py144
-rw-r--r--synapse/storage/databases/main/registration.py30
-rw-r--r--synapse/storage/databases/main/roommember.py6
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql24
-rw-r--r--synapse/types.py29
-rw-r--r--synapse/util/jsonobject.py102
35 files changed, 1046 insertions, 544 deletions
diff --git a/synapse/config/cache.py b/synapse/config/cache.py
index 8d5f38b5d9..d119427ad8 100644
--- a/synapse/config/cache.py
+++ b/synapse/config/cache.py
@@ -151,6 +151,15 @@ class CacheConfig(Config):
           # entries are never evicted based on time.
           #
           #expiry_time: 30m
+
+          # Controls how long the results of a /sync request are cached for after
+          # a successful response is returned. A higher duration can help clients with
+          # intermittent connections, at the cost of higher memory usage.
+          #
+          # By default, this is zero, which means that sync responses are not cached
+          # at all.
+          #
+          #sync_response_cache_duration: 2m
         """
 
     def read_config(self, config, **kwargs):
@@ -212,6 +221,10 @@ class CacheConfig(Config):
         else:
             self.expiry_time_msec = None
 
+        self.sync_response_cache_duration = self.parse_duration(
+            cache_config.get("sync_response_cache_duration", 0)
+        )
+
         # Resize all caches (if necessary) with the new factors we've loaded
         self.resize_all_caches()
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 8d8f166e9b..42526502f0 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -80,6 +80,12 @@ class EmailConfig(Config):
         self.require_transport_security = email_config.get(
             "require_transport_security", False
         )
+        self.enable_smtp_tls = email_config.get("enable_tls", True)
+        if self.require_transport_security and not self.enable_smtp_tls:
+            raise ConfigError(
+                "email.require_transport_security requires email.enable_tls to be true"
+            )
+
         if "app_name" in email_config:
             self.email_app_name = email_config["app_name"]
         else:
@@ -368,6 +374,14 @@ class EmailConfig(Config):
           #
           #require_transport_security: true
 
+          # Uncomment the following to disable TLS for SMTP.
+          #
+          # By default, if the server supports TLS, it will be used, and the server
+          # must present a certificate that is valid for 'smtp_host'. If this option
+          # is set to false, TLS will not be used.
+          #
+          #enable_tls: false
+
           # notif_from defines the "From" address to use when sending emails.
           # It must be set if email sending is enabled.
           #
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 0dfb3a227a..7481f3bf5f 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import os
 from collections import namedtuple
 from typing import Dict, List
+from urllib.request import getproxies_environment  # type: ignore
 
 from synapse.config.server import DEFAULT_IP_RANGE_BLACKLIST, generate_ip_set
 from synapse.python_dependencies import DependencyException, check_requirements
@@ -22,6 +24,8 @@ from synapse.util.module_loader import load_module
 
 from ._base import Config, ConfigError
 
+logger = logging.getLogger(__name__)
+
 DEFAULT_THUMBNAIL_SIZES = [
     {"width": 32, "height": 32, "method": "crop"},
     {"width": 96, "height": 96, "method": "crop"},
@@ -36,6 +40,9 @@ THUMBNAIL_SIZE_YAML = """\
         #    method: %(method)s
 """
 
+HTTP_PROXY_SET_WARNING = """\
+The Synapse config url_preview_ip_range_blacklist will be ignored as an HTTP(s) proxy is configured."""
+
 ThumbnailRequirement = namedtuple(
     "ThumbnailRequirement", ["width", "height", "method", "media_type"]
 )
@@ -180,12 +187,17 @@ class ContentRepositoryConfig(Config):
                     e.message  # noqa: B306, DependencyException.message is a property
                 )
 
+            proxy_env = getproxies_environment()
             if "url_preview_ip_range_blacklist" not in config:
-                raise ConfigError(
-                    "For security, you must specify an explicit target IP address "
-                    "blacklist in url_preview_ip_range_blacklist for url previewing "
-                    "to work"
-                )
+                if "http" not in proxy_env or "https" not in proxy_env:
+                    raise ConfigError(
+                        "For security, you must specify an explicit target IP address "
+                        "blacklist in url_preview_ip_range_blacklist for url previewing "
+                        "to work"
+                    )
+            else:
+                if "http" in proxy_env or "https" in proxy_env:
+                    logger.warning("".join(HTTP_PROXY_SET_WARNING))
 
             # we always blacklist '0.0.0.0' and '::', which are supposed to be
             # unroutable addresses.
@@ -292,6 +304,8 @@ class ContentRepositoryConfig(Config):
         # This must be specified if url_preview_enabled is set. It is recommended that
         # you uncomment the following list as a starting point.
         #
+        # Note: The value is ignored when an HTTP proxy is in use
+        #
         #url_preview_ip_range_blacklist:
 %(ip_range_blacklist)s
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index b9e0c0b300..187b4301a0 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -960,6 +960,8 @@ class ServerConfig(Config):
         #
         # This option replaces federation_ip_range_blacklist in Synapse v1.25.0.
         #
+        # Note: The value is ignored when an HTTP proxy is in use
+        #
         #ip_range_blacklist:
 %(ip_range_blacklist)s
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b7a10da15a..2eefac04fd 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1108,7 +1108,8 @@ class FederationClient(FederationBase):
             The response from the remote server.
 
         Raises:
-            HttpResponseException: There was an exception returned from the remote server
+            HttpResponseException / RequestSendFailed: There was an exception
+                returned from the remote server
             SynapseException: M_FORBIDDEN when the remote server has disallowed publicRoom
                 requests over federation
 
@@ -1290,7 +1291,7 @@ class FederationClient(FederationBase):
         )
 
 
-@attr.s(frozen=True, slots=True)
+@attr.s(frozen=True, slots=True, auto_attribs=True)
 class FederationSpaceSummaryEventResult:
     """Represents a single event in the result of a successful get_space_summary call.
 
@@ -1299,12 +1300,13 @@ class FederationSpaceSummaryEventResult:
     object attributes.
     """
 
-    event_type = attr.ib(type=str)
-    state_key = attr.ib(type=str)
-    via = attr.ib(type=Sequence[str])
+    event_type: str
+    room_id: str
+    state_key: str
+    via: Sequence[str]
 
     # the raw data, including the above keys
-    data = attr.ib(type=JsonDict)
+    data: JsonDict
 
     @classmethod
     def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
@@ -1321,6 +1323,10 @@ class FederationSpaceSummaryEventResult:
         if not isinstance(event_type, str):
             raise ValueError("Invalid event: 'event_type' must be a str")
 
+        room_id = d.get("room_id")
+        if not isinstance(room_id, str):
+            raise ValueError("Invalid event: 'room_id' must be a str")
+
         state_key = d.get("state_key")
         if not isinstance(state_key, str):
             raise ValueError("Invalid event: 'state_key' must be a str")
@@ -1335,15 +1341,15 @@ class FederationSpaceSummaryEventResult:
         if any(not isinstance(v, str) for v in via):
             raise ValueError("Invalid event: 'via' must be a list of strings")
 
-        return cls(event_type, state_key, via, d)
+        return cls(event_type, room_id, state_key, via, d)
 
 
-@attr.s(frozen=True, slots=True)
+@attr.s(frozen=True, slots=True, auto_attribs=True)
 class FederationSpaceSummaryResult:
     """Represents the data returned by a successful get_space_summary call."""
 
-    rooms = attr.ib(type=Sequence[JsonDict])
-    events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])
+    rooms: Sequence[JsonDict]
+    events: Sequence[FederationSpaceSummaryEventResult]
 
     @classmethod
     def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 145b9161d9..0385aadefa 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -195,13 +195,17 @@ class FederationServer(FederationBase):
                 origin, room_id, versions, limit
             )
 
-            res = self._transaction_from_pdus(pdus).get_dict()
+            res = self._transaction_dict_from_pdus(pdus)
 
         return 200, res
 
     async def on_incoming_transaction(
-        self, origin: str, transaction_data: JsonDict
-    ) -> Tuple[int, Dict[str, Any]]:
+        self,
+        origin: str,
+        transaction_id: str,
+        destination: str,
+        transaction_data: JsonDict,
+    ) -> Tuple[int, JsonDict]:
         # If we receive a transaction we should make sure that kick off handling
         # any old events in the staging area.
         if not self._started_handling_of_staged_events:
@@ -212,8 +216,14 @@ class FederationServer(FederationBase):
         # accurate as possible.
         request_time = self._clock.time_msec()
 
-        transaction = Transaction(**transaction_data)
-        transaction_id = transaction.transaction_id  # type: ignore
+        transaction = Transaction(
+            transaction_id=transaction_id,
+            destination=destination,
+            origin=origin,
+            origin_server_ts=transaction_data.get("origin_server_ts"),  # type: ignore
+            pdus=transaction_data.get("pdus"),  # type: ignore
+            edus=transaction_data.get("edus"),
+        )
 
         if not transaction_id:
             raise Exception("Transaction missing transaction_id")
@@ -221,9 +231,7 @@ class FederationServer(FederationBase):
         logger.debug("[%s] Got transaction", transaction_id)
 
         # Reject malformed transactions early: reject if too many PDUs/EDUs
-        if len(transaction.pdus) > 50 or (  # type: ignore
-            hasattr(transaction, "edus") and len(transaction.edus) > 100  # type: ignore
-        ):
+        if len(transaction.pdus) > 50 or len(transaction.edus) > 100:
             logger.info("Transaction PDU or EDU count too large. Returning 400")
             return 400, {}
 
@@ -263,7 +271,7 @@ class FederationServer(FederationBase):
         # CRITICAL SECTION: the first thing we must do (before awaiting) is
         # add an entry to _active_transactions.
         assert origin not in self._active_transactions
-        self._active_transactions[origin] = transaction.transaction_id  # type: ignore
+        self._active_transactions[origin] = transaction.transaction_id
 
         try:
             result = await self._handle_incoming_transaction(
@@ -291,11 +299,11 @@ class FederationServer(FederationBase):
         if response:
             logger.debug(
                 "[%s] We've already responded to this request",
-                transaction.transaction_id,  # type: ignore
+                transaction.transaction_id,
             )
             return response
 
-        logger.debug("[%s] Transaction is new", transaction.transaction_id)  # type: ignore
+        logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
         # We process PDUs and EDUs in parallel. This is important as we don't
         # want to block things like to device messages from reaching clients
@@ -334,7 +342,7 @@ class FederationServer(FederationBase):
             report back to the sending server.
         """
 
-        received_pdus_counter.inc(len(transaction.pdus))  # type: ignore
+        received_pdus_counter.inc(len(transaction.pdus))
 
         origin_host, _ = parse_server_name(origin)
 
@@ -342,7 +350,7 @@ class FederationServer(FederationBase):
 
         newest_pdu_ts = 0
 
-        for p in transaction.pdus:  # type: ignore
+        for p in transaction.pdus:
             # FIXME (richardv): I don't think this works:
             #  https://github.com/matrix-org/synapse/issues/8429
             if "unsigned" in p:
@@ -436,10 +444,10 @@ class FederationServer(FederationBase):
 
         return pdu_results
 
-    async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
+    async def _handle_edus_in_txn(self, origin: str, transaction: Transaction) -> None:
         """Process the EDUs in a received transaction."""
 
-        async def _process_edu(edu_dict):
+        async def _process_edu(edu_dict: JsonDict) -> None:
             received_edus_counter.inc()
 
             edu = Edu(
@@ -452,7 +460,7 @@ class FederationServer(FederationBase):
 
         await concurrently_execute(
             _process_edu,
-            getattr(transaction, "edus", []),
+            transaction.edus,
             TRANSACTION_CONCURRENCY_LIMIT,
         )
 
@@ -538,7 +546,7 @@ class FederationServer(FederationBase):
         pdu = await self.handler.get_persisted_pdu(origin, event_id)
 
         if pdu:
-            return 200, self._transaction_from_pdus([pdu]).get_dict()
+            return 200, self._transaction_dict_from_pdus([pdu])
         else:
             return 404, ""
 
@@ -879,18 +887,20 @@ class FederationServer(FederationBase):
         ts_now_ms = self._clock.time_msec()
         return await self.store.get_user_id_for_open_id_token(token, ts_now_ms)
 
-    def _transaction_from_pdus(self, pdu_list: List[EventBase]) -> Transaction:
+    def _transaction_dict_from_pdus(self, pdu_list: List[EventBase]) -> JsonDict:
         """Returns a new Transaction containing the given PDUs suitable for
         transmission.
         """
         time_now = self._clock.time_msec()
         pdus = [p.get_pdu_json(time_now) for p in pdu_list]
         return Transaction(
+            # Just need a dummy transaction ID and destination since it won't be used.
+            transaction_id="",
             origin=self.server_name,
             pdus=pdus,
             origin_server_ts=int(time_now),
-            destination=None,
-        )
+            destination="",
+        ).get_dict()
 
     async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
         """Process a PDU received in a federation /send/ transaction.
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 2f9c9bc2cd..4fead6ca29 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -45,7 +45,7 @@ class TransactionActions:
             `None` if we have not previously responded to this transaction or a
             2-tuple of `(int, dict)` representing the response code and response body.
         """
-        transaction_id = transaction.transaction_id  # type: ignore
+        transaction_id = transaction.transaction_id
         if not transaction_id:
             raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
@@ -56,7 +56,7 @@ class TransactionActions:
         self, origin: str, transaction: Transaction, code: int, response: JsonDict
     ) -> None:
         """Persist how we responded to a transaction."""
-        transaction_id = transaction.transaction_id  # type: ignore
+        transaction_id = transaction.transaction_id
         if not transaction_id:
             raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 72a635830b..dc555cca0b 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -27,6 +27,7 @@ from synapse.logging.opentracing import (
     tags,
     whitelisted_homeserver,
 )
+from synapse.types import JsonDict
 from synapse.util import json_decoder
 from synapse.util.metrics import measure_func
 
@@ -104,13 +105,13 @@ class TransactionManager:
                 len(edus),
             )
 
-            transaction = Transaction.create_new(
+            transaction = Transaction(
                 origin_server_ts=int(self.clock.time_msec()),
                 transaction_id=txn_id,
                 origin=self._server_name,
                 destination=destination,
-                pdus=pdus,
-                edus=edus,
+                pdus=[p.get_pdu_json() for p in pdus],
+                edus=[edu.get_dict() for edu in edus],
             )
 
             self._next_txn_id += 1
@@ -131,7 +132,7 @@ class TransactionManager:
             # FIXME (richardv): I also believe it no longer works. We (now?) store
             #  "age_ts" in "unsigned" rather than at the top level. See
             #  https://github.com/matrix-org/synapse/issues/8429.
-            def json_data_cb():
+            def json_data_cb() -> JsonDict:
                 data = transaction.get_dict()
                 now = int(self.clock.time_msec())
                 if "pdus" in data:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 6a8d3ad4fe..90a7c16b62 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,7 +143,7 @@ class TransportLayerClient:
         """Sends the given Transaction to its destination
 
         Args:
-            transaction (Transaction)
+            transaction
 
         Returns:
             Succeeds when we get a 2xx HTTP response. The result
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 5e059d6e09..640f46fff6 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -450,21 +450,12 @@ class FederationSendServlet(BaseFederationServerServlet):
                 len(transaction_data.get("edus", [])),
             )
 
-            # We should ideally be getting this from the security layer.
-            # origin = body["origin"]
-
-            # Add some extra data to the transaction dict that isn't included
-            # in the request body.
-            transaction_data.update(
-                transaction_id=transaction_id, destination=self.server_name
-            )
-
         except Exception as e:
             logger.exception(e)
             return 400, {"error": "Invalid transaction"}
 
         code, response = await self.handler.on_incoming_transaction(
-            origin, transaction_data
+            origin, transaction_id, self.server_name, transaction_data
         )
 
         return code, response
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index c83a261918..b9b12fbea5 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -17,18 +17,17 @@ server protocol.
 """
 
 import logging
-from typing import Optional
+from typing import List, Optional
 
 import attr
 
 from synapse.types import JsonDict
-from synapse.util.jsonobject import JsonEncodedObject
 
 logger = logging.getLogger(__name__)
 
 
-@attr.s(slots=True)
-class Edu(JsonEncodedObject):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class Edu:
     """An Edu represents a piece of data sent from one homeserver to another.
 
     In comparison to Pdus, Edus are not persisted for a long time on disk, are
@@ -36,10 +35,10 @@ class Edu(JsonEncodedObject):
     internal ID or previous references graph.
     """
 
-    edu_type = attr.ib(type=str)
-    content = attr.ib(type=dict)
-    origin = attr.ib(type=str)
-    destination = attr.ib(type=str)
+    edu_type: str
+    content: dict
+    origin: str
+    destination: str
 
     def get_dict(self) -> JsonDict:
         return {
@@ -55,14 +54,21 @@ class Edu(JsonEncodedObject):
             "destination": self.destination,
         }
 
-    def get_context(self):
+    def get_context(self) -> str:
         return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
 
-    def strip_context(self):
+    def strip_context(self) -> None:
         getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
 
 
-class Transaction(JsonEncodedObject):
+def _none_to_list(edus: Optional[List[JsonDict]]) -> List[JsonDict]:
+    if edus is None:
+        return []
+    return edus
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class Transaction:
     """A transaction is a list of Pdus and Edus to be sent to a remote home
     server with some extra metadata.
 
@@ -78,47 +84,21 @@ class Transaction(JsonEncodedObject):
 
     """
 
-    valid_keys = [
-        "transaction_id",
-        "origin",
-        "destination",
-        "origin_server_ts",
-        "previous_ids",
-        "pdus",
-        "edus",
-    ]
-
-    internal_keys = ["transaction_id", "destination"]
-
-    required_keys = [
-        "transaction_id",
-        "origin",
-        "destination",
-        "origin_server_ts",
-        "pdus",
-    ]
-
-    def __init__(self, transaction_id=None, pdus: Optional[list] = None, **kwargs):
-        """If we include a list of pdus then we decode then as PDU's
-        automatically.
-        """
-
-        # If there's no EDUs then remove the arg
-        if "edus" in kwargs and not kwargs["edus"]:
-            del kwargs["edus"]
-
-        super().__init__(transaction_id=transaction_id, pdus=pdus or [], **kwargs)
-
-    @staticmethod
-    def create_new(pdus, **kwargs):
-        """Used to create a new transaction. Will auto fill out
-        transaction_id and origin_server_ts keys.
-        """
-        if "origin_server_ts" not in kwargs:
-            raise KeyError("Require 'origin_server_ts' to construct a Transaction")
-        if "transaction_id" not in kwargs:
-            raise KeyError("Require 'transaction_id' to construct a Transaction")
-
-        kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
-
-        return Transaction(**kwargs)
+    # Required keys.
+    transaction_id: str
+    origin: str
+    destination: str
+    origin_server_ts: int
+    pdus: List[JsonDict] = attr.ib(factory=list, converter=_none_to_list)
+    edus: List[JsonDict] = attr.ib(factory=list, converter=_none_to_list)
+
+    def get_dict(self) -> JsonDict:
+        """A JSON-ready dictionary of valid keys which aren't internal."""
+        result = {
+            "origin": self.origin,
+            "origin_server_ts": self.origin_server_ts,
+            "pdus": self.pdus,
+        }
+        if self.edus:
+            result["edus"] = self.edus
+        return result
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 21a17cd2e8..4ab4046650 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -392,9 +392,6 @@ class ApplicationServicesHandler:
                     protocols[p].append(info)
 
         def _merge_instances(infos: List[JsonDict]) -> JsonDict:
-            if not infos:
-                return {}
-
             # Merge the 'instances' lists of multiple results, but just take
             # the other fields from the first as they ought to be identical
             # copy the result so as not to corrupt the cached one
@@ -406,7 +403,9 @@ class ApplicationServicesHandler:
 
             return combined
 
-        return {p: _merge_instances(protocols[p]) for p in protocols.keys()}
+        return {
+            p: _merge_instances(protocols[p]) for p in protocols.keys() if protocols[p]
+        }
 
     async def _get_services_for_event(
         self, event: EventBase
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8197b60b76..9a5e726533 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -42,6 +42,7 @@ from twisted.internet import defer
 
 from synapse import event_auth
 from synapse.api.constants import (
+    EventContentFields,
     EventTypes,
     Membership,
     RejectedReason,
@@ -108,21 +109,33 @@ soft_failed_event_counter = Counter(
 )
 
 
-@attr.s(slots=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class _NewEventInfo:
     """Holds information about a received event, ready for passing to _auth_and_persist_events
 
     Attributes:
         event: the received event
 
-        state: the state at that event
+        state: the state at that event, according to /state_ids from a remote
+           homeserver. Only populated for backfilled events which are going to be a
+           new backwards extremity.
+
+        claimed_auth_event_map: a map of (type, state_key) => event for the event's
+            claimed auth_events.
+
+            This can include events which have not yet been persisted, in the case that
+            we are backfilling a batch of events.
+
+            Note: May be incomplete: if we were unable to find all of the claimed auth
+            events. Also, treat the contents with caution: the events might also have
+            been rejected, might not yet have been authorized themselves, or they might
+            be in the wrong room.
 
-        auth_events: the auth_event map for that event
     """
 
-    event = attr.ib(type=EventBase)
-    state = attr.ib(type=Optional[Sequence[EventBase]], default=None)
-    auth_events = attr.ib(type=Optional[MutableStateMap[EventBase]], default=None)
+    event: EventBase
+    state: Optional[Sequence[EventBase]]
+    claimed_auth_event_map: StateMap[EventBase]
 
 
 class FederationHandler(BaseHandler):
@@ -262,7 +275,12 @@ class FederationHandler(BaseHandler):
 
         state = None
 
-        # Get missing pdus if necessary.
+        # Check that the event passes auth based on the state at the event. This is
+        # done for events that are to be added to the timeline (non-outliers).
+        #
+        # Get missing pdus if necessary:
+        #  - Fetching any missing prev events to fill in gaps in the graph
+        #  - Fetching state if we have a hole in the graph
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
             min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +450,13 @@ class FederationHandler(BaseHandler):
                         affected=event_id,
                     )
 
+        # A second round of checks for all events. Check that the event passes auth
+        # based on `auth_events`, this allows us to assert that the event would
+        # have been allowed at some point. If an event passes this check its OK
+        # for it to be used as part of a returned `/state` request, as either
+        # a) we received the event as part of the original join and so trust it, or
+        # b) we'll do a state resolution with existing state before it becomes
+        # part of the "current state", which adds more protection.
         await self._process_received_pdu(origin, pdu, state=state)
 
     async def _get_missing_events_for_pdu(
@@ -889,6 +914,79 @@ class FederationHandler(BaseHandler):
                     "resync_device_due_to_pdu", self._resync_device, event.sender
                 )
 
+        await self._handle_marker_event(origin, event)
+
+    async def _handle_marker_event(self, origin: str, marker_event: EventBase):
+        """Handles backfilling the insertion event when we receive a marker
+        event that points to one.
+
+        Args:
+            origin: Origin of the event. Will be called to get the insertion event
+            marker_event: The event to process
+        """
+
+        if marker_event.type != EventTypes.MSC2716_MARKER:
+            # Not a marker event
+            return
+
+        if marker_event.rejected_reason is not None:
+            # Rejected event
+            return
+
+        # Skip processing a marker event if the room version doesn't
+        # support it.
+        room_version = await self.store.get_room_version(marker_event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        logger.debug("_handle_marker_event: received %s", marker_event)
+
+        insertion_event_id = marker_event.content.get(
+            EventContentFields.MSC2716_MARKER_INSERTION
+        )
+
+        if insertion_event_id is None:
+            # Nothing to retrieve then (invalid marker)
+            return
+
+        logger.debug(
+            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+        )
+
+        await self._get_events_and_persist(
+            origin,
+            marker_event.room_id,
+            [insertion_event_id],
+        )
+
+        insertion_event = await self.store.get_event(
+            insertion_event_id, allow_none=True
+        )
+        if insertion_event is None:
+            logger.warning(
+                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+                origin,
+                insertion_event_id,
+                marker_event.event_id,
+            )
+            return
+
+        logger.debug(
+            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
+        await self.store.insert_insertion_extremity(
+            insertion_event_id, marker_event.room_id
+        )
+
+        logger.debug(
+            "_handle_marker_event: insertion extremity added for %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
@@ -1000,7 +1098,7 @@ class FederationHandler(BaseHandler):
                 _NewEventInfo(
                     event=ev,
                     state=events_to_state[e_id],
-                    auth_events={
+                    claimed_auth_event_map={
                         (
                             auth_events[a_id].type,
                             auth_events[a_id].state_key,
@@ -1057,9 +1155,19 @@ class FederationHandler(BaseHandler):
     async def _maybe_backfill_inner(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
-        extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+        oldest_events_with_depth = (
+            await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
+        )
+        insertion_events_to_be_backfilled = (
+            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+        )
+        logger.debug(
+            "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
+            oldest_events_with_depth,
+            insertion_events_to_be_backfilled,
+        )
 
-        if not extremities:
+        if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
             logger.debug("Not backfilling as no extremeties found.")
             return False
 
@@ -1089,10 +1197,12 @@ class FederationHandler(BaseHandler):
         #   state *before* the event, ignoring the special casing certain event
         #   types have.
 
-        forward_events = await self.store.get_successor_events(list(extremities))
+        forward_event_ids = await self.store.get_successor_events(
+            list(oldest_events_with_depth)
+        )
 
         extremities_events = await self.store.get_events(
-            forward_events,
+            forward_event_ids,
             redact_behaviour=EventRedactBehaviour.AS_IS,
             get_prev_content=False,
         )
@@ -1106,10 +1216,19 @@ class FederationHandler(BaseHandler):
             redact=False,
             check_history_visibility_only=True,
         )
+        logger.debug(
+            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+        )
 
-        if not filtered_extremities:
+        if not filtered_extremities and not insertion_events_to_be_backfilled:
             return False
 
+        extremities = {
+            **oldest_events_with_depth,
+            # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
+            **insertion_events_to_be_backfilled,
+        }
+
         # Check if we reached a point where we should start backfilling.
         sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
         max_depth = sorted_extremeties_tuple[0][1]
@@ -2208,7 +2327,7 @@ class FederationHandler(BaseHandler):
         event: EventBase,
         context: EventContext,
         state: Optional[Iterable[EventBase]] = None,
-        auth_events: Optional[MutableStateMap[EventBase]] = None,
+        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
         backfilled: bool = False,
     ) -> None:
         """
@@ -2220,17 +2339,18 @@ class FederationHandler(BaseHandler):
             context:
                 The event context.
 
-                NB that this function potentially modifies it.
             state:
                 The state events used to check the event for soft-fail. If this is
                 not provided the current state events will be used.
-            auth_events:
-                Map from (event_type, state_key) to event
 
-                Normally, our calculated auth_events based on the state of the room
-                at the event's position in the DAG, though occasionally (eg if the
-                event is an outlier), may be the auth events claimed by the remote
-                server.
+            claimed_auth_event_map:
+                A map of (type, state_key) => event for the event's claimed auth_events.
+                Possibly incomplete, and possibly including events that are not yet
+                persisted, or authed, or in the right room.
+
+                Only populated where we may not already have persisted these events -
+                for example, when populating outliers.
+
             backfilled: True if the event was backfilled.
         """
         context = await self._check_event_auth(
@@ -2238,7 +2358,7 @@ class FederationHandler(BaseHandler):
             event,
             context,
             state=state,
-            auth_events=auth_events,
+            claimed_auth_event_map=claimed_auth_event_map,
             backfilled=backfilled,
         )
 
@@ -2302,7 +2422,7 @@ class FederationHandler(BaseHandler):
                     event,
                     res,
                     state=ev_info.state,
-                    auth_events=ev_info.auth_events,
+                    claimed_auth_event_map=ev_info.claimed_auth_event_map,
                     backfilled=backfilled,
                 )
             return res
@@ -2568,7 +2688,7 @@ class FederationHandler(BaseHandler):
         event: EventBase,
         context: EventContext,
         state: Optional[Iterable[EventBase]] = None,
-        auth_events: Optional[MutableStateMap[EventBase]] = None,
+        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
         backfilled: bool = False,
     ) -> EventContext:
         """
@@ -2580,21 +2700,19 @@ class FederationHandler(BaseHandler):
             context:
                 The event context.
 
-                NB that this function potentially modifies it.
             state:
                 The state events used to check the event for soft-fail. If this is
                 not provided the current state events will be used.
-            auth_events:
-                Map from (event_type, state_key) to event
 
-                Normally, our calculated auth_events based on the state of the room
-                at the event's position in the DAG, though occasionally (eg if the
-                event is an outlier), may be the auth events claimed by the remote
-                server.
+            claimed_auth_event_map:
+                A map of (type, state_key) => event for the event's claimed auth_events.
+                Possibly incomplete, and possibly including events that are not yet
+                persisted, or authed, or in the right room.
 
-                Also NB that this function adds entries to it.
+                Only populated where we may not already have persisted these events -
+                for example, when populating outliers, or the state for a backwards
+                extremity.
 
-                If this is not provided, it is calculated from the previous state IDs.
             backfilled: True if the event was backfilled.
 
         Returns:
@@ -2603,7 +2721,12 @@ class FederationHandler(BaseHandler):
         room_version = await self.store.get_room_version_id(event.room_id)
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
-        if not auth_events:
+        if claimed_auth_event_map:
+            # if we have a copy of the auth events from the event, use that as the
+            # basis for auth.
+            auth_events = claimed_auth_event_map
+        else:
+            # otherwise, we calculate what the auth events *should* be, and use that
             prev_state_ids = await context.get_prev_state_ids()
             auth_events_ids = self._event_auth_handler.compute_auth_events(
                 event, prev_state_ids, for_verification=True
@@ -2611,18 +2734,11 @@ class FederationHandler(BaseHandler):
             auth_events_x = await self.store.get_events(auth_events_ids)
             auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
 
-        # This is a hack to fix some old rooms where the initial join event
-        # didn't reference the create event in its auth events.
-        if event.type == EventTypes.Member and not event.auth_event_ids():
-            if len(event.prev_event_ids()) == 1 and event.depth < 5:
-                c = await self.store.get_event(
-                    event.prev_event_ids()[0], allow_none=True
-                )
-                if c and c.type == EventTypes.Create:
-                    auth_events[(c.type, c.state_key)] = c
-
         try:
-            context = await self._update_auth_events_and_context_for_auth(
+            (
+                context,
+                auth_events_for_auth,
+            ) = await self._update_auth_events_and_context_for_auth(
                 origin, event, context, auth_events
             )
         except Exception:
@@ -2635,9 +2751,10 @@ class FederationHandler(BaseHandler):
                 "Ignoring failure and continuing processing of event.",
                 event.event_id,
             )
+            auth_events_for_auth = auth_events
 
         try:
-            event_auth.check(room_version_obj, event, auth_events=auth_events)
+            event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
         except AuthError as e:
             logger.warning("Failed auth resolution for %r because %s", event, e)
             context.rejected = RejectedReason.AUTH_ERROR
@@ -2662,8 +2779,8 @@ class FederationHandler(BaseHandler):
         origin: str,
         event: EventBase,
         context: EventContext,
-        auth_events: MutableStateMap[EventBase],
-    ) -> EventContext:
+        input_auth_events: StateMap[EventBase],
+    ) -> Tuple[EventContext, StateMap[EventBase]]:
         """Helper for _check_event_auth. See there for docs.
 
         Checks whether a given event has the expected auth events. If it
@@ -2680,7 +2797,7 @@ class FederationHandler(BaseHandler):
             event:
             context:
 
-            auth_events:
+            input_auth_events:
                 Map from (event_type, state_key) to event
 
                 Normally, our calculated auth_events based on the state of the room
@@ -2688,11 +2805,12 @@ class FederationHandler(BaseHandler):
                 event is an outlier), may be the auth events claimed by the remote
                 server.
 
-                Also NB that this function adds entries to it.
-
         Returns:
-            updated context
+            updated context, updated auth event map
         """
+        # take a copy of input_auth_events before we modify it.
+        auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
+
         event_auth_events = set(event.auth_event_ids())
 
         # missing_auth is the set of the event's auth_events which we don't yet have
@@ -2721,7 +2839,7 @@ class FederationHandler(BaseHandler):
                     # The other side isn't around or doesn't implement the
                     # endpoint, so lets just bail out.
                     logger.info("Failed to get event auth from remote: %s", e1)
-                    return context
+                    return context, auth_events
 
                 seen_remotes = await self.store.have_seen_events(
                     event.room_id, [e.event_id for e in remote_auth_chain]
@@ -2752,7 +2870,10 @@ class FederationHandler(BaseHandler):
                             await self.state_handler.compute_event_context(e)
                         )
                         await self._auth_and_persist_event(
-                            origin, e, missing_auth_event_context, auth_events=auth
+                            origin,
+                            e,
+                            missing_auth_event_context,
+                            claimed_auth_event_map=auth,
                         )
 
                         if e.event_id in event_auth_events:
@@ -2770,14 +2891,14 @@ class FederationHandler(BaseHandler):
             # obviously be empty
             # (b) alternatively, why don't we do it earlier?
             logger.info("Skipping auth_event fetch for outlier")
-            return context
+            return context, auth_events
 
         different_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
 
         if not different_auth:
-            return context
+            return context, auth_events
 
         logger.info(
             "auth_events refers to events which are not in our calculated auth "
@@ -2803,7 +2924,7 @@ class FederationHandler(BaseHandler):
                 # XXX: should we reject the event in this case? It feels like we should,
                 # but then shouldn't we also do so if we've failed to fetch any of the
                 # auth events?
-                return context
+                return context, auth_events
 
         # now we state-resolve between our own idea of the auth events, and the remote's
         # idea of them.
@@ -2833,7 +2954,7 @@ class FederationHandler(BaseHandler):
             event, context, auth_events
         )
 
-        return context
+        return context, auth_events
 
     async def _update_context_for_auth_events(
         self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 0961dec5ab..8ffeabacf9 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -824,6 +824,7 @@ class IdentityHandler(BaseHandler):
         room_avatar_url: str,
         room_join_rules: str,
         room_name: str,
+        room_type: Optional[str],
         inviter_display_name: str,
         inviter_avatar_url: str,
         id_access_token: Optional[str] = None,
@@ -843,6 +844,7 @@ class IdentityHandler(BaseHandler):
                 notifications.
             room_join_rules: The join rules of the email (e.g. "public").
             room_name: The m.room.name of the room.
+            room_type: The type of the room from its m.room.create event (e.g "m.space").
             inviter_display_name: The current display name of the
                 inviter.
             inviter_avatar_url: The URL of the inviter's avatar.
@@ -869,6 +871,10 @@ class IdentityHandler(BaseHandler):
             "sender_display_name": inviter_display_name,
             "sender_avatar_url": inviter_avatar_url,
         }
+
+        if room_type is not None:
+            invite_config["org.matrix.msc3288.room_type"] = room_type
+
         # If a custom web client location is available, include it in the request.
         if self._web_client_location:
             invite_config["org.matrix.web_client_location"] = self._web_client_location
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index b9085bbccb..5fd4525700 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -70,7 +70,8 @@ class ReceiptsHandler(BaseHandler):
             )
             if not is_in_room:
                 logger.info(
-                    "Ignoring receipt from %s as we're not in the room",
+                    "Ignoring receipt for room %r from server %s as we're not in the room",
+                    room_id,
                     origin,
                 )
                 continue
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index fae2c098e3..6d433fad41 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -356,6 +356,12 @@ class RoomListHandler(BaseHandler):
         include_all_networks: bool = False,
         third_party_instance_id: Optional[str] = None,
     ) -> JsonDict:
+        """Get the public room list from remote server
+
+        Raises:
+            SynapseError
+        """
+
         if not self.enable_room_list_search:
             return {"chunk": [], "total_room_count_estimate": 0}
 
@@ -395,13 +401,16 @@ class RoomListHandler(BaseHandler):
             limit = None
             since_token = None
 
-        res = await self._get_remote_list_cached(
-            server_name,
-            limit=limit,
-            since_token=since_token,
-            include_all_networks=include_all_networks,
-            third_party_instance_id=third_party_instance_id,
-        )
+        try:
+            res = await self._get_remote_list_cached(
+                server_name,
+                limit=limit,
+                since_token=since_token,
+                include_all_networks=include_all_networks,
+                third_party_instance_id=third_party_instance_id,
+            )
+        except (RequestSendFailed, HttpResponseException):
+            raise SynapseError(502, "Failed to fetch room list")
 
         if search_filter:
             res = {
@@ -423,20 +432,21 @@ class RoomListHandler(BaseHandler):
         include_all_networks: bool = False,
         third_party_instance_id: Optional[str] = None,
     ) -> JsonDict:
+        """Wrapper around FederationClient.get_public_rooms that caches the
+        result.
+        """
+
         repl_layer = self.hs.get_federation_client()
         if search_filter:
             # We can't cache when asking for search
-            try:
-                return await repl_layer.get_public_rooms(
-                    server_name,
-                    limit=limit,
-                    since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            except (RequestSendFailed, HttpResponseException):
-                raise SynapseError(502, "Failed to fetch room list")
+            return await repl_layer.get_public_rooms(
+                server_name,
+                limit=limit,
+                since_token=since_token,
+                search_filter=search_filter,
+                include_all_networks=include_all_networks,
+                third_party_instance_id=third_party_instance_id,
+            )
 
         key = (
             server_name,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 65ad3efa6a..ba13196218 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -19,7 +19,12 @@ from http import HTTPStatus
 from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
 
 from synapse import types
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import (
+    AccountDataTypes,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -1237,6 +1242,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         if room_name_event:
             room_name = room_name_event.content.get("name", "")
 
+        room_type = None
+        room_create_event = room_state.get((EventTypes.Create, ""))
+        if room_create_event:
+            room_type = room_create_event.content.get(EventContentFields.ROOM_TYPE)
+
         room_join_rules = ""
         join_rules_event = room_state.get((EventTypes.JoinRules, ""))
         if join_rules_event:
@@ -1263,6 +1273,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             room_avatar_url=room_avatar_url,
             room_join_rules=room_join_rules,
             room_name=room_name,
+            room_type=room_type,
             inviter_display_name=inviter_display_name,
             inviter_avatar_url=inviter_avatar_url,
             id_access_token=id_access_token,
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py
index e9f6aef06f..dda9659c11 100644
--- a/synapse/handlers/send_email.py
+++ b/synapse/handlers/send_email.py
@@ -16,7 +16,12 @@ import email.utils
 import logging
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-from typing import TYPE_CHECKING
+from io import BytesIO
+from typing import TYPE_CHECKING, Optional
+
+from twisted.internet.defer import Deferred
+from twisted.internet.interfaces import IReactorTCP
+from twisted.mail.smtp import ESMTPSenderFactory
 
 from synapse.logging.context import make_deferred_yieldable
 
@@ -26,19 +31,75 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+async def _sendmail(
+    reactor: IReactorTCP,
+    smtphost: str,
+    smtpport: int,
+    from_addr: str,
+    to_addr: str,
+    msg_bytes: bytes,
+    username: Optional[bytes] = None,
+    password: Optional[bytes] = None,
+    require_auth: bool = False,
+    require_tls: bool = False,
+    tls_hostname: Optional[str] = None,
+) -> None:
+    """A simple wrapper around ESMTPSenderFactory, to allow substitution in tests
+
+    Params:
+        reactor: reactor to use to make the outbound connection
+        smtphost: hostname to connect to
+        smtpport: port to connect to
+        from_addr: "From" address for email
+        to_addr: "To" address for email
+        msg_bytes: Message content
+        username: username to authenticate with, if auth is enabled
+        password: password to give when authenticating
+        require_auth: if auth is not offered, fail the request
+        require_tls: if TLS is not offered, fail the reqest
+        tls_hostname: TLS hostname to check for. None to disable TLS.
+    """
+    msg = BytesIO(msg_bytes)
+
+    d: "Deferred[object]" = Deferred()
+
+    factory = ESMTPSenderFactory(
+        username,
+        password,
+        from_addr,
+        to_addr,
+        msg,
+        d,
+        heloFallback=True,
+        requireAuthentication=require_auth,
+        requireTransportSecurity=require_tls,
+        hostname=tls_hostname,
+    )
+
+    # the IReactorTCP interface claims host has to be a bytes, which seems to be wrong
+    reactor.connectTCP(smtphost, smtpport, factory, timeout=30, bindAddress=None)  # type: ignore[arg-type]
+
+    await make_deferred_yieldable(d)
+
+
 class SendEmailHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
 
-        self._sendmail = hs.get_sendmail()
         self._reactor = hs.get_reactor()
 
         self._from = hs.config.email.email_notif_from
         self._smtp_host = hs.config.email.email_smtp_host
         self._smtp_port = hs.config.email.email_smtp_port
-        self._smtp_user = hs.config.email.email_smtp_user
-        self._smtp_pass = hs.config.email.email_smtp_pass
+
+        user = hs.config.email.email_smtp_user
+        self._smtp_user = user.encode("utf-8") if user is not None else None
+        passwd = hs.config.email.email_smtp_pass
+        self._smtp_pass = passwd.encode("utf-8") if passwd is not None else None
         self._require_transport_security = hs.config.email.require_transport_security
+        self._enable_tls = hs.config.email.enable_smtp_tls
+
+        self._sendmail = _sendmail
 
     async def send_email(
         self,
@@ -82,17 +143,16 @@ class SendEmailHandler:
 
         logger.info("Sending email to %s" % email_address)
 
-        await make_deferred_yieldable(
-            self._sendmail(
-                self._smtp_host,
-                raw_from,
-                raw_to,
-                multipart_msg.as_string().encode("utf8"),
-                reactor=self._reactor,
-                port=self._smtp_port,
-                requireAuthentication=self._smtp_user is not None,
-                username=self._smtp_user,
-                password=self._smtp_pass,
-                requireTransportSecurity=self._require_transport_security,
-            )
+        await self._sendmail(
+            self._reactor,
+            self._smtp_host,
+            self._smtp_port,
+            raw_from,
+            raw_to,
+            multipart_msg.as_string().encode("utf8"),
+            username=self._smtp_user,
+            password=self._smtp_pass,
+            require_auth=self._smtp_user is not None,
+            require_tls=self._require_transport_security,
+            tls_hostname=self._smtp_host if self._enable_tls else None,
         )
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 5f7d4602bd..2517f278b6 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -16,7 +16,17 @@ import itertools
 import logging
 import re
 from collections import deque
-from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+)
 
 import attr
 
@@ -116,20 +126,22 @@ class SpaceSummaryHandler:
             max_children = max_rooms_per_space if processed_rooms else None
 
             if is_in_room:
-                room, events = await self._summarize_local_room(
+                room_entry = await self._summarize_local_room(
                     requester, None, room_id, suggested_only, max_children
                 )
 
+                events: Collection[JsonDict] = []
+                if room_entry:
+                    rooms_result.append(room_entry.room)
+                    events = room_entry.children
+
                 logger.debug(
                     "Query of local room %s returned events %s",
                     room_id,
                     ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
                 )
-
-                if room:
-                    rooms_result.append(room)
             else:
-                fed_rooms, fed_events = await self._summarize_remote_room(
+                fed_rooms = await self._summarize_remote_room(
                     queue_entry,
                     suggested_only,
                     max_children,
@@ -141,12 +153,10 @@ class SpaceSummaryHandler:
                 # user is not permitted see.
                 #
                 # Filter the returned results to only what is accessible to the user.
-                room_ids = set()
                 events = []
-                for room in fed_rooms:
-                    fed_room_id = room.get("room_id")
-                    if not fed_room_id or not isinstance(fed_room_id, str):
-                        continue
+                for room_entry in fed_rooms:
+                    room = room_entry.room
+                    fed_room_id = room_entry.room_id
 
                     # The room should only be included in the summary if:
                     #     a. the user is in the room;
@@ -169,7 +179,9 @@ class SpaceSummaryHandler:
 
                     # Check if the user is a member of any of the allowed spaces
                     # from the response.
-                    allowed_rooms = room.get("allowed_spaces")
+                    allowed_rooms = room.get("allowed_room_ids") or room.get(
+                        "allowed_spaces"
+                    )
                     if (
                         not include_room
                         and allowed_rooms
@@ -188,22 +200,23 @@ class SpaceSummaryHandler:
 
                     # The user can see the room, include it!
                     if include_room:
+                        # Before returning to the client, remove the allowed_room_ids
+                        # and allowed_spaces keys.
+                        room.pop("allowed_room_ids", None)
+                        room.pop("allowed_spaces", None)
+
                         rooms_result.append(room)
-                        room_ids.add(fed_room_id)
+                        events.extend(room_entry.children)
 
                     # All rooms returned don't need visiting again (even if the user
                     # didn't have access to them).
                     processed_rooms.add(fed_room_id)
 
-                for event in fed_events:
-                    if event.get("room_id") in room_ids:
-                        events.append(event)
-
                 logger.debug(
                     "Query of %s returned rooms %s, events %s",
                     room_id,
-                    [room.get("room_id") for room in fed_rooms],
-                    ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in fed_events],
+                    [room_entry.room.get("room_id") for room_entry in fed_rooms],
+                    ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
                 )
 
             # the room we queried may or may not have been returned, but don't process
@@ -230,11 +243,6 @@ class SpaceSummaryHandler:
                 )
                 processed_events.add(ev_key)
 
-        # Before returning to the client, remove the allowed_spaces key for any
-        # rooms.
-        for room in rooms_result:
-            room.pop("allowed_spaces", None)
-
         return {"rooms": rooms_result, "events": events_result}
 
     async def federation_space_summary(
@@ -283,20 +291,20 @@ class SpaceSummaryHandler:
                 # already done this room
                 continue
 
-            logger.debug("Processing room %s", room_id)
-
-            room, events = await self._summarize_local_room(
+            room_entry = await self._summarize_local_room(
                 None, origin, room_id, suggested_only, max_rooms_per_space
             )
 
             processed_rooms.add(room_id)
 
-            if room:
-                rooms_result.append(room)
-                events_result.extend(events)
+            if room_entry:
+                rooms_result.append(room_entry.room)
+                events_result.extend(room_entry.children)
 
-            # add any children to the queue
-            room_queue.extend(edge_event["state_key"] for edge_event in events)
+                # add any children to the queue
+                room_queue.extend(
+                    edge_event["state_key"] for edge_event in room_entry.children
+                )
 
         return {"rooms": rooms_result, "events": events_result}
 
@@ -307,7 +315,7 @@ class SpaceSummaryHandler:
         room_id: str,
         suggested_only: bool,
         max_children: Optional[int],
-    ) -> Tuple[Optional[JsonDict], Sequence[JsonDict]]:
+    ) -> Optional["_RoomEntry"]:
         """
         Generate a room entry and a list of event entries for a given room.
 
@@ -326,21 +334,16 @@ class SpaceSummaryHandler:
                 to a server-set limit.
 
         Returns:
-            A tuple of:
-                The room information, if the room should be returned to the
-                user. None, otherwise.
-
-                An iterable of the sorted children events. This may be limited
-                to a maximum size or may include all children.
+            A room entry if the room should be returned. None, otherwise.
         """
         if not await self._is_room_accessible(room_id, requester, origin):
-            return None, ()
+            return None
 
-        room_entry = await self._build_room_entry(room_id)
+        room_entry = await self._build_room_entry(room_id, for_federation=bool(origin))
 
         # If the room is not a space, return just the room information.
         if room_entry.get("room_type") != RoomTypes.SPACE:
-            return room_entry, ()
+            return _RoomEntry(room_id, room_entry)
 
         # Otherwise, look for child rooms/spaces.
         child_events = await self._get_child_events(room_id)
@@ -363,7 +366,7 @@ class SpaceSummaryHandler:
                 )
             )
 
-        return room_entry, events_result
+        return _RoomEntry(room_id, room_entry, events_result)
 
     async def _summarize_remote_room(
         self,
@@ -371,7 +374,7 @@ class SpaceSummaryHandler:
         suggested_only: bool,
         max_children: Optional[int],
         exclude_rooms: Iterable[str],
-    ) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
+    ) -> Iterable["_RoomEntry"]:
         """
         Request room entries and a list of event entries for a given room by querying a remote server.
 
@@ -386,11 +389,7 @@ class SpaceSummaryHandler:
                 Rooms IDs which do not need to be summarized.
 
         Returns:
-            A tuple of:
-                An iterable of rooms.
-
-                An iterable of the sorted children events. This may be limited
-                to a maximum size or may include all children.
+            An iterable of room entries.
         """
         room_id = room.room_id
         logger.info("Requesting summary for %s via %s", room_id, room.via)
@@ -414,11 +413,30 @@ class SpaceSummaryHandler:
                 e,
                 exc_info=logger.isEnabledFor(logging.DEBUG),
             )
-            return (), ()
+            return ()
+
+        # Group the events by their room.
+        children_by_room: Dict[str, List[JsonDict]] = {}
+        for ev in res.events:
+            if ev.event_type == EventTypes.SpaceChild:
+                children_by_room.setdefault(ev.room_id, []).append(ev.data)
+
+        # Generate the final results.
+        results = []
+        for fed_room in res.rooms:
+            fed_room_id = fed_room.get("room_id")
+            if not fed_room_id or not isinstance(fed_room_id, str):
+                continue
 
-        return res.rooms, tuple(
-            ev.data for ev in res.events if ev.event_type == EventTypes.SpaceChild
-        )
+            results.append(
+                _RoomEntry(
+                    fed_room_id,
+                    fed_room,
+                    children_by_room.get(fed_room_id, []),
+                )
+            )
+
+        return results
 
     async def _is_room_accessible(
         self, room_id: str, requester: Optional[str], origin: Optional[str]
@@ -532,8 +550,18 @@ class SpaceSummaryHandler:
         )
         return False
 
-    async def _build_room_entry(self, room_id: str) -> JsonDict:
-        """Generate en entry suitable for the 'rooms' list in the summary response"""
+    async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDict:
+        """
+        Generate en entry suitable for the 'rooms' list in the summary response.
+
+        Args:
+            room_id: The room ID to summarize.
+            for_federation: True if this is a summary requested over federation
+                (which includes additional fields).
+
+        Returns:
+            The JSON dictionary for the room.
+        """
         stats = await self._store.get_room_with_stats(room_id)
 
         # currently this should be impossible because we call
@@ -546,15 +574,6 @@ class SpaceSummaryHandler:
             current_state_ids[(EventTypes.Create, "")]
         )
 
-        room_version = await self._store.get_room_version(room_id)
-        allowed_rooms = None
-        if await self._event_auth_handler.has_restricted_join_rules(
-            current_state_ids, room_version
-        ):
-            allowed_rooms = await self._event_auth_handler.get_rooms_that_allow_join(
-                current_state_ids
-            )
-
         entry = {
             "room_id": stats["room_id"],
             "name": stats["name"],
@@ -569,9 +588,25 @@ class SpaceSummaryHandler:
             "guest_can_join": stats["guest_access"] == "can_join",
             "creation_ts": create_event.origin_server_ts,
             "room_type": create_event.content.get(EventContentFields.ROOM_TYPE),
-            "allowed_spaces": allowed_rooms,
         }
 
+        # Federation requests need to provide additional information so the
+        # requested server is able to filter the response appropriately.
+        if for_federation:
+            room_version = await self._store.get_room_version(room_id)
+            if await self._event_auth_handler.has_restricted_join_rules(
+                current_state_ids, room_version
+            ):
+                allowed_rooms = (
+                    await self._event_auth_handler.get_rooms_that_allow_join(
+                        current_state_ids
+                    )
+                )
+                if allowed_rooms:
+                    entry["allowed_room_ids"] = allowed_rooms
+                    # TODO Remove this key once the API is stable.
+                    entry["allowed_spaces"] = allowed_rooms
+
         # Filter out Nones – rather omit the field altogether
         room_entry = {k: v for k, v in entry.items() if v is not None}
 
@@ -606,10 +641,21 @@ class SpaceSummaryHandler:
         return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
 
 
-@attr.s(frozen=True, slots=True)
+@attr.s(frozen=True, slots=True, auto_attribs=True)
 class _RoomQueueEntry:
-    room_id = attr.ib(type=str)
-    via = attr.ib(type=Sequence[str])
+    room_id: str
+    via: Sequence[str]
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class _RoomEntry:
+    room_id: str
+    # The room summary for this room.
+    room: JsonDict
+    # An iterable of the sorted, stripped children events for children of this room.
+    #
+    # This may not include all children.
+    children: Collection[JsonDict] = ()
 
 
 def _has_valid_via(e: EventBase) -> bool:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f30bfcc93c..590642f510 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -269,14 +269,22 @@ class SyncHandler:
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
-            hs.get_clock(), "sync"
-        )
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
         self.storage = hs.get_storage()
         self.state_store = self.storage.state
 
+        # TODO: flush cache entries on subsequent sync request.
+        #    Once we get the next /sync request (ie, one with the same access token
+        #    that sets 'since' to 'next_batch'), we know that device won't need a
+        #    cached result any more, and we could flush the entry from the cache to save
+        #    memory.
+        self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
+            hs.get_clock(),
+            "sync",
+            timeout_ms=hs.config.caches.sync_response_cache_duration,
+        )
+
         # ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
         self.lazy_loaded_members_cache: ExpiringCache[
             Tuple[str, Optional[str]], LruCache[str, str]
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0cb651a400..a97c448595 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -335,7 +335,8 @@ class TypingWriterHandler(FollowerTypingHandler):
         )
         if not is_in_room:
             logger.info(
-                "Ignoring typing update from %s as we're not in the room",
+                "Ignoring typing update for room %r from server %s as we're not in the room",
+                room_id,
                 origin,
             )
             return
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 473812b8e2..1cc13fc97b 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -45,7 +45,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.roommember import ProfileInfo
 from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, Requester, UserID, create_requester
+from synapse.types import JsonDict, Requester, UserID, UserInfo, create_requester
 from synapse.util import Clock
 from synapse.util.caches.descriptors import cached
 
@@ -174,6 +174,16 @@ class ModuleApi:
         """The application name configured in the homeserver's configuration."""
         return self._hs.config.email.email_app_name
 
+    async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
+        """Get user info by user_id
+
+        Args:
+            user_id: Fully qualified user id.
+        Returns:
+            UserInfo object if a user was found, otherwise None
+        """
+        return await self._store.get_userinfo_by_id(user_id)
+
     async def get_user_by_req(
         self,
         req: SynapseRequest,
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 502a917588..f887970b76 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -23,7 +23,6 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
-    HttpResponseException,
     InvalidClientCredentialsError,
     ShadowBanError,
     SynapseError,
@@ -458,6 +457,9 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
                 "state_key": state_event["state_key"],
             }
 
+            # Mark all events as historical
+            event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
+
             # Make the state events float off on their own
             fake_prev_event_id = "$" + random_string(43)
 
@@ -562,7 +564,10 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
             "type": EventTypes.MSC2716_CHUNK,
             "sender": requester.user.to_string(),
             "room_id": room_id,
-            "content": {EventContentFields.MSC2716_CHUNK_ID: chunk_id_to_connect_to},
+            "content": {
+                EventContentFields.MSC2716_CHUNK_ID: chunk_id_to_connect_to,
+                EventContentFields.MSC2716_HISTORICAL: True,
+            },
             # Since the chunk event is put at the end of the chunk,
             # where the newest-in-time event is, copy the origin_server_ts from
             # the last event we're inserting
@@ -589,10 +594,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
         for ev in events_to_create:
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
 
-            # Mark all events as historical
-            # This has important semantics within the Synapse internals to backfill properly
-            ev["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
             event_dict = {
                 "type": ev["type"],
                 "origin_server_ts": ev["origin_server_ts"],
@@ -602,6 +603,9 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
                 "prev_events": prev_event_ids.copy(),
             }
 
+            # Mark all events as historical
+            event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
+
             event, context = await self.event_creation_handler.create_event(
                 await self._create_requester_for_user_id_from_app_service(
                     ev["sender"], requester.app_service
@@ -778,12 +782,9 @@ class PublicRoomListRestServlet(TransactionRestServlet):
                     Codes.INVALID_PARAM,
                 )
 
-            try:
-                data = await handler.get_remote_public_room_list(
-                    server, limit=limit, since_token=since_token
-                )
-            except HttpResponseException as e:
-                raise e.to_synapse_error()
+            data = await handler.get_remote_public_room_list(
+                server, limit=limit, since_token=since_token
+            )
         else:
             data = await handler.get_local_public_room_list(
                 limit=limit, since_token=since_token
@@ -831,17 +832,15 @@ class PublicRoomListRestServlet(TransactionRestServlet):
                     Codes.INVALID_PARAM,
                 )
 
-            try:
-                data = await handler.get_remote_public_room_list(
-                    server,
-                    limit=limit,
-                    since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            except HttpResponseException as e:
-                raise e.to_synapse_error()
+            data = await handler.get_remote_public_room_list(
+                server,
+                limit=limit,
+                since_token=since_token,
+                search_filter=search_filter,
+                include_all_networks=include_all_networks,
+                third_party_instance_id=third_party_instance_id,
+            )
+
         else:
             data = await handler.get_local_public_room_list(
                 limit=limit,
diff --git a/synapse/server.py b/synapse/server.py
index 095dba9ad0..6c867f0f47 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -34,8 +34,6 @@ from typing import (
 )
 
 import twisted.internet.tcp
-from twisted.internet import defer
-from twisted.mail.smtp import sendmail
 from twisted.web.iweb import IPolicyForHTTPS
 from twisted.web.resource import IResource
 
@@ -443,10 +441,6 @@ class HomeServer(metaclass=abc.ABCMeta):
         return RoomShutdownHandler(self)
 
     @cache_in_self
-    def get_sendmail(self) -> Callable[..., defer.Deferred]:
-        return sendmail
-
-    @cache_in_self
     def get_state_handler(self) -> StateHandler:
         return StateHandler(self)
 
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index c8015a3848..95d2caff62 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -941,13 +941,13 @@ class DatabasePool:
 
         `lock` should generally be set to True (the default), but can be set
         to False if either of the following are true:
-
-        * there is a UNIQUE INDEX on the key columns. In this case a conflict
-          will cause an IntegrityError in which case this function will retry
-          the update.
-
-        * we somehow know that we are the only thread which will be updating
-          this table.
+            1. there is a UNIQUE INDEX on the key columns. In this case a conflict
+            will cause an IntegrityError in which case this function will retry
+            the update.
+            2. we somehow know that we are the only thread which will be updating
+            this table.
+        As an additional note, this parameter only matters for old SQLite versions
+        because we will use native upserts otherwise.
 
         Args:
             table: The table to upsert into
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 1edc96042b..1f0a39eac4 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -755,81 +755,145 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
         """
 
         @trace
-        def _claim_e2e_one_time_keys(txn):
-            sql = (
-                "SELECT key_id, key_json FROM e2e_one_time_keys_json"
-                " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
-                " LIMIT 1"
+        def _claim_e2e_one_time_key_simple(
+            txn, user_id: str, device_id: str, algorithm: str
+        ) -> Optional[Tuple[str, str]]:
+            """Claim OTK for device for DBs that don't support RETURNING.
+
+            Returns:
+                A tuple of key name (algorithm + key ID) and key JSON, if an
+                OTK was found.
+            """
+
+            sql = """
+                SELECT key_id, key_json FROM e2e_one_time_keys_json
+                WHERE user_id = ? AND device_id = ? AND algorithm = ?
+                LIMIT 1
+            """
+
+            txn.execute(sql, (user_id, device_id, algorithm))
+            otk_row = txn.fetchone()
+            if otk_row is None:
+                return None
+
+            key_id, key_json = otk_row
+
+            self.db_pool.simple_delete_one_txn(
+                txn,
+                table="e2e_one_time_keys_json",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "algorithm": algorithm,
+                    "key_id": key_id,
+                },
             )
-            fallback_sql = (
-                "SELECT key_id, key_json, used FROM e2e_fallback_keys_json"
-                " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
-                " LIMIT 1"
+            self._invalidate_cache_and_stream(
+                txn, self.count_e2e_one_time_keys, (user_id, device_id)
             )
-            result = {}
-            delete = []
-            used_fallbacks = []
-            for user_id, device_id, algorithm in query_list:
-                user_result = result.setdefault(user_id, {})
-                device_result = user_result.setdefault(device_id, {})
-                txn.execute(sql, (user_id, device_id, algorithm))
-                otk_row = txn.fetchone()
-                if otk_row is not None:
-                    key_id, key_json = otk_row
-                    device_result[algorithm + ":" + key_id] = key_json
-                    delete.append((user_id, device_id, algorithm, key_id))
-                else:
-                    # no one-time key available, so see if there's a fallback
-                    # key
-                    txn.execute(fallback_sql, (user_id, device_id, algorithm))
-                    fallback_row = txn.fetchone()
-                    if fallback_row is not None:
-                        key_id, key_json, used = fallback_row
-                        device_result[algorithm + ":" + key_id] = key_json
-                        if not used:
-                            used_fallbacks.append(
-                                (user_id, device_id, algorithm, key_id)
-                            )
-
-            # drop any one-time keys that were claimed
-            sql = (
-                "DELETE FROM e2e_one_time_keys_json"
-                " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
-                " AND key_id = ?"
+
+            return f"{algorithm}:{key_id}", key_json
+
+        @trace
+        def _claim_e2e_one_time_key_returning(
+            txn, user_id: str, device_id: str, algorithm: str
+        ) -> Optional[Tuple[str, str]]:
+            """Claim OTK for device for DBs that support RETURNING.
+
+            Returns:
+                A tuple of key name (algorithm + key ID) and key JSON, if an
+                OTK was found.
+            """
+
+            # We can use RETURNING to do the fetch and DELETE in once step.
+            sql = """
+                DELETE FROM e2e_one_time_keys_json
+                WHERE user_id = ? AND device_id = ? AND algorithm = ?
+                    AND key_id IN (
+                        SELECT key_id FROM e2e_one_time_keys_json
+                        WHERE user_id = ? AND device_id = ? AND algorithm = ?
+                        LIMIT 1
+                    )
+                RETURNING key_id, key_json
+            """
+
+            txn.execute(
+                sql, (user_id, device_id, algorithm, user_id, device_id, algorithm)
             )
-            for user_id, device_id, algorithm, key_id in delete:
-                log_kv(
-                    {
-                        "message": "Executing claim e2e_one_time_keys transaction on database."
-                    }
-                )
-                txn.execute(sql, (user_id, device_id, algorithm, key_id))
-                log_kv({"message": "finished executing and invalidating cache"})
-                self._invalidate_cache_and_stream(
-                    txn, self.count_e2e_one_time_keys, (user_id, device_id)
+            otk_row = txn.fetchone()
+            if otk_row is None:
+                return None
+
+            key_id, key_json = otk_row
+            return f"{algorithm}:{key_id}", key_json
+
+        results = {}
+        for user_id, device_id, algorithm in query_list:
+            if self.database_engine.supports_returning:
+                # If we support RETURNING clause we can use a single query that
+                # allows us to use autocommit mode.
+                _claim_e2e_one_time_key = _claim_e2e_one_time_key_returning
+                db_autocommit = True
+            else:
+                _claim_e2e_one_time_key = _claim_e2e_one_time_key_simple
+                db_autocommit = False
+
+            row = await self.db_pool.runInteraction(
+                "claim_e2e_one_time_keys",
+                _claim_e2e_one_time_key,
+                user_id,
+                device_id,
+                algorithm,
+                db_autocommit=db_autocommit,
+            )
+            if row:
+                device_results = results.setdefault(user_id, {}).setdefault(
+                    device_id, {}
                 )
-            # mark fallback keys as used
-            for user_id, device_id, algorithm, key_id in used_fallbacks:
-                self.db_pool.simple_update_txn(
-                    txn,
-                    "e2e_fallback_keys_json",
-                    {
+                device_results[row[0]] = row[1]
+                continue
+
+            # No one-time key available, so see if there's a fallback
+            # key
+            row = await self.db_pool.simple_select_one(
+                table="e2e_fallback_keys_json",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "algorithm": algorithm,
+                },
+                retcols=("key_id", "key_json", "used"),
+                desc="_get_fallback_key",
+                allow_none=True,
+            )
+            if row is None:
+                continue
+
+            key_id = row["key_id"]
+            key_json = row["key_json"]
+            used = row["used"]
+
+            # Mark fallback key as used if not already.
+            if not used:
+                await self.db_pool.simple_update_one(
+                    table="e2e_fallback_keys_json",
+                    keyvalues={
                         "user_id": user_id,
                         "device_id": device_id,
                         "algorithm": algorithm,
                         "key_id": key_id,
                     },
-                    {"used": True},
+                    updatevalues={"used": True},
+                    desc="_get_fallback_key_set_used",
                 )
-                self._invalidate_cache_and_stream(
-                    txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
+                await self.invalidate_cache_and_stream(
+                    "get_e2e_unused_fallback_key_types", (user_id, device_id)
                 )
 
-            return result
+            device_results = results.setdefault(user_id, {}).setdefault(device_id, {})
+            device_results[f"{algorithm}:{key_id}"] = key_json
 
-        return await self.db_pool.runInteraction(
-            "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
-        )
+        return results
 
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 44018c1c31..bddf5ef192 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -671,27 +671,97 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    async def get_oldest_events_with_depth_in_room(self, room_id):
+    async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
+        """Gets the oldest events(backwards extremities) in the room along with the
+        aproximate depth.
+
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        event extremeties. If the current depth is close to the depth of given
+        oldest event, we can trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
+            # Assemble a dictionary with event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremeties are the oldest
+            # events we know of in the room but we only know of them because
+            # some other event referenced them by prev_event and aren't peristed
+            # in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the aproximate depth from
+            # the events connected to the current backwards extremeties.
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM events as e
+                /**
+                 * Get the edge connections from the event_edges table
+                 * so we can see whether this event's prev_events points
+                 * to a backward extremity in the next join.
+                 */
+                INNER JOIN event_edges as g
+                ON g.event_id = e.event_id
+                /**
+                 * We find the "oldest" events in the room by looking for
+                 * events connected to backwards extremeties (oldest events
+                 * in the room that we know of so far).
+                 */
+                INNER JOIN event_backward_extremities as b
+                ON g.prev_event_id = b.event_id
+                WHERE b.room_id = ? AND g.is_state is ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
-            "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            "get_oldest_event_ids_with_depth_in_room",
+            get_oldest_event_ids_with_depth_in_room_txn,
             room_id,
         )
 
-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
+    async def get_insertion_event_backwards_extremities_in_room(
+        self, room_id
+    ) -> Dict[str, int]:
+        """Get the insertion events we know about that we haven't backfilled yet.
 
-        txn.execute(sql, (room_id, False))
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        insertion event. If the current depth is close to the depth of given
+        insertion event, we can trigger a backfill.
 
-        return dict(txn)
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN insertion_event_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )
 
     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     if row[1] not in event_results:
                         queue.put((-row[0], row[1]))
 
-            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
             logger.debug(
@@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
+        await self.db_pool.simple_upsert(
+            table="insertion_event_extremities",
+            keyvalues={"event_id": event_id},
+            values={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            insertion_values={},
+            desc="insert_insertion_extremity",
+            lock=False,
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 86baf397fb..40b53274fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1845,6 +1845,18 @@ class PersistEventsStore:
             },
         )
 
+        # When we receive an event with a `chunk_id` referencing the
+        # `next_chunk_id` of the insertion event, we can remove it from the
+        # `insertion_event_extremities` table.
+        sql = """
+            DELETE FROM insertion_event_extremities WHERE event_id IN (
+                SELECT event_id FROM insertion_events
+                WHERE next_chunk_id = ?
+            )
+        """
+
+        txn.execute(sql, (chunk_id,))
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
@@ -2101,15 +2113,17 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
+        # From the events passed in, add all of the prev events as backwards extremities.
+        # Ignore any events that are already backwards extrems or outliers.
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
+            "   SELECT 1 FROM event_backward_extremities"
+            "   WHERE event_id = ? AND room_id = ?"
             " )"
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            "   AND outlier = ?"
             " )"
         )
 
@@ -2123,6 +2137,8 @@ class PersistEventsStore:
             ],
         )
 
+        # Delete all these events that we've already fetched and now know that their
+        # prev events are the new backwards extremeties.
         query = (
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 3c86adab56..375463e4e9 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -14,7 +14,6 @@
 
 import logging
 import threading
-from collections import namedtuple
 from typing import (
     Collection,
     Container,
@@ -27,6 +26,7 @@ from typing import (
     overload,
 )
 
+import attr
 from constantly import NamedConstant, Names
 from typing_extensions import Literal
 
@@ -42,7 +42,11 @@ from synapse.api.room_versions import (
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
 from synapse.events.utils import prune_event
-from synapse.logging.context import PreserveLoggingContext, current_context
+from synapse.logging.context import (
+    PreserveLoggingContext,
+    current_context,
+    make_deferred_yieldable,
+)
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -56,6 +60,8 @@ from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
+from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
@@ -74,7 +80,10 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
-_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+@attr.s(slots=True, auto_attribs=True)
+class _EventCacheEntry:
+    event: EventBase
+    redacted_event: Optional[EventBase]
 
 
 class EventRedactBehaviour(Names):
@@ -161,6 +170,13 @@ class EventsWorkerStore(SQLBaseStore):
             max_size=hs.config.caches.event_cache_size,
         )
 
+        # Map from event ID to a deferred that will result in a map from event
+        # ID to cache entry. Note that the returned dict may not have the
+        # requested event in it if the event isn't in the DB.
+        self._current_event_fetches: Dict[
+            str, ObservableDeferred[Dict[str, _EventCacheEntry]]
+        ] = {}
+
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
@@ -476,7 +492,9 @@ class EventsWorkerStore(SQLBaseStore):
 
         return events
 
-    async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
+    async def _get_events_from_cache_or_db(
+        self, event_ids: Iterable[str], allow_rejected: bool = False
+    ) -> Dict[str, _EventCacheEntry]:
         """Fetch a bunch of events from the cache or the database.
 
         If events are pulled from the database, they will be cached for future lookups.
@@ -485,53 +503,107 @@ class EventsWorkerStore(SQLBaseStore):
 
         Args:
 
-            event_ids (Iterable[str]): The event_ids of the events to fetch
+            event_ids: The event_ids of the events to fetch
 
-            allow_rejected (bool): Whether to include rejected events. If False,
+            allow_rejected: Whether to include rejected events. If False,
                 rejected events are omitted from the response.
 
         Returns:
-            Dict[str, _EventCacheEntry]:
-                map from event id to result
+            map from event id to result
         """
         event_entry_map = self._get_events_from_cache(
-            event_ids, allow_rejected=allow_rejected
+            event_ids,
         )
 
-        missing_events_ids = [e for e in event_ids if e not in event_entry_map]
+        missing_events_ids = {e for e in event_ids if e not in event_entry_map}
+
+        # We now look up if we're already fetching some of the events in the DB,
+        # if so we wait for those lookups to finish instead of pulling the same
+        # events out of the DB multiple times.
+        already_fetching: Dict[str, defer.Deferred] = {}
+
+        for event_id in missing_events_ids:
+            deferred = self._current_event_fetches.get(event_id)
+            if deferred is not None:
+                # We're already pulling the event out of the DB. Add the deferred
+                # to the collection of deferreds to wait on.
+                already_fetching[event_id] = deferred.observe()
+
+        missing_events_ids.difference_update(already_fetching)
 
         if missing_events_ids:
             log_ctx = current_context()
             log_ctx.record_event_fetch(len(missing_events_ids))
 
+            # Add entries to `self._current_event_fetches` for each event we're
+            # going to pull from the DB. We use a single deferred that resolves
+            # to all the events we pulled from the DB (this will result in this
+            # function returning more events than requested, but that can happen
+            # already due to `_get_events_from_db`).
+            fetching_deferred: ObservableDeferred[
+                Dict[str, _EventCacheEntry]
+            ] = ObservableDeferred(defer.Deferred())
+            for event_id in missing_events_ids:
+                self._current_event_fetches[event_id] = fetching_deferred
+
             # Note that _get_events_from_db is also responsible for turning db rows
             # into FrozenEvents (via _get_event_from_row), which involves seeing if
             # the events have been redacted, and if so pulling the redaction event out
             # of the database to check it.
             #
-            missing_events = await self._get_events_from_db(
-                missing_events_ids, allow_rejected=allow_rejected
-            )
+            try:
+                missing_events = await self._get_events_from_db(
+                    missing_events_ids,
+                )
 
-            event_entry_map.update(missing_events)
+                event_entry_map.update(missing_events)
+            except Exception as e:
+                with PreserveLoggingContext():
+                    fetching_deferred.errback(e)
+                raise e
+            finally:
+                # Ensure that we mark these events as no longer being fetched.
+                for event_id in missing_events_ids:
+                    self._current_event_fetches.pop(event_id, None)
+
+            with PreserveLoggingContext():
+                fetching_deferred.callback(missing_events)
+
+        if already_fetching:
+            # Wait for the other event requests to finish and add their results
+            # to ours.
+            results = await make_deferred_yieldable(
+                defer.gatherResults(
+                    already_fetching.values(),
+                    consumeErrors=True,
+                )
+            ).addErrback(unwrapFirstError)
+
+            for result in results:
+                event_entry_map.update(result)
+
+        if not allow_rejected:
+            event_entry_map = {
+                event_id: entry
+                for event_id, entry in event_entry_map.items()
+                if not entry.event.rejected_reason
+            }
 
         return event_entry_map
 
     def _invalidate_get_event_cache(self, event_id):
         self._get_event_cache.invalidate((event_id,))
 
-    def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
-        """Fetch events from the caches
+    def _get_events_from_cache(
+        self, events: Iterable[str], update_metrics: bool = True
+    ) -> Dict[str, _EventCacheEntry]:
+        """Fetch events from the caches.
 
-        Args:
-            events (Iterable[str]): list of event_ids to fetch
-            allow_rejected (bool): Whether to return events that were rejected
-            update_metrics (bool): Whether to update the cache hit ratio metrics
+        May return rejected events.
 
-        Returns:
-            dict of event_id -> _EventCacheEntry for each event_id in cache. If
-            allow_rejected is `False` then there will still be an entry but it
-            will be `None`
+        Args:
+            events: list of event_ids to fetch
+            update_metrics: Whether to update the cache hit ratio metrics
         """
         event_map = {}
 
@@ -542,10 +614,7 @@ class EventsWorkerStore(SQLBaseStore):
             if not ret:
                 continue
 
-            if allow_rejected or not ret.event.rejected_reason:
-                event_map[event_id] = ret
-            else:
-                event_map[event_id] = None
+            event_map[event_id] = ret
 
         return event_map
 
@@ -672,23 +741,23 @@ class EventsWorkerStore(SQLBaseStore):
                 with PreserveLoggingContext():
                     self.hs.get_reactor().callFromThread(fire, event_list, e)
 
-    async def _get_events_from_db(self, event_ids, allow_rejected=False):
+    async def _get_events_from_db(
+        self, event_ids: Iterable[str]
+    ) -> Dict[str, _EventCacheEntry]:
         """Fetch a bunch of events from the database.
 
+        May return rejected events.
+
         Returned events will be added to the cache for future lookups.
 
         Unknown events are omitted from the response.
 
         Args:
-            event_ids (Iterable[str]): The event_ids of the events to fetch
-
-            allow_rejected (bool): Whether to include rejected events. If False,
-                rejected events are omitted from the response.
+            event_ids: The event_ids of the events to fetch
 
         Returns:
-            Dict[str, _EventCacheEntry]:
-                map from event id to result. May return extra events which
-                weren't asked for.
+            map from event id to result. May return extra events which
+            weren't asked for.
         """
         fetched_events = {}
         events_to_fetch = event_ids
@@ -717,9 +786,6 @@ class EventsWorkerStore(SQLBaseStore):
 
             rejected_reason = row["rejected_reason"]
 
-            if not allow_rejected and rejected_reason:
-                continue
-
             # If the event or metadata cannot be parsed, log the error and act
             # as if the event is unknown.
             try:
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 6ad1a0cf7f..14670c2881 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -29,7 +29,7 @@ from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.types import Connection, Cursor
 from synapse.storage.util.id_generators import IdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
-from synapse.types import UserID
+from synapse.types import UserID, UserInfo
 from synapse.util.caches.descriptors import cached
 
 if TYPE_CHECKING:
@@ -146,6 +146,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
 
     @cached()
     async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
+        """Deprecated: use get_userinfo_by_id instead"""
         return await self.db_pool.simple_select_one(
             table="users",
             keyvalues={"name": user_id},
@@ -166,6 +167,33 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             desc="get_user_by_id",
         )
 
+    async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
+        """Get a UserInfo object for a user by user ID.
+
+        Note! Currently uses the cache of `get_user_by_id`. Once that deprecated method is removed,
+        this method should be cached.
+
+        Args:
+             user_id: The user to fetch user info for.
+        Returns:
+            `UserInfo` object if user found, otherwise `None`.
+        """
+        user_data = await self.get_user_by_id(user_id)
+        if not user_data:
+            return None
+        return UserInfo(
+            appservice_id=user_data["appservice_id"],
+            consent_server_notice_sent=user_data["consent_server_notice_sent"],
+            consent_version=user_data["consent_version"],
+            creation_ts=user_data["creation_ts"],
+            is_admin=bool(user_data["admin"]),
+            is_deactivated=bool(user_data["deactivated"]),
+            is_guest=bool(user_data["is_guest"]),
+            is_shadow_banned=bool(user_data["shadow_banned"]),
+            user_id=UserID.from_string(user_data["name"]),
+            user_type=user_data["user_type"],
+        )
+
     async def is_trial_user(self, user_id: str) -> bool:
         """Checks if user is in the "trial" period, i.e. within the first
         N days of registration defined by `mau_trial_days` config
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 68f1b40ea6..e8157ba3d4 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -629,14 +629,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # We don't update the event cache hit ratio as it completely throws off
         # the hit ratio counts. After all, we don't populate the cache if we
         # miss it here
-        event_map = self._get_events_from_cache(
-            member_event_ids, allow_rejected=False, update_metrics=False
-        )
+        event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
 
         missing_member_event_ids = []
         for event_id in member_event_ids:
             ev_entry = event_map.get(event_id)
-            if ev_entry:
+            if ev_entry and not ev_entry.event.rejected_reason:
                 if ev_entry.event.membership == Membership.JOIN:
                     users_in_room[ev_entry.event.state_key] = ProfileInfo(
                         display_name=ev_entry.event.content.get("displayname", None),
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 36340a652a..fd4dd67d91 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 61
+SCHEMA_VERSION = 62
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
new file mode 100644
index 0000000000..b731ef284a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
@@ -0,0 +1,24 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Add a table that keeps track of which "insertion" events need to be backfilled
+CREATE TABLE IF NOT EXISTS insertion_event_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);
diff --git a/synapse/types.py b/synapse/types.py
index 429bb013d2..80fa903c4b 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -751,3 +751,32 @@ def get_verify_key_from_cross_signing_key(key_info):
     # and return that one key
     for key_id, key_data in keys.items():
         return (key_id, decode_verify_key_bytes(key_id, decode_base64(key_data)))
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class UserInfo:
+    """Holds information about a user. Result of get_userinfo_by_id.
+
+    Attributes:
+        user_id:  ID of the user.
+        appservice_id:  Application service ID that created this user.
+        consent_server_notice_sent:  Version of policy documents the user has been sent.
+        consent_version:  Version of policy documents the user has consented to.
+        creation_ts:  Creation timestamp of the user.
+        is_admin:  True if the user is an admin.
+        is_deactivated:  True if the user has been deactivated.
+        is_guest:  True if the user is a guest user.
+        is_shadow_banned:  True if the user has been shadow-banned.
+        user_type:  User type (None for normal user, 'support' and 'bot' other options).
+    """
+
+    user_id: UserID
+    appservice_id: Optional[int]
+    consent_server_notice_sent: Optional[str]
+    consent_version: Optional[str]
+    user_type: Optional[str]
+    creation_ts: int
+    is_admin: bool
+    is_deactivated: bool
+    is_guest: bool
+    is_shadow_banned: bool
diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py
deleted file mode 100644
index abc12f0837..0000000000
--- a/synapse/util/jsonobject.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class JsonEncodedObject:
-    """A common base class for defining protocol units that are represented
-    as JSON.
-
-    Attributes:
-        unrecognized_keys (dict): A dict containing all the key/value pairs we
-            don't recognize.
-    """
-
-    valid_keys = []  # keys we will store
-    """A list of strings that represent keys we know about
-    and can handle. If we have values for these keys they will be
-    included in the `dictionary` instance variable.
-    """
-
-    internal_keys = []  # keys to ignore while building dict
-    """A list of strings that should *not* be encoded into JSON.
-    """
-
-    required_keys = []
-    """A list of strings that we require to exist. If they are not given upon
-    construction it raises an exception.
-    """
-
-    def __init__(self, **kwargs):
-        """Takes the dict of `kwargs` and loads all keys that are *valid*
-        (i.e., are included in the `valid_keys` list) into the dictionary`
-        instance variable.
-
-        Any keys that aren't recognized are added to the `unrecognized_keys`
-        attribute.
-
-        Args:
-            **kwargs: Attributes associated with this protocol unit.
-        """
-        for required_key in self.required_keys:
-            if required_key not in kwargs:
-                raise RuntimeError("Key %s is required" % required_key)
-
-        self.unrecognized_keys = {}  # Keys we were given not listed as valid
-        for k, v in kwargs.items():
-            if k in self.valid_keys or k in self.internal_keys:
-                self.__dict__[k] = v
-            else:
-                self.unrecognized_keys[k] = v
-
-    def get_dict(self):
-        """Converts this protocol unit into a :py:class:`dict`, ready to be
-        encoded as JSON.
-
-        The keys it encodes are: `valid_keys` - `internal_keys`
-
-        Returns
-            dict
-        """
-        d = {
-            k: _encode(v)
-            for (k, v) in self.__dict__.items()
-            if k in self.valid_keys and k not in self.internal_keys
-        }
-        d.update(self.unrecognized_keys)
-        return d
-
-    def get_internal_dict(self):
-        d = {
-            k: _encode(v, internal=True)
-            for (k, v) in self.__dict__.items()
-            if k in self.valid_keys
-        }
-        d.update(self.unrecognized_keys)
-        return d
-
-    def __str__(self):
-        return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__))
-
-
-def _encode(obj, internal=False):
-    if type(obj) is list:
-        return [_encode(o, internal=internal) for o in obj]
-
-    if isinstance(obj, JsonEncodedObject):
-        if internal:
-            return obj.get_internal_dict()
-        else:
-            return obj.get_dict()
-
-    return obj