summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/api/filtering.py2
-rw-r--r--synapse/api/urls.py2
-rw-r--r--synapse/app/__init__.py2
-rw-r--r--synapse/app/appservice.py2
-rw-r--r--synapse/app/client_reader.py2
-rw-r--r--synapse/app/event_creator.py2
-rw-r--r--synapse/app/federation_reader.py2
-rw-r--r--synapse/app/federation_sender.py2
-rw-r--r--synapse/app/frontend_proxy.py2
-rwxr-xr-xsynapse/app/homeserver.py5
-rw-r--r--synapse/app/media_repository.py2
-rw-r--r--synapse/app/pusher.py2
-rw-r--r--synapse/app/synchrotron.py2
-rw-r--r--synapse/app/user_dir.py2
-rw-r--r--synapse/config/__main__.py2
-rw-r--r--synapse/federation/federation_client.py10
-rw-r--r--synapse/federation/federation_server.py32
-rw-r--r--synapse/federation/transaction_queue.py36
-rw-r--r--synapse/handlers/e2e_keys.py2
-rw-r--r--synapse/handlers/federation.py131
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/typing.py23
-rw-r--r--synapse/notifier.py8
-rw-r--r--synapse/python_dependencies.py27
-rw-r--r--synapse/state/v1.py14
-rw-r--r--synapse/storage/monthly_active_users.py5
-rw-r--r--synapse/storage/state.py30
-rw-r--r--synapse/storage/transactions.py23
-rw-r--r--synapse/util/caches/expiringcache.py24
-rw-r--r--synapse/util/logcontext.py41
30 files changed, 299 insertions, 142 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index a31a9a17e0..eed8c67e6a 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -226,7 +226,7 @@ class Filtering(object):
             jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
                                 format_checker=FormatChecker())
         except jsonschema.ValidationError as e:
-            raise SynapseError(400, e.message)
+            raise SynapseError(400, str(e))
 
 
 class FilterCollection(object):
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 71347912f1..6d9f1ca0ef 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -64,7 +64,7 @@ class ConsentURIBuilder(object):
         """
         mac = hmac.new(
             key=self._hmac_secret,
-            msg=user_id,
+            msg=user_id.encode('ascii'),
             digestmod=sha256,
         ).hexdigest()
         consent_uri = "%s_matrix/consent?%s" % (
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 3b6b9368b8..c3afcc573b 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -24,7 +24,7 @@ try:
     python_dependencies.check_requirements()
 except python_dependencies.MissingRequirementError as e:
     message = "\n".join([
-        "Missing Requirement: %s" % (e.message,),
+        "Missing Requirement: %s" % (str(e),),
         "To install run:",
         "    pip install --upgrade --force \"%s\"" % (e.dependency,),
         "",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 02039f7e79..8559e141af 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -136,7 +136,7 @@ def start(config_options):
             "Synapse appservice", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.appservice"
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 4c73c637bb..76aed8c60a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -153,7 +153,7 @@ def start(config_options):
             "Synapse client reader", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.client_reader"
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index bc82197b2a..9060ab14f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -169,7 +169,7 @@ def start(config_options):
             "Synapse event creator", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.event_creator"
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 18ca71ef99..228a297fb8 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -140,7 +140,7 @@ def start(config_options):
             "Synapse federation reader", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.federation_reader"
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 6501c57792..e9a99d76e1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -160,7 +160,7 @@ def start(config_options):
             "Synapse federation sender", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.federation_sender"
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index b076fbe522..fc4b25de1c 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -228,7 +228,7 @@ def start(config_options):
             "Synapse frontend proxy", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.frontend_proxy"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8c5d858b0b..e3f0d99a3f 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
         try:
             database_engine.check_database(db_conn.cursor())
         except IncorrectDatabaseSetup as e:
-            quit_with_error(e.message)
+            quit_with_error(str(e))
 
 
 # Gauges to expose monthly active user control metrics
@@ -328,7 +328,7 @@ def setup(config_options):
             config_options,
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     if not config:
@@ -386,7 +386,6 @@ def setup(config_options):
         hs.get_pusherpool().start()
         hs.get_datastore().start_profiling()
         hs.get_datastore().start_doing_background_updates()
-        hs.get_federation_client().start_get_pdu_cache()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 992d182dba..acc0487adc 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -133,7 +133,7 @@ def start(config_options):
             "Synapse media repository", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.media_repository"
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 2ec4c7defb..630dcda478 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -191,7 +191,7 @@ def start(config_options):
             "Synapse pusher", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.pusher"
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index df81b7bcbe..9a7fc6ee9d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -410,7 +410,7 @@ def start(config_options):
             "Synapse synchrotron", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.synchrotron"
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index b383e79c1c..0a5f62b509 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -188,7 +188,7 @@ def start(config_options):
             "Synapse user directory", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.user_dir"
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 58c97a70af..8fccf573ee 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -25,7 +25,7 @@ if __name__ == "__main__":
         try:
             config = HomeServerConfig.load_config("", sys.argv[3:])
         except ConfigError as e:
-            sys.stderr.write("\n" + e.message + "\n")
+            sys.stderr.write("\n" + str(e) + "\n")
             sys.exit(1)
 
         print (getattr(config, key))
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5a92428f56..d05ed91d64 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -209,8 +209,6 @@ class FederationClient(FederationBase):
         Will attempt to get the PDU from each destination in the list until
         one succeeds.
 
-        This will persist the PDU locally upon receipt.
-
         Args:
             destinations (list): Which home servers to query
             event_id (str): event to fetch
@@ -289,8 +287,7 @@ class FederationClient(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def get_state_for_room(self, destination, room_id, event_id):
-        """Requests all of the `current` state PDUs for a given room from
-        a remote home server.
+        """Requests all of the room state at a given event from a remote home server.
 
         Args:
             destination (str): The remote homeserver to query for the state.
@@ -298,9 +295,10 @@ class FederationClient(FederationBase):
             event_id (str): The id of the event we want the state at.
 
         Returns:
-            Deferred: Results in a list of PDUs.
+            Deferred[Tuple[List[EventBase], List[EventBase]]]:
+                A list of events in the state, and a list of events in the auth chain
+                for the given event.
         """
