summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/replication.py93
-rw-r--r--synapse/federation/transport.py2
-rw-r--r--synapse/handlers/federation.py7
-rw-r--r--synapse/handlers/presence.py12
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/handlers/typing.py120
-rw-r--r--synapse/http/matrixfederationclient.py35
-rw-r--r--synapse/rest/room.py32
-rw-r--r--synapse/rest/transactions.py2
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/schema/delta/v9.sql23
-rw-r--r--synapse/storage/schema/transactions.sql6
-rw-r--r--synapse/storage/transactions.py106
14 files changed, 403 insertions, 46 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 01f87fe423..0cb632fb08 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -334,7 +334,7 @@ class ReplicationLayer(object):
             defer.returnValue(response)
             return
 
-        logger.debug("[%s] Transacition is new", transaction.transaction_id)
+        logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
         with PreserveLoggingContext():
             dl = []
@@ -685,6 +685,7 @@ class _TransactionQueue(object):
         self.transport_layer = transport_layer
 
         self._clock = hs.get_clock()
+        self.store = hs.get_datastore()
 
         # Is a mapping from destinations -> deferreds. Used to keep track
         # of which destinations have transactions in flight and when they are
@@ -728,8 +729,14 @@ class _TransactionQueue(object):
                 (pdu, deferred, order)
             )
 
+            def eb(failure):
+                if not deferred.called:
+                    deferred.errback(failure)
+                else:
+                    logger.warn("Failed to send pdu", failure)
+
             with PreserveLoggingContext():
-                self._attempt_new_transaction(destination)
+                self._attempt_new_transaction(destination).addErrback(eb)
 
             deferreds.append(deferred)
 
@@ -739,6 +746,9 @@ class _TransactionQueue(object):
     def enqueue_edu(self, edu):
         destination = edu.destination
 
+        if destination == self.server_name:
+            return
+
         deferred = defer.Deferred()
         self.pending_edus_by_dest.setdefault(destination, []).append(
             (edu, deferred)
@@ -748,7 +758,7 @@ class _TransactionQueue(object):
             if not deferred.called:
                 deferred.errback(failure)
             else:
-                logger.exception("Failed to send edu", failure)
+                logger.warn("Failed to send edu", failure)
 
         with PreserveLoggingContext():
             self._attempt_new_transaction(destination).addErrback(eb)
@@ -770,10 +780,33 @@ class _TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
+
+        (retry_last_ts, retry_interval) = (0, 0)
+        retry_timings = yield self.store.get_destination_retry_timings(
+            destination
+        )
+        if retry_timings:
+            (retry_last_ts, retry_interval) = (
+                retry_timings.retry_last_ts, retry_timings.retry_interval
+            )
+            if retry_last_ts + retry_interval > int(self._clock.time_msec()):
+                logger.info(
+                    "TX [%s] not ready for retry yet - "
+                    "dropping transaction for now",
+                    destination,
+                )
+                return
+            else:
+                logger.info("TX [%s] is ready for retry", destination)
+
         if destination in self.pending_transactions:
+            # XXX: pending_transactions can get stuck on by a never-ending
+            # request at which point pending_pdus_by_dest just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
             return
 
-        #  list of (pending_pdu, deferred, order)
+        # list of (pending_pdu, deferred, order)
         pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
         pending_edus = self.pending_edus_by_dest.pop(destination, [])
         pending_failures = self.pending_failures_by_dest.pop(destination, [])
@@ -781,7 +814,14 @@ class _TransactionQueue(object):
         if not pending_pdus and not pending_edus and not pending_failures:
             return
 
-        logger.debug("TX [%s] Attempting new transaction", destination)
+        logger.debug(
+            "TX [%s] Attempting new transaction "
+            "(pdus: %d, edus: %d, failures: %d)",
+            destination,
+            len(pending_pdus),
+            len(pending_edus),
+            len(pending_failures)
+        )
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
@@ -814,7 +854,11 @@ class _TransactionQueue(object):
             yield self.transaction_actions.prepare_to_send(transaction)
 
             logger.debug("TX [%s] Persisted transaction", destination)
-            logger.debug("TX [%s] Sending transaction...", destination)
+            logger.info(
+                "TX [%s] Sending transaction [%s]",
+                destination,
+                transaction.transaction_id,
+            )
 
             # Actually send the transaction
 
@@ -835,6 +879,8 @@ class _TransactionQueue(object):
                 transaction, json_data_cb
             )
 
