summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorNeil Johnson <neil@matrix.org>2018-08-01 13:42:50 +0100
committerNeil Johnson <neil@matrix.org>2018-08-01 13:42:50 +0100
commit303f1c851fc363d3fc4fc4c91328abd89863341d (patch)
treee8d8c1ba55141f3eae7aa8ea74a509e3dac42e28 /synapse
parentfix known broken test (diff)
parentMerge pull request #3543 from bebehei/docker (diff)
downloadsynapse-303f1c851fc363d3fc4fc4c91328abd89863341d.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into neilj/mau_sign_in_log_in_limits
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/federation/send_queue.py63
-rw-r--r--synapse/federation/transaction_queue.py32
-rw-r--r--synapse/federation/transport/server.py3
-rw-r--r--synapse/federation/units.py1
-rw-r--r--synapse/handlers/federation.py28
-rw-r--r--synapse/secrets.py7
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/event_federation.py2
-rw-r--r--synapse/storage/events.py6
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/stream.py2
-rw-r--r--synapse/util/caches/descriptors.py131
14 files changed, 98 insertions, 187 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e501251b6e..657935d1ac 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -207,10 +207,6 @@ class FederationServer(FederationBase):
                     edu.content
                 )
 
-        pdu_failures = getattr(transaction, "pdu_failures", [])
-        for fail in pdu_failures:
-            logger.info("Got failure %r", fail)
-
         response = {
             "pdus": pdu_results,
         }
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5157c3860d..0bb468385d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = SortedDict()  # stream position -> (destination, Failure)
-
         self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
@@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
 
         for queue_name in [
             "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
-            "edus", "failures", "device_messages", "pos_time",
+            "edus", "device_messages", "pos_time",
         ]:
             register(queue_name, getattr(self, queue_name))
 
@@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of failure map
-            keys = self.failures.keys()
-            i = self.failures.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.failures[key]
-
             # Delete things out of device map
             keys = self.device_messages.keys()
             i = self.device_messages.bisect_left(position_to_delete)
@@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_failure(self, failure, destination):
-        """As per TransactionQueue"""
-        pos = self._next_pos()
-
-        self.failures[pos] = (destination, str(failure))
-        self.notifier.on_new_replication_data()
-
     def send_device_messages(self, destination):
         """As per TransactionQueue"""
         pos = self._next_pos()
@@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed failures
-        i = self.failures.bisect_right(from_token)
-        j = self.failures.bisect_right(to_token) + 1
-        failures = self.failures.items()[i:j]
-
-        for (pos, (destination, failure)) in failures:
-            rows.append((pos, FailureRow(
-                destination=destination,
-                failure=failure,
-            )))
-
         # Fetch changed device messages
         i = self.device_messages.bisect_right(from_token)
         j = self.device_messages.bisect_right(to_token) + 1
@@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
-    "destination",  # str
-    "failure",
-))):
-    """Streams failures to a remote server. Failures are issued when there was
-    something wrong with a transaction the remote sent us, e.g. it included
-    an event that was invalid.
-    """
-
-    TypeId = "f"
-
-    @staticmethod
-    def from_data(data):
-        return FailureRow(
-            destination=data["destination"],
-            failure=data["failure"],
-        )
-
-    def to_data(self):
-        return {
-            "destination": self.destination,
-            "failure": self.failure,
-        }
-
-    def add_to_buffer(self, buff):
-        buff.failures.setdefault(self.destination, []).append(self.failure)
-
-
 class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
     "destination",  # str
 ))):
@@ -471,7 +417,6 @@ TypeToRow = {
         PresenceRow,
         KeyedEduRow,
         EduRow,
-        FailureRow,
         DeviceRow,
     )
 }
@@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
     "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
-    "failures",  # dict of destination -> [failures]
     "device_destinations",  # set of destinations
 ))
 
@@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
         presence=[],
         keyed_edus={},
         edus={},
-        failures={},
         device_destinations=set(),
     )
 
@@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in iteritems(buff.failures):
-        for failure in failure_list:
-            transaction_queue.send_failure(destination, failure)
-
     for destination in buff.device_destinations:
         transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6996d6b695..78f9d40a3a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -116,9 +116,6 @@ class TransactionQueue(object):
             ),
         )
 
-        # destination -> list of tuple(failure, deferred)
-        self.pending_failures_by_dest = {}
-
         # destination -> stream_id of last successfully sent to-device message.
         # NB: may be a long or an int.
         self.last_device_stream_id_by_dest = {}
@@ -382,19 +379,6 @@ class TransactionQueue(object):
 
         self._attempt_new_transaction(destination)
 