-
         try:
             # First we try and ask for just the IDs, as thats far quicker if
             # we have most of the state and auth_chain already.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9a571e4fc7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
 from synapse.util.logutils import log_function
 
 # when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                try:
-                    yield self._handle_received_pdu(
-                        origin, pdu
-                    )
-                    pdu_results[event_id] = {}
-                except FederationError as e:
-                    logger.warn("Error handling PDU %s: %s", event_id, e)
-                    pdu_results[event_id] = {"error": str(e)}
-                except Exception as e:
-                    f = failure.Failure()
-                    pdu_results[event_id] = {"error": str(e)}
-                    logger.error(
-                        "Failed to handle PDU %s: %s",
-                        event_id, f.getTraceback().rstrip(),
-                    )
+                with nested_logging_context(event_id):
+                    try:
+                        yield self._handle_received_pdu(
+                            origin, pdu
+                        )
+                        pdu_results[event_id] = {}
+                    except FederationError as e:
+                        logger.warn("Error handling PDU %s: %s", event_id, e)
+                        pdu_results[event_id] = {"error": str(e)}
+                    except Exception as e:
+                        f = failure.Failure()
+                        pdu_results[event_id] = {"error": str(e)}
+                        logger.error(
+                            "Failed to handle PDU %s: %s",
+                            event_id, f.getTraceback().rstrip(),
+                        )
 
         yield concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 8cbf8c4f7f..98b5950800 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -137,26 +137,6 @@ class TransactionQueue(object):
 
         self._processing_pending_presence = False
 