+            logger.info("TX [%s] got %d response", destination, code)
+
             logger.debug("TX [%s] Sent transaction", destination)
             logger.debug("TX [%s] Marking as delivered...", destination)
 
@@ -847,8 +893,14 @@ class _TransactionQueue(object):
 
             for deferred in deferreds:
                 if code == 200:
+                    if retry_last_ts:
+                        # this host is alive! reset retry schedule
+                        yield self.store.set_destination_retry_timings(
+                            destination, 0, 0
+                        )
                     deferred.callback(None)
                 else:
+                    self.set_retrying(destination, retry_interval)
                     deferred.errback(RuntimeError("Got status %d" % code))
 
                 # Ensures we don't continue until all callbacks on that
@@ -861,11 +913,15 @@ class _TransactionQueue(object):
             logger.debug("TX [%s] Yielded to callbacks", destination)
 
         except Exception as e:
-            logger.error("TX Problem in _attempt_transaction")
-
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
-            logger.exception(e)
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
+
+            self.set_retrying(destination, retry_interval)
 
             for deferred in deferreds:
                 if not deferred.called:
@@ -877,3 +933,22 @@ class _TransactionQueue(object):
 
             # Check to see if there is anything else to send.
             self._attempt_new_transaction(destination)
+
+    @defer.inlineCallbacks
+    def set_retrying(self, destination, retry_interval):
+        # track that this destination is having problems and we should
+        # give it a chance to recover before trying it again
+
+        if retry_interval:
+            retry_interval *= 2
+            # plateau at hourly retries for now
+            if retry_interval >= 60 * 60 * 1000:
+                retry_interval = 60 * 60 * 1000
+        else:
+            retry_interval = 2000  # try again at first after 2 seconds
+
+        yield self.store.set_destination_retry_timings(
+            destination,
+            int(self._clock.time_msec()),
+            retry_interval
+        )
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 8d86152085..0f11c6d491 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -155,7 +155,7 @@ class TransportLayer(object):
     @defer.inlineCallbacks
     @log_function
     def send_transaction(self, transaction, json_data_callback=None):
-        """ Sends the given Transaction to it's destination
+        """ Sends the given Transaction to its destination
 
         Args:
             transaction (Transaction)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 925eb5376e..cfb5029774 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -207,6 +207,13 @@ class FederationHandler(BaseHandler):
                 e.msg,
                 affected=event.event_id,
             )
+            
+        # if we're receiving valid events from an origin,
+        # it's probably a good idea to mark it as not in retry-state
+        # for sending (although this is a bit of a leap)
+        retry_timings = yield self.store.get_destination_retry_timings(origin)
+        if (retry_timings and retry_timings.retry_last_ts):
+            self.store.set_destination_retry_timings(origin, 0, 0)
 
         room = yield self.store.get_room(event.room_id)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 815d40f166..84a039489f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -659,10 +659,6 @@ class PresenceHandler(BaseHandler):
             if room_ids:
                 logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
 
-            if not observers and not room_ids:
-                logger.debug(" | no interested observers or room IDs")
-                continue
-
             state = dict(push)
             del state["user_id"]
 
@@ -683,6 +679,10 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
+            if not observers and not room_ids:
+                logger.debug(" | no interested observers or room IDs")
+                continue
+
             self.push_update_to_clients(
                 observed_user=user,
                 users_to_push=observers,
@@ -804,6 +804,7 @@ class PresenceEventSource(object):
             )
 
     @defer.inlineCallbacks
+    @log_function
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
 
@@ -816,7 +817,8 @@ class PresenceEventSource(object):
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
             cached = cachemap[observed_user]
-            if not (from_key < cached.serial):
+
+            if cached.serial <= from_key:
                 continue
 
             if (yield self.is_visible(observer_user, observed_user)):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a000b44036..c802e8f69d 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -260,6 +260,7 @@ class RoomMemberHandler(BaseHandler):
 
         self.distributor = hs.get_distributor()
         self.distributor.declare("user_joined_room")
+        self.distributor.declare("user_left_room")
 
     @defer.inlineCallbacks
     def get_room_members(self, room_id, membership=Membership.JOIN):
@@ -387,6 +388,12 @@ class RoomMemberHandler(BaseHandler):
                 do_auth=do_auth,
             )
 
