summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/appservice/api.py3
-rw-r--r--synapse/federation/federation_server.py2
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/federation/transaction_queue.py48
-rw-r--r--synapse/handlers/appservice.py37
-rw-r--r--synapse/handlers/message.py104
-rw-r--r--synapse/handlers/room_list.py5
-rw-r--r--synapse/handlers/sync.py3
-rw-r--r--synapse/metrics/__init__.py43
-rw-r--r--synapse/metrics/metric.py32
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/replication/http/send_event.py2
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/storage/__init__.py4
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/engines/__init__.py5
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/events_worker.py20
-rw-r--r--synapse/types.py2
-rw-r--r--synapse/util/caches/response_cache.py46
-rw-r--r--synapse/util/caches/stream_change_cache.py4
21 files changed, 300 insertions, 88 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 40c433d7ae..11e9c37c63 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -73,7 +73,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         super(ApplicationServiceApi, self).__init__(hs)
         self.clock = hs.get_clock()
 
-        self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+        self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+                                                 timeout_ms=HOUR_IN_MS)
 
     @defer.inlineCallbacks
     def query_user(self, service, user_id):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bea7fd0b71..e4ce037acf 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -65,7 +65,7 @@ class FederationServer(FederationBase):
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
-        self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+        self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
 
     @defer.inlineCallbacks
     @log_function
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..945832283f 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 import synapse.metrics
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 from collections import namedtuple
 
 import logging
@@ -56,19 +56,19 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6da60718a9..965c1d2fda 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,12 +177,13 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
+                        return
 
                     try:
                         # Get the state from before the event.
@@ -197,8 +198,11 @@ class TransactionQueue(object):
                             ],
                         )
                     except Exception:
-                        logger.exception("Failed to calculate hosts in room")
-                        continue
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
 
                     destinations = set(destinations)
 
@@ -212,12 +216,44 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
-                events_processed_counter.inc_by(len(events))
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ],
+                    consumeErrors=True
+                ))
 
                 yield self.store.update_federation_out_pos(
                     "events", next_token
                 )
 
+                if events:
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "federation_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "federation_sender",
+                    )
+
+                events_processed_counter.inc_by(len(events))
+
+                synapse.metrics.event_processing_positions.set(
+                    next_token, "federation_sender",
+                )
+
         finally:
             self._is_processing = False
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..0245197c02 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                     if not events:
                         break
 
+                    events_by_room = {}
                     for event in events:
+                        events_by_room.setdefault(event.room_id, []).append(event)
+
+                    @defer.inlineCallbacks
+                    def handle_event(event):
                         # Gather interested services
                         services = yield self._get_services_for_event(event)
                         if len(services) == 0:
-                            continue  # no services need notifying
+                            return  # no services need notifying
 
                         # Do we know this user exists? If not, poke the user
                         # query API for all services which match that user regex.
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
-                    events_processed_counter.inc_by(len(events))
+                    @defer.inlineCallbacks
+                    def handle_room_events(events):
+                        for event in events:
+                            yield handle_event(event)
+
+                    yield make_deferred_yieldable(defer.gatherResults([
+                        run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ], consumeErrors=True))
 
                     yield self.store.set_appservice_last_pos(upper_bound)
+
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_positions.set(
+                        upper_bound, "appservice_sender",
+                    )
+
+                    events_processed_counter.inc_by(len(events))
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "appservice_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "appservice_sender",
+                    )
             finally:
                 self.is_processing = False
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2134741719..726a3932da 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -454,40 +454,39 @@ class EventCreationHandler(object):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
 
-            event, context = yield self.create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_event_ids=prev_event_ids,
+        )
 
         defer.returnValue((event, context))
 
@@ -557,27 +556,34 @@ class EventCreationHandler(object):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
 
-        spam_error = self.spam_checker.check_event_for_spam(event)
-        if spam_error:
-            if not isinstance(spam_error, basestring):
-                spam_error = "Spam is not permitted here"
-            raise SynapseError(
-                403, spam_error, Codes.FORBIDDEN
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
 
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, basestring):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+            )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index f5657a3521..ef3a3b352a 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,8 +44,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000)
-        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
+        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+                                                   timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 37fae43f71..da234c741f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -171,7 +171,8 @@ class SyncHandler(object):
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache(
-            hs, timeout_ms=SYNC_RESPONSE_CACHE_MS,
+            hs, "sync",
+            timeout_ms=SYNC_RESPONSE_CACHE_MS,
         )
         self.state = hs.get_state_handler()
 
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 50d99d7a5c..e3b831db67 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -17,12 +17,13 @@ import logging
 import functools
 import time
 import gc
