summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/urls.py2
-rwxr-xr-xsynapse/app/homeserver.py1
-rw-r--r--synapse/federation/transaction_queue.py36
-rw-r--r--synapse/handlers/typing.py1
-rw-r--r--synapse/storage/transactions.py23
-rw-r--r--synapse/util/caches/expiringcache.py24
6 files changed, 52 insertions, 35 deletions
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 71347912f1..6d9f1ca0ef 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -64,7 +64,7 @@ class ConsentURIBuilder(object):
         """
         mac = hmac.new(
             key=self._hmac_secret,
-            msg=user_id,
+            msg=user_id.encode('ascii'),
             digestmod=sha256,
         ).hexdigest()
         consent_uri = "%s_matrix/consent?%s" % (
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index a98fdbd210..e3f0d99a3f 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -386,7 +386,6 @@ def setup(config_options):
         hs.get_pusherpool().start()
         hs.get_datastore().start_profiling()
         hs.get_datastore().start_doing_background_updates()
-        hs.get_federation_client().start_get_pdu_cache()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 8cbf8c4f7f..98b5950800 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -137,26 +137,6 @@ class TransactionQueue(object):
 
         self._processing_pending_presence = False
 
-    def can_send_to(self, destination):
-        """Can we send messages to the given server?
-
-        We can't send messages to ourselves. If we are running on localhost
-        then we can only federation with other servers running on localhost.
-        Otherwise we only federate with servers on a public domain.
-
-        Args:
-            destination(str): The server we are possibly trying to send to.
-        Returns:
-            bool: True if we can send to the server.
-        """
-
-        if destination == self.server_name:
-            return False
-        if self.server_name.startswith("localhost"):
-            return destination.startswith("localhost")
-        else:
-            return not destination.startswith("localhost")
-
     def notify_new_events(self, current_id):
         """This gets called when we have some new events we might want to
         send out to other servers.
@@ -279,10 +259,7 @@ class TransactionQueue(object):
         self._order += 1
 
         destinations = set(destinations)
-        destinations = set(
-            dest for dest in destinations if self.can_send_to(dest)
-        )
-
+        destinations.discard(self.server_name)
         logger.debug("Sending to: %s", str(destinations))
 
         if not destinations:
@@ -358,7 +335,7 @@ class TransactionQueue(object):
 
         for destinations, states in hosts_and_states:
             for destination in destinations:
-                if not self.can_send_to(destination):
+                if destination == self.server_name:
                     continue
 
                 self.pending_presence_by_dest.setdefault(
@@ -377,7 +354,8 @@ class TransactionQueue(object):
             content=content,
         )
 
-        if not self.can_send_to(destination):
+        if destination == self.server_name:
+            logger.info("Not sending EDU to ourselves")
             return
 
         sent_edus_counter.inc()
@@ -392,10 +370,8 @@ class TransactionQueue(object):
         self._attempt_new_transaction(destination)
 
     def send_device_messages(self, destination):
-        if destination == self.server_name or destination == "localhost":
-            return
-
-        if not self.can_send_to(destination):
+        if destination == self.server_name:
+            logger.info("Not sending device update to ourselves")
             return
 
         self._attempt_new_transaction(destination)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 65f475d639..c610933dd4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -224,6 +224,7 @@ class TypingHandler(object):
 
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
                     self.federation.send_edu(
                         destination=domain,
                         edu_type="m.typing",
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index baf0379a68..ab54977a75 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches.expiringcache import ExpiringCache
 
 from ._base import SQLBaseStore, db_to_json
 
@@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
     )
 )
 
+SENTINEL = object()
+
 
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
@@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore):
 
         self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
+        self._destination_retry_cache = ExpiringCache(
+            cache_name="get_destination_retry_timings",
+            clock=self._clock,
+            expiry_ms=5 * 60 * 1000,
+        )
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore):
         """
         pass
 
+    @defer.inlineCallbacks
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
 
@@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore):
             None if not retrying
             Otherwise a dict for the retry scheme
         """
-        return self.runInteraction(
+
+        result = self._destination_retry_cache.get(destination, SENTINEL)
+        if result is not SENTINEL:
+            defer.returnValue(result)
+
+        result = yield self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
+        # We don't hugely care about race conditions between getting and
+        # invalidating the cache, since we time out fairly quickly anyway.
+        self._destination_retry_cache[destination] = result
+        defer.returnValue(result)
+
     def _get_destination_retry_timings(self, txn, destination):
         result = self._simple_select_one_txn(
             txn,
@@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
+        self._destination_retry_cache.pop(destination)
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 9af4ec4aa8..f369780277 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,7 +16,7 @@
 import logging
 from collections import OrderedDict
 
-from six import itervalues
+from six import iteritems, itervalues
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
@@ -24,6 +24,9 @@ from synapse.util.caches import register_cache
 logger = logging.getLogger(__name__)
 
 
+SENTINEL = object()
+
+
 class ExpiringCache(object):
     def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
                  reset_expiry_on_get=False, iterable=False):
@@ -95,6 +98,21 @@ class ExpiringCache(object):
 
         return entry.value
 
+    def pop(self, key, default=SENTINEL):
+        """Removes and returns the value with the given key from the cache.
+
+        If the key isn't in the cache then `default` will be returned if
+        specified, otherwise `KeyError` will get raised.
+
+        Identical functionality to `dict.pop(..)`.
+        """
+
+        value = self._cache.pop(key, default)
+        if value is SENTINEL:
+            raise KeyError(key)
+
+        return value
+
     def __contains__(self, key):
         return key in self._cache
 
@@ -122,7 +140,7 @@ class ExpiringCache(object):
 
         keys_to_delete = set()
 
-        for key, cache_entry in self._cache.items():
+        for key, cache_entry in iteritems(self._cache):
             if now - cache_entry.time > self._expiry_ms:
                 keys_to_delete.add(key)
 
@@ -146,6 +164,8 @@ class ExpiringCache(object):
 
 
 class _CacheEntry(object):
+    __slots__ = ["time", "value"]
+
     def __init__(self, time, value):
         self.time = time
         self.value = value