+            if prev_state and prev_state.membership == Membership.JOIN:
+                user = self.hs.parse_userid(event.user_id)
+                self.distributor.fire(
+                    "user_left_room", user=user, room_id=event.room_id
+                )
+
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d88a53242c..46a0b299a1 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -43,7 +43,23 @@ class TypingNotificationHandler(BaseHandler):
 
         self.federation.register_edu_handler("m.typing", self._recv_edu)
 
-        self._member_typing_until = {}
+        hs.get_distributor().observe("user_left_room", self.user_left_room)
+
+        self._member_typing_until = {} # clock time we expect to stop
+        self._member_typing_timer = {} # deferreds to manage theabove
+
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        self._latest_room_serial = 0
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
+
+    def tearDown(self):
+        """Cancels all the pending timers.
+        Normally this shouldn't be needed, but it's required from unit tests
+        to avoid a "Reactor was unclean" warning."""
+        for t in self._member_typing_timer.values():
+            self.clock.cancel_call_later(t)
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
@@ -53,12 +69,24 @@ class TypingNotificationHandler(BaseHandler):
         if target_user != auth_user:
             raise AuthError(400, "Cannot set another user's typing state")
 
+        yield self.auth.check_joined_room(room_id, target_user.to_string())
+
+        logger.debug(
+            "%s has started typing in %s", target_user.to_string(), room_id
+        )
+
         until = self.clock.time_msec() + timeout
         member = RoomMember(room_id=room_id, user=target_user)
 
         was_present = member in self._member_typing_until
 
+        if member in self._member_typing_timer:
+            self.clock.cancel_call_later(self._member_typing_timer[member])
+
         self._member_typing_until[member] = until
+        self._member_typing_timer[member] = self.clock.call_later(
+            timeout / 1000, lambda: self._stopped_typing(member)
+        )
 
         if was_present:
             # No point sending another notification
@@ -78,18 +106,39 @@ class TypingNotificationHandler(BaseHandler):
         if target_user != auth_user:
             raise AuthError(400, "Cannot set another user's typing state")
 
+        yield self.auth.check_joined_room(room_id, target_user.to_string())
+
+        logger.debug(
+            "%s has stopped typing in %s", target_user.to_string(), room_id
+        )
+
         member = RoomMember(room_id=room_id, user=target_user)
 
+        yield self._stopped_typing(member)
+
+    @defer.inlineCallbacks
+    def user_left_room(self, user, room_id):
+        if user.is_mine:
+            member = RoomMember(room_id=room_id, user=user)
+            yield self._stopped_typing(member)
+
+    @defer.inlineCallbacks
+    def _stopped_typing(self, member):
         if member not in self._member_typing_until:
             # No point
             defer.returnValue(None)
 
         yield self._push_update(
-            room_id=room_id,
-            user=target_user,
+            room_id=member.room_id,
+            user=member.user,
             typing=False,
         )
 
+        del self._member_typing_until[member]
+
+        self.clock.cancel_call_later(self._member_typing_timer[member])
+        del self._member_typing_timer[member]
+
     @defer.inlineCallbacks
     def _push_update(self, room_id, user, typing):
         localusers = set()
@@ -101,12 +150,11 @@ class TypingNotificationHandler(BaseHandler):
             ignore_user=user
         )
 
-        for u in localusers:
-            self.push_update_to_clients(
+        if localusers:
+            self._push_update_local(
                 room_id=room_id,
-                observer_user=u,
-                observed_user=user,
-                typing=typing,
+                user=user,
+                typing=typing
             )
 
         deferreds = []
@@ -135,29 +183,65 @@ class TypingNotificationHandler(BaseHandler):
             room_id, localusers=localusers
         )
 