-    def can_send_to(self, destination):
-        """Can we send messages to the given server?
-
-        We can't send messages to ourselves. If we are running on localhost
-        then we can only federation with other servers running on localhost.
-        Otherwise we only federate with servers on a public domain.
-
-        Args:
-            destination(str): The server we are possibly trying to send to.
-        Returns:
-            bool: True if we can send to the server.
-        """
-
-        if destination == self.server_name:
-            return False
-        if self.server_name.startswith("localhost"):
-            return destination.startswith("localhost")
-        else:
-            return not destination.startswith("localhost")
-
     def notify_new_events(self, current_id):
         """This gets called when we have some new events we might want to
         send out to other servers.
@@ -279,10 +259,7 @@ class TransactionQueue(object):
         self._order += 1
 
         destinations = set(destinations)
-        destinations = set(
-            dest for dest in destinations if self.can_send_to(dest)
-        )
-
+        destinations.discard(self.server_name)
         logger.debug("Sending to: %s", str(destinations))
 
         if not destinations:
@@ -358,7 +335,7 @@ class TransactionQueue(object):
 
         for destinations, states in hosts_and_states:
             for destination in destinations:
-                if not self.can_send_to(destination):
+                if destination == self.server_name:
                     continue
 
                 self.pending_presence_by_dest.setdefault(
@@ -377,7 +354,8 @@ class TransactionQueue(object):
             content=content,
         )
 
-        if not self.can_send_to(destination):
+        if destination == self.server_name:
+            logger.info("Not sending EDU to ourselves")
             return
 
         sent_edus_counter.inc()
@@ -392,10 +370,8 @@ class TransactionQueue(object):
         self._attempt_new_transaction(destination)
 
     def send_device_messages(self, destination):
-        if destination == self.server_name or destination == "localhost":
-            return
-
-        if not self.can_send_to(destination):
+        if destination == self.server_name:
+            logger.info("Not sending device update to ourselves")
             return
 
         self._attempt_new_transaction(destination)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250fb..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
 def _exception_to_failure(e):
     if isinstance(e, CodeMessageException):
         return {
-            "status": e.code, "message": e.message,
+            "status": e.code, "message": str(e),
         }
 
     if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ccdc3bfa7..d05b63673f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -106,7 +106,7 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.store = hs.get_datastore()
+        self.store = hs.get_datastore()  # type: synapse.storage.DataStore
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -323,14 +323,22 @@ class FederationHandler(BaseHandler):
                         affected=pdu.event_id,
                     )
 
-                # Calculate the state of the previous events, and
-                # de-conflict them to find the current state.
-                state_groups = []
+                # Calculate the state after each of the previous events, and
+                # resolve them to find the correct state at the current event.
                 auth_chains = set()
+                event_map = {
+                    event_id: pdu,
+                }
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(room_id, list(seen))
-                    state_groups.append(ours)
+                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+                    # state_maps is a list of mappings from (type, state_key) to event_id
+                    # type: list[dict[tuple[str, str], str]]
+                    state_maps = list(ours.values())
+
+                    # we don't need this any more, let's delete it.
+                    del ours
 
                     # Ask the remote server for the states we don't
                     # know about
@@ -339,27 +347,65 @@ class FederationHandler(BaseHandler):
                             "[%s %s] Requesting state at missing prev_event %s",
                             room_id, event_id, p,
                         )
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, room_id, p,
+
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
                             )
-                        )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
+
+                            # we want the state *after* p; get_state_for_room returns the
+                            # state *before* p.
+                            remote_event = yield self.federation_client.get_pdu(
+                                [origin], p, outlier=True,
+                            )
+
+                            if remote_event is None:
+                                raise Exception(
+                                    "Unable to get missing prev_event %s" % (p, )
+                                )
+
+                            if remote_event.is_state():
+                                remote_state.append(remote_event)
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            remote_state_map = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_maps.append(remote_state_map)
+
+                            for x in remote_state:
+                                event_map[x.event_id] = x
 
                     # Resolve any conflicting state
+                    @defer.inlineCallbacks
                     def fetch(ev_ids):
-                        return self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False
+                        fetched = yield self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False,
                         )
+                        # add any events we fetch here to the `event_map` so that we
+                        # can use them to build the state event list below.
+                        event_map.update(fetched)
+                        defer.returnValue(fetched)
 
                     room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {event_id: pdu}, fetch
+                        room_version, state_maps, event_map, fetch,
                     )
 
-                    state = (yield self.store.get_events(state_map.values())).values()
+                    # we need to give _process_received_pdu the actual state events
+                    # rather than event ids, so generate that now.
+                    state = [
+                        event_map[e] for e in six.itervalues(state_map)
+                    ]
                     auth_chain = list(auth_chains)
                 except Exception:
                     logger.warn(
@@ -483,20 +529,21 @@ class FederationHandler(BaseHandler):
                 "[%s %s] Handling received prev_event %s",
                 room_id, event_id, ev.event_id,
             )
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    ev,
-                    sent_to_us_directly=False,
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn(
-                        "[%s %s] Received prev_event %s failed history check.",
-                        room_id, event_id, ev.event_id,
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
                     )
-                else:
-                    raise
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
     @defer.inlineCallbacks
     def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -572,6 +619,10 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
+                logger.info(
+                    "[%s %s] persisting newly-received auth/state events %s",
+                    room_id, event_id, [e["event"].event_id for e in event_infos]
+                )
                 yield self._handle_new_events(origin, event_infos)
 
             try:
@@ -1135,7 +1186,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1581,15 +1633,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..f284d5a385 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    room_id, str(e.message)
+                    room_id, str(e)
                 )
 
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..c610933dd4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
         # map room IDs to sets of users currently typing
         self._room_typing = {}
 
+        # caches which room_ids changed at which serials
+        self._typing_stream_change_cache = StreamChangeCache(
+            "TypingStreamChangeCache", self._latest_room_serial,
+        )
+
         self.clock.looping_call(
             self._handle_timeouts,
             5000,
@@ -218,6 +224,7 @@ class TypingHandler(object):
 
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
                     self.federation.send_edu(
                         destination=domain,
                         edu_type="m.typing",
@@ -274,19 +281,29 @@ class TypingHandler(object):
 
         self._latest_room_serial += 1
         self._room_serials[member.room_id] = self._latest_room_serial
+        self._typing_stream_change_cache.entity_has_changed(
+            member.room_id, self._latest_room_serial,
+        )
 
         self.notifier.on_new_event(
             "typing_key", self._latest_room_serial, rooms=[member.room_id]
         )
 
     def get_all_typing_updates(self, last_id, current_id):
-        # TODO: Work out a way to do this without scanning the entire state.
         if last_id == current_id:
             return []
 
+        changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+            last_id,
+        )
+
+        if changed_rooms is None:
+            changed_rooms = self._room_serials
+
         rows = []
-        for room_id, serial in self._room_serials.items():
-            if last_id < serial and serial <= current_id:
+        for room_id in changed_rooms:
+            serial = self._room_serials[room_id]
+            if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
                 rows.append((serial, room_id, list(typing)))
         rows.sort()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f1d92c1395..340b16ce25 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import StreamToken
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
-from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -248,7 +249,10 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        run_in_background(self._notify_app_services, room_stream_id)
+        run_as_background_process(
+            "notify_app_services",
+            self._notify_app_services, room_stream_id,
+        )
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(room_stream_id)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c779f69fa0..0f339a0320 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -33,31 +33,32 @@ logger = logging.getLogger(__name__)
 # [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
 REQUIREMENTS = {
     "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
-    "frozendict>=0.4": ["frozendict"],
+    "frozendict>=1": ["frozendict"],
     "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
     "canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
     "signedjson>=1.0.0": ["signedjson>=1.0.0"],
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
-    "service_identity>=1.0.0": ["service_identity>=1.0.0"],
+    "service_identity>=16.0.0": ["service_identity>=16.0.0"],
     "Twisted>=17.1.0": ["twisted>=17.1.0"],
     "treq>=15.1": ["treq>=15.1"],
 
     # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
     "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
 
-    "pyyaml": ["yaml"],
-    "pyasn1": ["pyasn1"],
-    "daemonize": ["daemonize"],
-    "bcrypt": ["bcrypt>=3.1.0"],
-    "pillow": ["PIL"],
-    "pydenticon": ["pydenticon"],
-    "sortedcontainers": ["sortedcontainers"],
-    "pysaml2>=3.0.0": ["saml2>=3.0.0"],
-    "pymacaroons-pynacl": ["pymacaroons"],
+    "pyyaml>=3.11": ["yaml"],
+    "pyasn1>=0.1.9": ["pyasn1"],
+    "pyasn1-modules>=0.0.7": ["pyasn1_modules"],
+    "daemonize>=2.3.1": ["daemonize"],
+    "bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
+    "pillow>=3.1.2": ["PIL"],
+    "pydenticon>=0.2": ["pydenticon"],
+    "sortedcontainers>=1.4.4": ["sortedcontainers"],
+    "pysaml2>=3.0.0": ["saml2"],
+    "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
-    "six": ["six"],
-    "prometheus_client": ["prometheus_client"],
+    "six>=1.10": ["six"],
+    "prometheus_client>=0.0.18": ["prometheus_client"],
 
     # we use attr.s(slots), which arrived in 16.0.0
     "attrs>=16.0.0": ["attr>=16.0.0"],
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index c95477d318..7a7157b352 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
         for event_ids in itervalues(conflicted_state)
         for event_id in event_ids
     )
+    needed_event_count = len(needed_events)
     if event_map is not None:
         needed_events -= set(iterkeys(event_map))
 
-    logger.info("Asking for %d conflicted events", len(needed_events))
+    logger.info(
+        "Asking for %d/%d conflicted events",
+        len(needed_events),
+        needed_event_count,
+    )
 
     # dict[str, FrozenEvent]: a map from state event id to event. Only includes
     # the state events which are in conflict (and those in event_map)
@@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
     )
 
     new_needed_events = set(itervalues(auth_events))
+    new_needed_event_count = len(new_needed_events)
     new_needed_events -= needed_events
     if event_map is not None:
         new_needed_events -= set(iterkeys(event_map))
 
-    logger.info("Asking for %d auth events", len(new_needed_events))
+    logger.info(
+        "Asking for %d/%d auth events",
+        len(new_needed_events),
+        new_needed_event_count,
+    )
 
     state_map_new = yield state_map_factory(new_needed_events)
     state_map.update(state_map_new)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 59580949f1..0fe8c8e24c 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             Deferred[bool]: True if a new entry was created, False if an
                 existing one was updated.
         """