+import platform
 
 from twisted.internet import reactor
 
 from .metric import (
     CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    MemoryUsageMetric,
+    MemoryUsageMetric, GaugeMetric,
 )
 from .process_collector import register_process_collector
 
@@ -30,6 +31,7 @@ from .process_collector import register_process_collector
 logger = logging.getLogger(__name__)
 
 
+running_on_pypy = platform.python_implementation() == 'PyPy'
 all_metrics = []
 all_collectors = []
 
@@ -63,6 +65,13 @@ class Metrics(object):
         """
         return self._register(CounterMetric, *args, **kwargs)
 
+    def register_gauge(self, *args, **kwargs):
+        """
+        Returns:
+            GaugeMetric
+        """
+        return self._register(GaugeMetric, *args, **kwargs)
+
     def register_callback(self, *args, **kwargs):
         """
         Returns:
@@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 
+synapse_metrics = get_metrics_for("synapse")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = synapse_metrics.register_gauge(
+    "event_processing_positions", labels=["name"],
+)
+
+# Used to track the current max events stream position
+event_persisted_position = synapse_metrics.register_gauge(
+    "event_persisted_position",
+)
+
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = synapse_metrics.register_gauge(
+    "event_processing_last_ts", labels=["name"],
+)
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = synapse_metrics.register_gauge(
+    "event_processing_lag", labels=["name"],
+)
+
 
 def runUntilCurrentTimer(func):
 
@@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
 
+        if running_on_pypy:
+            return ret
+
         # Check if we need to do a manual GC (since its been disabled), and do
         # one if necessary.
         threshold = gc.get_threshold()
@@ -206,6 +244,7 @@ try:
 
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
-    gc.disable()
+    if not running_on_pypy:
+        gc.disable()
 except AttributeError:
     pass
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index ff5aa8c0e1..89bd47c3f7 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
         # dict[list[str]]: value for each set of label values. the keys are the
         # label values, in the same order as the labels in self.labels.
         #
-        # (if the metric is a scalar, the (single) key is the empty list).
+        # (if the metric is a scalar, the (single) key is the empty tuple).
         self.counts = {}
 
         # Scalar metrics are never empty
@@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
         )
 
 