-        for u in localusers:
-            self.push_update_to_clients(
+        if localusers:
+            self._push_update_local(
                 room_id=room_id,
-                observer_user=u,
-                observed_user=user,
+                user=user,
                 typing=content["typing"]
             )
 
-    def push_update_to_clients(self, room_id, observer_user, observed_user,
-                               typing):
-        # TODO(paul) steal this from presence.py
-        pass
+    def _push_update_local(self, room_id, user, typing):
+        if room_id not in self._room_serials:
+            self._room_serials[room_id] = 0
+            self._room_typing[room_id] = set()
+
+        room_set = self._room_typing[room_id]
+        if typing:
+            room_set.add(user)
+        elif user in room_set:
+            room_set.remove(user)
+
+        self._latest_room_serial += 1
+        self._room_serials[room_id] = self._latest_room_serial
+
+        self.notifier.on_new_user_event(rooms=[room_id])
 
 
 class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
+        self._handler = None
+
+    def handler(self):
+        # Avoid cyclic dependency in handler setup
+        if not self._handler:
+            self._handler = self.hs.get_handlers().typing_notification_handler
+        return self._handler
+
+    def _make_event_for(self, room_id):
+        typing = self.handler()._room_typing[room_id]
+        return {
+            "type": "m.typing",
+            "room_id": room_id,
+            "typing": [u.to_string() for u in typing],
+        }
 
     def get_new_events_for_user(self, user, from_key, limit):
-        return ([], from_key)
+        from_key = int(from_key)
+        handler = self.handler()
+
+        events = []
+        for room_id in handler._room_serials:
+            if handler._room_serials[room_id] <= from_key:
+                continue
+
+            # TODO: check if user is in room
+            events.append(self._make_event_for(room_id))
+
+        return (events, handler._latest_room_serial)
 
     def get_current_key(self):
-        return 0
+        return self.handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
         return ([], pagination_config.from_key)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 510f07dd7b..fc5b5ab809 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -89,8 +89,8 @@ class MatrixFederationHttpClient(object):
             ("", "", path_bytes, param_bytes, query_bytes, "",)
         )
 
-        logger.debug("Sending request to %s: %s %s",
-                     destination, method, url_bytes)
+        logger.info("Sending request to %s: %s %s",
+                    destination, method, url_bytes)
 
         logger.debug(
             "Types: %s",
@@ -101,6 +101,8 @@ class MatrixFederationHttpClient(object):
             ]
         )
 
+        # XXX: Would be much nicer to retry only at the transaction-layer
+        # (once we have reliable transactions in place)
         retries_left = 5
 
         endpoint = self._getEndpoint(reactor, destination)
@@ -127,11 +129,20 @@ class MatrixFederationHttpClient(object):
                 break
             except Exception as e:
                 if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                    logger.warn("DNS Lookup failed to %s with %s", destination,
-                                e)
+                    logger.warn(
+                        "DNS Lookup failed to %s with %s",
+                        destination,
+                        e
+                    )
                     raise SynapseError(400, "Domain specified not found.")
 
-                logger.exception("Got error in _create_request")
+                logger.warn(
+                    "Sending request failed to %s: %s %s : %s",
+                    destination,
+                    method,
+                    url_bytes,
+                    e
+                )
                 _print_ex(e)
 
                 if retries_left:
@@ -140,15 +151,21 @@ class MatrixFederationHttpClient(object):
                 else:
                     raise
 
+        logger.info(
+            "Received response %d %s for %s: %s %s",
+            response.code,
+            response.phrase,
+            destination,
+            method,
+            url_bytes
+        )
+
         if 200 <= response.code < 300:
             # We need to update the transactions table to say it was sent?
             pass
         else:
             # :'(
             # Update transactions table?
-            logger.error(
-                "Got response %d %s", response.code, response.phrase
-            )
             raise CodeMessageException(
                 response.code, response.phrase
             )
@@ -284,7 +301,7 @@ def _print_ex(e):
         for ex in e.reasons:
             _print_ex(ex)
     else:
-        logger.exception(e)
+        logger.warn(e)
 
 
 class _JsonProducer(object):
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 3147d7a60b..e7f28c2784 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -483,6 +483,37 @@ class RoomRedactEventRestServlet(RestServlet):
         defer.returnValue(response)
 
 
+class RoomTypingRestServlet(RestServlet):
+    PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/typing/(?P<user_id>[^/]*)$")
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, room_id, user_id):
+        auth_user = yield self.auth.get_user_by_req(request)
+
+        room_id = urllib.unquote(room_id)
+        target_user = self.hs.parse_userid(urllib.unquote(user_id))
+
+        content = _parse_json(request)
+
+        typing_handler = self.handlers.typing_notification_handler
+
+        if content["typing"]:
+            yield typing_handler.started_typing(
+                target_user=target_user,
+                auth_user=auth_user,
+                room_id=room_id,
+                timeout=content.get("timeout", 30000),
+            )
+        else:
+            yield typing_handler.stopped_typing(
+                target_user=target_user,
+                auth_user=auth_user,
+                room_id=room_id,
+            )
+
+        defer.returnValue((200, {}))
+
+
 def _parse_json(request):
     try:
         content = json.loads(request.content.read())