-    def send_failure(self, failure, destination):
-        if destination == self.server_name or destination == "localhost":
-            return
-
-        if not self.can_send_to(destination):
-            return
-
-        self.pending_failures_by_dest.setdefault(
-            destination, []
-        ).append(failure)
-
-        self._attempt_new_transaction(destination)
-
     def send_device_messages(self, destination):
         if destination == self.server_name or destination == "localhost":
             return
@@ -469,7 +453,6 @@ class TransactionQueue(object):
                 pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
                 pending_edus = self.pending_edus_by_dest.pop(destination, [])
                 pending_presence = self.pending_presence_by_dest.pop(destination, {})
-                pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
                 pending_edus.extend(
                     self.pending_edus_keyed_by_dest.pop(destination, {}).values()
@@ -497,7 +480,7 @@ class TransactionQueue(object):
                     logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                                  destination, len(pending_pdus))
 
-                if not pending_pdus and not pending_edus and not pending_failures:
+                if not pending_pdus and not pending_edus:
                     logger.debug("TX [%s] Nothing to send", destination)
                     self.last_device_stream_id_by_dest[destination] = (
                         device_stream_id
@@ -507,7 +490,7 @@ class TransactionQueue(object):
                 # END CRITICAL SECTION
 
                 success = yield self._send_new_transaction(
-                    destination, pending_pdus, pending_edus, pending_failures,
+                    destination, pending_pdus, pending_edus,
                 )
                 if success:
                     sent_transactions_counter.inc()
@@ -584,14 +567,12 @@ class TransactionQueue(object):
 
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
-    def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures):
+    def _send_new_transaction(self, destination, pending_pdus, pending_edus):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
         pdus = [x[0] for x in pending_pdus]
         edus = pending_edus
-        failures = [x.get_dict() for x in pending_failures]
 
         success = True
 
@@ -601,11 +582,10 @@ class TransactionQueue(object):
 
         logger.debug(
             "TX [%s] {%s} Attempting new transaction"
-            " (pdus: %d, edus: %d, failures: %d)",
+            " (pdus: %d, edus: %d)",
             destination, txn_id,
             len(pdus),
             len(edus),
-            len(failures)
         )
 
         logger.debug("TX [%s] Persisting transaction...", destination)
@@ -617,7 +597,6 @@ class TransactionQueue(object):
             destination=destination,
             pdus=pdus,
             edus=edus,
-            pdu_failures=failures,
         )
 
         self._next_txn_id += 1
@@ -627,12 +606,11 @@ class TransactionQueue(object):
         logger.debug("TX [%s] Persisted transaction", destination)
         logger.info(
             "TX [%s] {%s} Sending transaction [%s],"
-            " (PDUs: %d, EDUs: %d, failures: %d)",
+            " (PDUs: %d, EDUs: %d)",
             destination, txn_id,
             transaction.transaction_id,
             len(pdus),
             len(edus),
-            len(failures),
         )
 
         # Actually send the transaction
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8574898f0c..3b5ea9515a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet):
             )
 
             logger.info(
-                "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
+                "Received txn %s from %s. (PDUs: %d, EDUs: %d)",
                 transaction_id, origin,
                 len(transaction_data.get("pdus", [])),
                 len(transaction_data.get("edus", [])),
-                len(transaction_data.get("failures", [])),
             )
 
             # We should ideally be getting this from the security layer.
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index bb1b3b13f7..c5ab14314e 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
         "previous_ids",
         "pdus",
         "edus",