+class GaugeMetric(BaseMetric):
+    """A metric that can go up or down
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(GaugeMetric, self).__init__(*args, **kwargs)
+
+        # dict[list[str]]: value for each set of label values. the keys are the
+        # label values, in the same order as the labels in self.labels.
+        #
+        # (if the metric is a scalar, the (single) key is the empty tuple).
+        self.guages = {}
+
+    def set(self, v, *values):
+        if len(values) != self.dimension():
+            raise ValueError(
+                "Expected as many values to inc() as labels (%d)" % (self.dimension())
+            )
+
+        # TODO: should assert that the tag values are all strings
+
+        self.guages[values] = v
+
+    def render(self):
+        return flatten(
+            self._render_for_labels(k, self.guages[k])
+            for k in sorted(self.guages.keys())
+        )
+
+
 class CallbackMetric(BaseMetric):
     """A metric that returns the numeric value returned by a callback whenever
     it is rendered. Typically this is used to implement gauges that yield the
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 40eedb63cb..f9596bddaf 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -34,8 +34,8 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "blist": ["blist"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
+    "sortedcontainers": ["sortedcontainers"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index bbe2f967b7..c6a6551d24 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -115,7 +115,7 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.clock = hs.get_clock()
 
         # The responses are tiny, so we may as well cache them for a while
-        self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+        self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
         result = self.response_cache.get(event_id)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index d06cbdc35e..2ad0e5943b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
                 content=content,
             )
         else:
-            event, context = yield self.event_creation_hander.create_event(
+            event = yield self.event_creation_hander.create_and_send_nonmember_event(
                 requester,
                 event_dict,
-                token_id=requester.access_token_id,
                 txn_id=txn_id,
             )
 
-            yield self.event_creation_hander.send_nonmember_event(
-                requester, event, context,
-            )
-
         ret = {}
         if event:
             ret = {"event_id": event.event_id}
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eacd49d6a5..8cdfd50f90 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore,
     def count_r30_users(self):
         """
         Counts the number of 30 day retained users, defined as:-
-         * Users who have created their accounts more than 30 days
+         * Users who have created their accounts more than 30 days ago
          * Where last seen at most 30 days ago
-         * Where account creation and last_seen are > 30 days
+         * Where account creation and last_seen are > 30 days apart
 
          Returns counts globaly for a given user as well as breaking
          by platform
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2fbebd4907..2262776ab2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -376,7 +376,7 @@ class SQLBaseStore(object):
         Returns:
             A list of dicts where the key is the column header.
         """
-        col_headers = list(intern(column[0]) for column in cursor.description)
+        col_headers = list(intern(str(column[0])) for column in cursor.description)
         results = list(
             dict(zip(col_headers, row)) for row in cursor
         )
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 338b495611..8c868ece75 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
 from .sqlite3 import Sqlite3Engine
 
 import importlib
+import platform
 
 
 SUPPORTED_MODULE = {
@@ -31,6 +32,10 @@ def create_engine(database_config):
     engine_class = SUPPORTED_MODULE.get(name, None)
 
     if engine_class:
+        # pypy requires psycopg2cffi rather than psycopg2
+        if (name == "psycopg2" and
+                platform.python_implementation() == "PyPy"):
+            name = "psycopg2cffi"
         module = importlib.import_module(name)
         return engine_class(module, database_config)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5d1ef3ea46..99557edc33 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore):
                     new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
+                synapse.metrics.event_persisted_position.set(
+                    chunk[-1][0].internal_metadata.stream_ordering,
+                )
                 for event, context in chunk:
                     if context.app_service:
                         origin_type = "local"
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 2e23dd78ba..a937b9bceb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
 class EventsWorkerStore(SQLBaseStore):
+    def get_received_ts(self, event_id):
+        """Get received_ts (when it was persisted) for the event.
+
+        Raises an exception for unknown events.
+
+        Args:
+            event_id (str)
+
+        Returns:
+            Deferred[int|None]: Timestamp in milliseconds, or None for events
+            that were persisted before received_ts was implemented.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={
+                "event_id": event_id,
+            },
+            retcol="received_ts",
+            desc="get_received_ts",
+        )
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
diff --git a/synapse/types.py b/synapse/types.py
index 7cb24cecb2..cc7c182a78 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -169,7 +169,7 @@ class DomainSpecificString(
         except Exception:
             return False
 
-    __str__ = to_string
+    __repr__ = to_string
 
 
 class UserID(DomainSpecificString):
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af539880..066fa423fd 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from synapse.util.async import ObservableDeferred
+from synapse.util.caches import metrics as cache_metrics
 
 
 class ResponseCache(object):
@@ -24,20 +25,63 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self, hs, timeout_ms=0):
+    def __init__(self, hs, name, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._metrics = cache_metrics.register_cache(
+            "response_cache",
+            size_callback=lambda: self.size(),
+            cache_name=name,
+        )
+
+    def size(self):
+        return len(self.pending_result_cache)
+
     def get(self, key):
+        """Look up the given key.
+
+        Returns a deferred which doesn't follow the synapse logcontext rules,
+        so you'll probably want to make_deferred_yieldable it.
+
+        Args:
+            key (str):
+
+        Returns:
+            twisted.internet.defer.Deferred|None: None if there is no entry
+            for this key; otherwise a deferred result.
+        """
         result = self.pending_result_cache.get(key)
         if result is not None:
+            self._metrics.inc_hits()
             return result.observe()
         else:
+            self._metrics.inc_misses()
             return None
 
     def set(self, key, deferred):
+        """Set the entry for the given key to the given deferred.
+
+        *deferred* should run its callbacks in the sentinel logcontext (ie,
+        you should wrap normal synapse deferreds with
+        logcontext.run_in_background).
+
+        Returns a new Deferred which also doesn't follow the synapse logcontext
+        rules, so you will want to make_deferred_yieldable it
+
+        (TODO: before using this more widely, it might make sense to refactor
+        it and get() so that they do the necessary wrapping rather than having
+        to do it everywhere ResponseCache is used.)
+
+        Args:
+            key (str):
+            deferred (twisted.internet.defer.Deferred):
+
+        Returns:
+            twisted.internet.defer.Deferred
+        """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 941d873ab8..2ff46090a6 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -16,7 +16,7 @@
 from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
 
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 import logging
 
 
@@ -35,7 +35,7 @@ class StreamChangeCache(object):
     def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
         self._max_size = int(max_size * CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
         self.metrics = register_cache(self.name, self._cache)