@@ -538,3 +569,4 @@ def register_servlets(hs, http_server):
     RoomStateRestServlet(hs).register(http_server)
     RoomInitialSyncRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
+    RoomTypingRestServlet(hs).register(http_server)
diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py
index 93c0122f30..8c41ab4edb 100644
--- a/synapse/rest/transactions.py
+++ b/synapse/rest/transactions.py
@@ -19,7 +19,7 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-
+# FIXME: elsewhere we use FooStore to indicate something in the storage layer...
 class HttpTransactionStore(object):
 
     def __init__(self):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f15e3dfe62..04ab39341d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -67,7 +67,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
 
 
 class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e72200e2f7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -650,7 +650,7 @@ class JoinHelper(object):
     to dump the results into.
 
     Attributes:
-        taples (list): List of `Table` classes
+        tables (list): List of `Table` classes
         EntryType (type)
     """
 
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..ad680c64da
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,23 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
+
+PRAGMA user_version = 9;
\ No newline at end of file
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
 
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
 
 from collections import namedtuple
 
+from twisted.internet import defer
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
 
+    # a write-through cache of DestinationsTable.EntryType indexed by
+    # destination string
+    destination_retry_cache = {}
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
-        # First we find out what the prev_txs should be.
+        # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
         query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
 
         return ReceivedTransactionsTable.decode_results(txn.fetchall())
 
+    def get_destination_retry_timings(self, destination):
+        """Gets the current retry timings (if any) for a given destination.
+
+        Args:
+            destination (str)
+
+        Returns:
+            None if not retrying
+            Otherwise a DestinationsTable.EntryType for the retry scheme
+        """
+        if destination in self.destination_retry_cache:
+            return defer.succeed(self.destination_retry_cache[destination])
+
+        return self.runInteraction(
+            "get_destination_retry_timings",
+            self._get_destination_retry_timings, destination)
+
+    def _get_destination_retry_timings(cls, txn, destination):
+        query = DestinationsTable.select_statement("destination = ?")
+        txn.execute(query, (destination,))
+        result = txn.fetchall()
+        if result:
+            result = DestinationsTable.decode_single_result(result)
+            if result.retry_last_ts > 0:
+                return result
+            else:
+                return None
+
+    def set_destination_retry_timings(self, destination,
+                                      retry_last_ts, retry_interval):
+        """Sets the current retry timings for a given destination.
+        Both timings should be zero if retrying is no longer occuring.
+
+        Args:
+            destination (str)
+            retry_last_ts (int) - time of last retry attempt in unix epoch ms
+            retry_interval (int) - how long until next retry in ms
+        """
+
+        self.destination_retry_cache[destination] = (
+            DestinationsTable.EntryType(
+                destination,
+                retry_last_ts,
+                retry_interval
+            )
+        )
+
+        # XXX: we could chose to not bother persisting this if our cache thinks
+        # this is a NOOP
+        return self.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings,
+            destination,
+            retry_last_ts,
+            retry_interval,
+        )
+
+    def _set_destination_retry_timings(cls, txn, destination,
+                                       retry_last_ts, retry_interval):
+
+        query = (
+            "INSERT OR REPLACE INTO %s "
+            "(destination, retry_last_ts, retry_interval) "
+            "VALUES (?, ?, ?) "
+        ) % DestinationsTable.table_name
+
+        txn.execute(query, (destination, retry_last_ts, retry_interval))
+
+    def get_destinations_needing_retry(self):
+        """Get all destinations which are due a retry for sending a transaction.
+
+        Returns:
+            list: A list of `DestinationsTable.EntryType`
+        """
+
+        return self.runInteraction(
+            "get_destinations_needing_retry",
+            self._get_destinations_needing_retry
+        )
+
+    def _get_destinations_needing_retry(cls, txn):
+        where = "retry_last_ts > 0 and retry_next_ts < now()"
+        query = DestinationsTable.select_statement(where)
+        txn.execute(query)
+        return DestinationsTable.decode_results(txn.fetchall())
+
 
 class ReceivedTransactionsTable(Table):
     table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
     ]
 
     EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):
+    table_name = "destinations"
+
+    fields = [
+        "destination",
+        "retry_last_ts",
+        "retry_interval",
+    ]
+
+    EntryType = namedtuple("DestinationsEntry", fields)