+        # Am consciously deciding to lock the table on the basis that is ought
+        # never be a big table and alternative approaches (batching multiple
+        # upserts into a single txn) introduced a lot of extra complexity.
+        # See https://github.com/matrix-org/synapse/issues/3854 for more
         is_insert = yield self._simple_upsert(
             desc="upsert_monthly_active_user",
             table="monthly_active_users",
@@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             values={
                 "timestamp": int(self._clock.time_msec()),
             },
-            lock=False,
         )
         if is_insert:
             self.user_last_seen_monthly_active.invalidate((user_id,))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efdba..3f4cbd61c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_state_groups_ids(self, room_id, event_ids):
+    def get_state_groups_ids(self, _room_id, event_ids):
+        """Get the event IDs of all the state for the state groups for the given events
+
+        Args:
+            _room_id (str): id of the room for these events
+            event_ids (iterable[str]): ids of the events
+
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
+        """
         if not event_ids:
             defer.returnValue({})
 
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_state_ids_for_group(self, state_group):
-        """Get the state IDs for the given state group
+        """Get the event IDs of all the state in the given state group
 
         Args:
             state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     def get_state_groups(self, room_id, event_ids):
         """ Get the state groups for the given list of event_ids
 
-        The return value is a dict mapping group names to lists of events.
+        Returns:
+            Deferred[dict[int, list[EventBase]]]:
+                dict of state_group_id -> list of state events.
         """
         if not event_ids:
             defer.returnValue({})