-        "pdu_failures",
     ]
 
     internal_keys = [
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 49068c06d9..91d8def08b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -76,7 +76,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_federation_client()
+        self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -255,7 +255,7 @@ class FederationHandler(BaseHandler):
                     # know about
                     for p in prevs - seen:
                         state, got_auth_chain = (
-                            yield self.replication_layer.get_state_for_room(
+                            yield self.federation_client.get_state_for_room(
                                 origin, pdu.room_id, p
                             )
                         )
@@ -338,7 +338,7 @@ class FederationHandler(BaseHandler):
         #
         # see https://github.com/matrix-org/synapse/pull/1744
 
-        missing_events = yield self.replication_layer.get_missing_events(
+        missing_events = yield self.federation_client.get_missing_events(
             origin,
             pdu.room_id,
             earliest_events_ids=list(latest),
@@ -522,7 +522,7 @@ class FederationHandler(BaseHandler):
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
 
-        events = yield self.replication_layer.backfill(
+        events = yield self.federation_client.backfill(
             dest,
             room_id,
             limit=limit,
@@ -570,7 +570,7 @@ class FederationHandler(BaseHandler):
         state_events = {}
         events_to_state = {}
         for e_id in edges:
-            state, auth = yield self.replication_layer.get_state_for_room(
+            state, auth = yield self.federation_client.get_state_for_room(
                 destination=dest,
                 room_id=room_id,
                 event_id=e_id
@@ -612,7 +612,7 @@ class FederationHandler(BaseHandler):
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
                         logcontext.run_in_background(
-                            self.replication_layer.get_pdu,
+                            self.federation_client.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -893,7 +893,7 @@ class FederationHandler(BaseHandler):
 
         Invites must be signed by the invitee's server before distribution.
         """
-        pdu = yield self.replication_layer.send_invite(
+        pdu = yield self.federation_client.send_invite(
             destination=target_host,
             room_id=event.room_id,
             event_id=event.event_id,
@@ -955,7 +955,7 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-            ret = yield self.replication_layer.send_join(target_hosts, event)
+            ret = yield self.federation_client.send_join(target_hosts, event)
 
             origin = ret["origin"]
             state = ret["state"]
@@ -1211,7 +1211,7 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
+        yield self.federation_client.send_leave(
             target_hosts,
             event
         )
@@ -1234,7 +1234,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
                                content={},):
-        origin, pdu = yield self.replication_layer.make_membership_event(
+        origin, pdu = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
@@ -1567,7 +1567,7 @@ class FederationHandler(BaseHandler):
                     missing_auth_events.add(e_id)
 
         for e_id in missing_auth_events:
-            m_ev = yield self.replication_layer.get_pdu(
+            m_ev = yield self.federation_client.get_pdu(
                 [origin],
                 e_id,
                 outlier=True,
@@ -1777,7 +1777,7 @@ class FederationHandler(BaseHandler):
             logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
             try:
-                remote_auth_chain = yield self.replication_layer.get_event_auth(
+                remote_auth_chain = yield self.federation_client.get_event_auth(
                     origin, event.room_id, event.event_id
                 )
 
@@ -1893,7 +1893,7 @@ class FederationHandler(BaseHandler):
 
                 try:
                     # 2. Get remote difference.
-                    result = yield self.replication_layer.query_auth(
+                    result = yield self.federation_client.query_auth(
                         origin,
                         event.room_id,
                         event.event_id,
@@ -2192,7 +2192,7 @@ class FederationHandler(BaseHandler):
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
-            yield self.replication_layer.forward_third_party_invite(
+            yield self.federation_client.forward_third_party_invite(
                 destinations,
                 room_id,
                 event_dict,
diff --git a/synapse/secrets.py b/synapse/secrets.py
index f397daaa5e..f05e9ea535 100644
--- a/synapse/secrets.py
+++ b/synapse/secrets.py
@@ -20,17 +20,16 @@ See https://docs.python.org/3/library/secrets.html#module-secrets for the API
 used in Python 3.6, and the API emulated in Python 2.7.
 """
 
-import six
+import sys
 
-if six.PY3:
+# secrets is available since python 3.6
+if sys.version_info[0:2] >= (3, 6):
     import secrets
 
     def Secrets():
         return secrets
 
-
 else:
-
     import os
     import binascii
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 3ce1b34aea..12282a8b3d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -66,6 +66,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 PresenceStore, TransactionStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 ApplicationServiceStore,
+                EventsStore,
                 EventFederationStore,
                 MediaRepositoryStore,
                 RejectionsStore,
@@ -73,7 +74,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 PusherStore,
                 PushRuleStore,
                 ApplicationServiceTransactionStore,
-                EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
                 SearchStore,
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 9f12b360bc..31248d5e06 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d3ee90017..8bd35df119 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.util.caches.descriptors import cached
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2f482af3a1..c98e524ba1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,8 @@ from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
@@ -193,7 +195,9 @@ def _retry_on_integrity_error(func):
     return f
 
 
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 027bf8c85e..10dce21cea 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,7 +24,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 25d0097b58..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index f8a07df6b8..861c24809c 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -473,105 +473,101 @@ class CacheListDescriptor(_CacheDescriptorBase):
 
         @functools.wraps(self.orig)
         def wrapped(*args, **kwargs):
-            # If we're passed a cache_context then we'll want to call its invalidate()
-            # whenever we are invalidated
+            # If we're passed a cache_context then we'll want to call its
+            # invalidate() whenever we are invalidated
             invalidate_callback = kwargs.pop("on_invalidate", None)
 
             arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
             keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
             list_args = arg_dict[self.list_name]
 
-            # cached is a dict arg -> deferred, where deferred results in a
-            # 2-tuple (`arg`, `result`)
             results = {}
-            cached_defers = {}
-            missing = []
+
+            def update_results_dict(res, arg):
+                results[arg] = res
+
+            # list of deferreds to wait for
+            cached_defers = []
+
+            missing = set()
 
             # If the cache takes a single arg then that is used as the key,
             # otherwise a tuple is used.
             if num_args == 1:
-                def cache_get(arg):
-                    return cache.get(arg, callback=invalidate_callback)
+                def arg_to_cache_key(arg):
+                    return arg
             else:
-                key = list(keyargs)
+                keylist = list(keyargs)
 
-                def cache_get(arg):
-                    key[self.list_pos] = arg
-                    return cache.get(tuple(key), callback=invalidate_callback)
+                def arg_to_cache_key(arg):
+                    keylist[self.list_pos] = arg
+                    return tuple(keylist)
 
             for arg in list_args:
                 try:
-                    res = cache_get(arg)
-
+                    res = cache.get(arg_to_cache_key(arg),
+                                    callback=invalidate_callback)
                     if not isinstance(res, ObservableDeferred):
                         results[arg] = res
                     elif not res.has_succeeded():
                         res = res.observe()
-                        res.addCallback(lambda r, arg: (arg, r), arg)
-                        cached_defers[arg] = res
+                        res.addCallback(update_results_dict, arg)
+                        cached_defers.append(res)
                     else:
                         results[arg] = res.get_result()
                 except KeyError:
-                    missing.append(arg)
+                    missing.add(arg)
 
             if missing:
+                # we need an observable deferred for each entry in the list,
+                # which we put in the cache. Each deferred resolves with the
+                # relevant result for that key.
+                deferreds_map = {}
+                for arg in missing:
+                    deferred = defer.Deferred()
+                    deferreds_map[arg] = deferred
+                    key = arg_to_cache_key(arg)
+                    observable = ObservableDeferred(deferred)
+                    cache.set(key, observable, callback=invalidate_callback)
+
+                def complete_all(res):
+                    # the wrapped function has completed. It returns a
+                    # a dict. We can now resolve the observable deferreds in
+                    # the cache and update our own result map.
+                    for e in missing:
+                        val = res.get(e, None)
+                        deferreds_map[e].callback(val)
+                        results[e] = val
+
+                def errback(f):
+                    # the wrapped function has failed. Invalidate any cache
+                    # entries we're supposed to be populating, and fail
+                    # their deferreds.
+                    for e in missing:
+                        key = arg_to_cache_key(e)
+                        cache.invalidate(key)
+                        deferreds_map[e].errback(f)
+
+                    # return the failure, to propagate to our caller.
+                    return f
+
                 args_to_call = dict(arg_dict)
-                args_to_call[self.list_name] = missing
+                args_to_call[self.list_name] = list(missing)
 
-                ret_d = defer.maybeDeferred(
+                cached_defers.append(defer.maybeDeferred(
                     logcontext.preserve_fn(self.function_to_call),
                     **args_to_call
-                )
-
-                ret_d = ObservableDeferred(ret_d)
-
-                # We need to create deferreds for each arg in the list so that
-                # we can insert the new deferred into the cache.
-                for arg in missing:
-                    observer = ret_d.observe()
-                    observer.addCallback(lambda r, arg: r.get(arg, None), arg)
-
-                    observer = ObservableDeferred(observer)
-
-                    if num_args == 1:
-                        cache.set(
-                            arg, observer,
-                            callback=invalidate_callback
-                        )
-
-                        def invalidate(f, key):
-                            cache.invalidate(key)
-                            return f
-                        observer.addErrback(invalidate, arg)
-                    else:
-                        key = list(keyargs)
-                        key[self.list_pos] = arg
-                        cache.set(
-                            tuple(key), observer,
-                            callback=invalidate_callback
-                        )
-
-                        def invalidate(f, key):
-                            cache.invalidate(key)
-                            return f
-                        observer.addErrback(invalidate, tuple(key))
-
-                    res = observer.observe()
-                    res.addCallback(lambda r, arg: (arg, r), arg)
-
-                    cached_defers[arg] = res
+                ).addCallbacks(complete_all, errback))
 
             if cached_defers:
-                def update_results_dict(res):
-                    results.update(res)
-                    return results
-
-                return logcontext.make_deferred_yieldable(defer.gatherResults(
-                    list(cached_defers.values()),
+                d = defer.gatherResults(
+                    cached_defers,
                     consumeErrors=True,
-                ).addCallback(update_results_dict).addErrback(
+                ).addCallbacks(
+                    lambda _: results,
                     unwrapFirstError
-                ))
+                )
+                return logcontext.make_deferred_yieldable(d)
             else:
                 return results
 
@@ -625,7 +621,8 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
     cache.
 
     Args:
-        cache (Cache): The underlying cache to use.
+        cached_method_name (str): The name of the single-item lookup method.
+            This is only used to find the cache to use.
         list_name (str): The name of the argument that is the list to use to
             do batch lookups in the cache.
         num_args (int): Number of arguments to use as the key in the cache