@@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 member events (if True), or to exclude member events (if False)
 
         Returns:
-            dictionary state_group -> (dict of (type, state_key) -> event id)
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         results = {}
 
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            Deferred[dict[int, dict[(type, state_key), EventBase]]]
-                a dictionary mapping from state group to state dictionary.
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         if types is not None:
             non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            Deferred[dict[int, dict[(type, state_key), EventBase]]]
-                a dictionary mapping from state group to state dictionary.
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         if types:
             types = frozenset(types)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index baf0379a68..ab54977a75 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches.expiringcache import ExpiringCache
 
 from ._base import SQLBaseStore, db_to_json
 
@@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
     )
 )
 
+SENTINEL = object()
+
 
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
@@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore):
 
         self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
+        self._destination_retry_cache = ExpiringCache(
+            cache_name="get_destination_retry_timings",
+            clock=self._clock,
+            expiry_ms=5 * 60 * 1000,
+        )
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore):
         """
         pass
 
+    @defer.inlineCallbacks
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
 
@@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore):
             None if not retrying
             Otherwise a dict for the retry scheme
         """
-        return self.runInteraction(
+
+        result = self._destination_retry_cache.get(destination, SENTINEL)
+        if result is not SENTINEL:
+            defer.returnValue(result)
+
+        result = yield self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
+        # We don't hugely care about race conditions between getting and
+        # invalidating the cache, since we time out fairly quickly anyway.
+        self._destination_retry_cache[destination] = result
+        defer.returnValue(result)
+
     def _get_destination_retry_timings(self, txn, destination):
         result = self._simple_select_one_txn(
             txn,
@@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
+        self._destination_retry_cache.pop(destination)
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 9af4ec4aa8..f369780277 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,7 +16,7 @@
 import logging
 from collections import OrderedDict
 
-from six import itervalues
+from six import iteritems, itervalues
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
@@ -24,6 +24,9 @@ from synapse.util.caches import register_cache
 logger = logging.getLogger(__name__)
 
 
+SENTINEL = object()
+
+
 class ExpiringCache(object):
     def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
                  reset_expiry_on_get=False, iterable=False):
@@ -95,6 +98,21 @@ class ExpiringCache(object):
 
         return entry.value
 
+    def pop(self, key, default=SENTINEL):
+        """Removes and returns the value with the given key from the cache.
+
+        If the key isn't in the cache then `default` will be returned if
+        specified, otherwise `KeyError` will get raised.
+
+        Identical functionality to `dict.pop(..)`.
+        """
+
+        value = self._cache.pop(key, default)
+        if value is SENTINEL:
+            raise KeyError(key)
+
+        return value
+
     def __contains__(self, key):
         return key in self._cache
 
@@ -122,7 +140,7 @@ class ExpiringCache(object):
 
         keys_to_delete = set()
 
-        for key, cache_entry in self._cache.items():
+        for key, cache_entry in iteritems(self._cache):
             if now - cache_entry.time > self._expiry_ms:
                 keys_to_delete.add(key)
 
@@ -146,6 +164,8 @@ class ExpiringCache(object):
 
 
 class _CacheEntry(object):
+    __slots__ = ["time", "value"]
+
     def __init__(self, time, value):
         self.time = time
         self.value = value
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
 
     sentinel = Sentinel()
 
-    def __init__(self, name=None, parent_context=None):
+    def __init__(self, name=None, parent_context=None, request=None):
         self.previous_context = LoggingContext.current_context()
         self.name = name
 
@@ -218,6 +218,13 @@ class LoggingContext(object):
 
         self.parent_context = parent_context
 
+        if self.parent_context is not None:
+            self.parent_context.copy_to(self)
+
+        if request is not None:
+            # the request param overrides the request from the parent context
+            self.request = request
+
     def __str__(self):
         return "%s@%x" % (self.name, id(self))
 
@@ -256,9 +263,6 @@ class LoggingContext(object):
             )
         self.alive = True
 
-        if self.parent_context is not None:
-            self.parent_context.copy_to(self)
-
         return self
 
     def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
                 )
 
 
+def nested_logging_context(suffix, parent_context=None):
+    """Creates a new logging context as a child of another.
+
+    The nested logging context will have a 'request' made up of the parent context's
+    request, plus the given suffix.
+
+    CPU/db usage stats will be added to the parent context's on exit.
+
+    Normal usage looks like:
+
+        with nested_logging_context(suffix):
+            # ... do stuff
+
+    Args:
+        suffix (str): suffix to add to the parent context's 'request'.
+        parent_context (LoggingContext|None): parent context. Will use the current context
+            if None.
+
+    Returns:
+        LoggingContext: new logging context.
+    """
+    if parent_context is None:
+        parent_context = LoggingContext.current_context()
+    return LoggingContext(
+        parent_context=parent_context,
+        request=parent_context.request + "-" + suffix,
+    )
+
+
 def preserve_fn(f):
     """Function decorator which wraps the function with run_in_background"""
     def g(*args, **kwargs):