From c1f18892bbd7286e7075af3ae2cd1f96bdf16be1 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 8 Mar 2017 12:58:56 +0000
Subject: Bump changelog and version

---
 CHANGES.rst         | 32 ++++++++++++++++++++++++++++++++
 synapse/__init__.py |  2 +-
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index c8856d9575..be75b3b210 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,35 @@
+Changes in synapse v0.19.3-rc1 (2017-03-08)
+===========================================
+
+Features:
+
+* Add some administration functionalities. Thanks to morteza-araby! (PR #1784)
+
+
+Changes:
+
+* Reduce database table sizes (PR #1873, #1916, #1923, #1963)
+* Update contrib/ to not use syutil. Thanks to andrewshadura! (PR #1907)
+* Don't fetch current state when sending an event in common case (PR #1955)
+
+
+Bug fixes:
+
+* Fix synapse_port_db failure. Thanks to Pneumaticat! (PR #1904)
+* Fix caching to not cache error responses (PR #1913)
+* Fix APIs to make kick & ban reasons work (PR #1917)
+* Fix bugs in the /keys/changes api (PR #1921)
+* Fix bug where users couldn't forget rooms they were banned from (PR #1922)
+* Fix issue with long language values in pushers API (PR #1925)
+* Fix a race in transaction queue (PR #1930)
+* Fix dynamic thumbnailing to preserve aspect ratio. Thanks to jkolo! (PR
+#1945)
+* Fix device list update to not constantly resync (PR #1964)
+* Fix potential for huge memory usage when getting device that have
+  changed (PR #1969)
+
+
+
 Changes in synapse v0.19.2 (2017-02-20)
 =======================================
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ff251ce597..787d621067 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.19.2"
+__version__ = "0.19.3-rc1"
-- 
cgit 1.4.1


From 3170c56e075493317f9586e98266a81b37eea50a Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 8 Mar 2017 12:58:56 +0000
Subject: Bump changelog and version

---
 CHANGES.rst         | 32 ++++++++++++++++++++++++++++++++
 synapse/__init__.py |  2 +-
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index c8856d9575..1817f666de 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,35 @@
+Changes in synapse v0.19.3-rc1 (2017-03-08)
+===========================================
+
+Features:
+
+* Add some administration functions. Thanks to morteza-araby! (PR #1784)
+
+
+Changes:
+
+* Reduce database table sizes (PR #1873, #1916, #1923, #1963)
+* Update contrib/ to not use syutil. Thanks to andrewshadura! (PR #1907)
+* Don't fetch current state when sending an event in common case (PR #1955)
+
+
+Bug fixes:
+
+* Fix synapse_port_db failure. Thanks to Pneumaticat! (PR #1904)
+* Fix caching to not cache error responses (PR #1913)
+* Fix APIs to make kick & ban reasons work (PR #1917)
+* Fix bugs in the /keys/changes api (PR #1921)
+* Fix bug where users couldn't forget rooms they were banned from (PR #1922)
+* Fix issue with long language values in pushers API (PR #1925)
+* Fix a race in transaction queue (PR #1930)
+* Fix dynamic thumbnailing to preserve aspect ratio. Thanks to jkolo! (PR
+  #1945)
+* Fix device list update to not constantly resync (PR #1964)
+* Fix potential for huge memory usage when getting device that have
+  changed (PR #1969)
+
+
+
 Changes in synapse v0.19.2 (2017-02-20)
 =======================================
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ff251ce597..787d621067 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.19.2"
+__version__ = "0.19.3-rc1"
-- 
cgit 1.4.1


From 254b7c5b156a08f79d0d79e5af0ab768fec6455e Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 13 Mar 2017 10:02:58 +0000
Subject: Bump changelog and versions

---
 CHANGES.rst         | 9 +++++++++
 synapse/__init__.py | 2 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index d1078fbc09..94fdc69185 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,12 @@
+Changes in synapse v0.19.3-rc2 (2017-03-13)
+===========================================
+
+Bug fixes:
+
+* Fix bug in handling of incoming device list updates over federation.
+
+
+
 Changes in synapse v0.19.3-rc1 (2017-03-08)
 ===========================================
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 787d621067..bcd2ed6440 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.19.3-rc1"
+__version__ = "0.19.3-rc2"
-- 
cgit 1.4.1


From 9adf1991cabde3cba837f49ae5f59dd7d36c300f Mon Sep 17 00:00:00 2001
From: Keyvan Fatehi <keyvanfatehi@gmail.com>
Date: Wed, 15 Mar 2017 03:21:45 +0000
Subject: Update README: specify python2.7 in virtualenv

Signed-off-by: Keyvan Fatehi <keyvanfatehi@gmail.com>
---
 README.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index a73c71c77e..afd4bbc4d0 100644
--- a/README.rst
+++ b/README.rst
@@ -808,7 +808,7 @@ directory of your choice::
 Synapse has a number of external dependencies, that are easiest
 to install using pip and a virtualenv::
 
-    virtualenv env
+    virtualenv -p python2.7 env
     source env/bin/activate
     python synapse/python_dependencies.py | xargs pip install
     pip install lxml mock
-- 
cgit 1.4.1


From be2024354967c6e23944968a39e04272969ae974 Mon Sep 17 00:00:00 2001
From: Stefan Majewsky <majewsky@gmx.net>
Date: Sat, 18 Mar 2017 17:16:12 +0100
Subject: README.md: fix link to client list on matrix.org/docs

---
 README.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/README.rst b/README.rst
index 77e0b470a3..25ead6e5f9 100644
--- a/README.rst
+++ b/README.rst
@@ -20,7 +20,7 @@ The overall architecture is::
              https://somewhere.org/_matrix      https://elsewhere.net/_matrix
 
 ``#matrix:matrix.org`` is the official support room for Matrix, and can be
-accessed by any client from https://matrix.org/docs/projects/try-matrix-now or
+accessed by any client from https://matrix.org/docs/projects/try-matrix-now.html or
 via IRC bridge at irc://irc.freenode.net/matrix.
 
 Synapse is currently in rapid development, but as of version 0.5 we believe it
@@ -68,7 +68,7 @@ or mandatory service provider in Matrix, unlike WhatsApp, Facebook, Hangouts,
 etc.
 
 We'd like to invite you to join #matrix:matrix.org (via
-https://matrix.org/docs/projects/try-matrix-now), run a homeserver, take a look
+https://matrix.org/docs/projects/try-matrix-now.html), run a homeserver, take a look
 at the `Matrix spec <https://matrix.org/docs/spec>`_, and experiment with the
 `APIs <https://matrix.org/docs/api>`_ and `Client SDKs
 <http://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
@@ -321,7 +321,7 @@ Debian
 
 Matrix provides official Debian packages via apt from http://matrix.org/packages/debian/.
 Note that these packages do not include a client - choose one from
-https://matrix.org/docs/projects/try-matrix-now/ (or build your own with one of our SDKs :)
+https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one of our SDKs :)
 
 Fedora
 ------
-- 
cgit 1.4.1


From 633dcc316c314fe60ca647fe71955f0cf5b75698 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Mon, 20 Mar 2017 16:37:14 +0000
Subject: Bump changelog and version

---
 CHANGES.rst         | 6 ++++++
 synapse/__init__.py | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index 94fdc69185..2a46af52a8 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,9 @@
+Changes in synapse v0.19.3 (2017-03-20)
+=======================================
+
+No changes since v0.19.3-rc2
+
+
 Changes in synapse v0.19.3-rc2 (2017-03-13)
 ===========================================
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index bcd2ed6440..7628e7c505 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.19.3-rc2"
+__version__ = "0.19.3"
-- 
cgit 1.4.1


From 64778693be0c6be4c36e02fc58200ecfb27533a4 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Mon, 20 Mar 2017 15:34:35 +0000
Subject: fix up some key verif docstrings

---
 synapse/crypto/keyring.py | 23 +++++++++++++++++++++--
 synapse/storage/keys.py   |  5 +++--
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 80f27f8c53..0a21392a62 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -238,8 +238,14 @@ class Keyring(object):
             d.addBoth(rm, server_name)
 
     def get_server_verify_keys(self, verify_requests):
-        """Takes a dict of KeyGroups and tries to find at least one key for
-        each group.
+        """Tries to find at least one key for each verify request
+
+        For each verify_request, verify_request.deferred is called back with
+        params (server_name, key_id, VerifyKey) if a key is found, or errbacked
+        with a SynapseError if none of the keys are found.
+
+        Args:
+            verify_requests (list[VerifyKeyRequest]): list of verify requests
         """
 
         # These are functions that produce keys given a list of key ids
@@ -252,8 +258,11 @@ class Keyring(object):
         @defer.inlineCallbacks
         def do_iterations():
             with Measure(self.clock, "get_server_verify_keys"):
+                # dict[str, dict[str, VerifyKey]]: results so far.
+                # map server_name -> key_id -> VerifyKey
                 merged_results = {}
 
+                # dict[str, set(str)]: keys to fetch for each server
                 missing_keys = {}
                 for verify_request in verify_requests:
                     missing_keys.setdefault(verify_request.server_name, set()).update(
@@ -315,6 +324,16 @@ class Keyring(object):
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
+        """
+
+        Args:
+            server_name_and_key_ids (list[(str, iterable[str])]):
+                list of (server_name, iterable[key_id]) tuples to fetch keys for
+
+        Returns:
+            Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
+                server_name -> key_id -> VerifyKey
+        """
         res = yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self.store.get_server_verify_keys)(
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 86b37b9ddd..3b5e0a4fb9 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -101,9 +101,10 @@ class KeyStore(SQLBaseStore):
         key_ids
         Args:
             server_name (str): The name of the server.
-            key_ids (list of str): List of key_ids to try and look up.
+            key_ids (iterable[str]): key_ids to try and look up.
         Returns:
-            (list of VerifyKey): The verification keys.
+            Deferred: resolves to dict[str, VerifyKey]: map from
+               key_id to verification key.
         """
         keys = {}
         for key_id in key_ids:
-- 
cgit 1.4.1


From 95f21c7a66b256e9e5330edd0f35a83b177c84a8 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Wed, 22 Mar 2017 13:54:20 +0000
Subject: Fix caching of remote servers' signature keys

The `@cached` decorator on `KeyStore._get_server_verify_key` was missing
its `num_args` parameter, which meant that it was returning the wrong key for
any server which had more than one recorded key.

By way of a fix, change the default for `num_args` to be *all* arguments. To
implement that, factor out a common base class for `CacheDescriptor` and `CacheListDescriptor`.
---
 synapse/util/caches/descriptors.py    | 135 ++++++++++++++++++----------------
 tests/storage/test_keys.py            |  53 +++++++++++++
 tests/util/caches/__init__.py         |  14 ++++
 tests/util/caches/test_descriptors.py |  86 ++++++++++++++++++++++
 4 files changed, 225 insertions(+), 63 deletions(-)
 create mode 100644 tests/storage/test_keys.py
 create mode 100644 tests/util/caches/__init__.py
 create mode 100644 tests/util/caches/test_descriptors.py

diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 998de70d29..19595df422 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -189,7 +189,55 @@ class Cache(object):
         self.cache.clear()
 
 
-class CacheDescriptor(object):
+class _CacheDescriptorBase(object):
+    def __init__(self, orig, num_args, inlineCallbacks, cache_context=False):
+        self.orig = orig
+
+        if inlineCallbacks:
+            self.function_to_call = defer.inlineCallbacks(orig)
+        else:
+            self.function_to_call = orig
+
+        arg_spec = inspect.getargspec(orig)
+        all_args = arg_spec.args
+
+        if "cache_context" in all_args:
+            if not cache_context:
+                raise ValueError(
+                    "Cannot have a 'cache_context' arg without setting"
+                    " cache_context=True"
+                )
+        elif cache_context:
+            raise ValueError(
+                "Cannot have cache_context=True without having an arg"
+                " named `cache_context`"
+            )
+
+        if num_args is None:
+            num_args = len(all_args) - 1
+            if cache_context:
+                num_args -= 1
+
+        if len(all_args) < num_args + 1:
+            raise Exception(
+                "Not enough explicit positional arguments to key off for %r: "
+                "got %i args, but wanted %i. (@cached cannot key off *args or "
+                "**kwargs)"
+                % (orig.__name__, len(all_args), num_args)
+            )
+
+        self.num_args = num_args
+        self.arg_names = all_args[1:num_args + 1]
+
+        if "cache_context" in self.arg_names:
+            raise Exception(
+                "cache_context arg cannot be included among the cache keys"
+            )
+
+        self.add_cache_context = cache_context
+
+
+class CacheDescriptor(_CacheDescriptorBase):
     """ A method decorator that applies a memoizing cache around the function.
 
     This caches deferreds, rather than the results themselves. Deferreds that
@@ -217,52 +265,24 @@ class CacheDescriptor(object):
             r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate)
             defer.returnValue(r1 + r2)
 
+    Args:
+        num_args (int): number of positional arguments (excluding ``self`` and
+            ``cache_context``) to use as cache keys. Defaults to all named
+            args of the function.
     """
-    def __init__(self, orig, max_entries=1000, num_args=1, tree=False,
+    def __init__(self, orig, max_entries=1000, num_args=None, tree=False,
                  inlineCallbacks=False, cache_context=False, iterable=False):
-        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
 
-        self.orig = orig
+        super(CacheDescriptor, self).__init__(
+            orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
+            cache_context=cache_context)
 
-        if inlineCallbacks:
-            self.function_to_call = defer.inlineCallbacks(orig)
-        else:
-            self.function_to_call = orig
+        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
 
         self.max_entries = max_entries
-        self.num_args = num_args
         self.tree = tree
-
         self.iterable = iterable
 
-        all_args = inspect.getargspec(orig)
-        self.arg_names = all_args.args[1:num_args + 1]
-
-        if "cache_context" in all_args.args:
-            if not cache_context:
-                raise ValueError(
-                    "Cannot have a 'cache_context' arg without setting"
-                    " cache_context=True"
-                )
-            try:
-                self.arg_names.remove("cache_context")
-            except ValueError:
-                pass
-        elif cache_context:
-            raise ValueError(
-                "Cannot have cache_context=True without having an arg"
-                " named `cache_context`"
-            )
-
-        self.add_cache_context = cache_context
-
-        if len(self.arg_names) < self.num_args:
-            raise Exception(
-                "Not enough explicit positional arguments to key off of for %r."
-                " (@cached cannot key off of *args or **kwargs)"
-                % (orig.__name__,)
-            )
-
     def __get__(self, obj, objtype=None):
         cache = Cache(
             name=self.orig.__name__,
@@ -338,48 +358,36 @@ class CacheDescriptor(object):
         return wrapped
 
 
-class CacheListDescriptor(object):
+class CacheListDescriptor(_CacheDescriptorBase):
     """Wraps an existing cache to support bulk fetching of keys.
 
     Given a list of keys it looks in the cache to find any hits, then passes
     the list of missing keys to the wrapped fucntion.
     """
 
-    def __init__(self, orig, cached_method_name, list_name, num_args=1,
+    def __init__(self, orig, cached_method_name, list_name, num_args=None,
                  inlineCallbacks=False):
         """
         Args:
             orig (function)
-            method_name (str); The name of the chached method.
+            cached_method_name (str): The name of the chached method.
             list_name (str): Name of the argument which is the bulk lookup list
-            num_args (int)
+            num_args (int): number of positional arguments (excluding ``self``,
+                but including list_name) to use as cache keys. Defaults to all
+                named args of the function.
             inlineCallbacks (bool): Whether orig is a generator that should
                 be wrapped by defer.inlineCallbacks
         """
-        self.orig = orig
+        super(CacheListDescriptor, self).__init__(
+            orig, num_args=num_args, inlineCallbacks=inlineCallbacks)
 
-        if inlineCallbacks:
-            self.function_to_call = defer.inlineCallbacks(orig)
-        else:
-            self.function_to_call = orig
-
-        self.num_args = num_args
         self.list_name = list_name
 
-        self.arg_names = inspect.getargspec(orig).args[1:num_args + 1]
         self.list_pos = self.arg_names.index(self.list_name)
-
         self.cached_method_name = cached_method_name
 
         self.sentinel = object()
 
-        if len(self.arg_names) < self.num_args:
-            raise Exception(
-                "Not enough explicit positional arguments to key off of for %r."
-                " (@cached cannot key off of *args or **kwars)"
-                % (orig.__name__,)
-            )
-
         if self.list_name not in self.arg_names:
             raise Exception(
                 "Couldn't see arguments %r for %r."
@@ -487,7 +495,7 @@ class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
         self.cache.invalidate(self.key)
 
 
-def cached(max_entries=1000, num_args=1, tree=False, cache_context=False,
+def cached(max_entries=1000, num_args=None, tree=False, cache_context=False,
            iterable=False):
     return lambda orig: CacheDescriptor(
         orig,
@@ -499,8 +507,8 @@ def cached(max_entries=1000, num_args=1, tree=False, cache_context=False,
     )
 
 
-def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False,
-                          iterable=False):
+def cachedInlineCallbacks(max_entries=1000, num_args=None, tree=False,
+                          cache_context=False, iterable=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
@@ -512,7 +520,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_contex
     )
 
 
-def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False):
+def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=False):
     """Creates a descriptor that wraps a function in a `CacheListDescriptor`.
 
     Used to do batch lookups for an already created cache. A single argument
@@ -525,7 +533,8 @@ def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False)
         cache (Cache): The underlying cache to use.
         list_name (str): The name of the argument that is the list to use to
             do batch lookups in the cache.
-        num_args (int): Number of arguments to use as the key in the cache.
+        num_args (int): Number of arguments to use as the key in the cache
+            (including list_name). Defaults to all named parameters.
         inlineCallbacks (bool): Should the function be wrapped in an
             `defer.inlineCallbacks`?
 
diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py
new file mode 100644
index 0000000000..0be790d8f8
--- /dev/null
+++ b/tests/storage/test_keys.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import signedjson.key
+from twisted.internet import defer
+
+import tests.unittest
+import tests.utils
+
+
+class KeyStoreTestCase(tests.unittest.TestCase):
+    def __init__(self, *args, **kwargs):
+        super(KeyStoreTestCase, self).__init__(*args, **kwargs)
+        self.store = None  # type: synapse.storage.keys.KeyStore
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        hs = yield tests.utils.setup_test_homeserver()
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def test_get_server_verify_keys(self):
+        key1 = signedjson.key.decode_verify_key_base64(
+            "ed25519", "key1", "fP5l4JzpZPq/zdbBg5xx6lQGAAOM9/3w94cqiJ5jPrw"
+        )
+        key2 = signedjson.key.decode_verify_key_base64(
+            "ed25519", "key2", "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+        )
+        yield self.store.store_server_verify_key(
+            "server1", "from_server", 0, key1
+        )
+        yield self.store.store_server_verify_key(
+            "server1", "from_server", 0, key2
+        )
+
+        res = yield self.store.get_server_verify_keys(
+            "server1", ["ed25519:key1", "ed25519:key2", "ed25519:key3"])
+
+        self.assertEqual(len(res.keys()), 2)
+        self.assertEqual(res["ed25519:key1"].version, "key1")
+        self.assertEqual(res["ed25519:key2"].version, "key2")
diff --git a/tests/util/caches/__init__.py b/tests/util/caches/__init__.py
new file mode 100644
index 0000000000..451dae3b6c
--- /dev/null
+++ b/tests/util/caches/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
new file mode 100644
index 0000000000..419281054d
--- /dev/null
+++ b/tests/util/caches/test_descriptors.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import mock
+from twisted.internet import defer
+from synapse.util.caches import descriptors
+from tests import unittest
+
+
+class DescriptorTestCase(unittest.TestCase):
+    @defer.inlineCallbacks
+    def test_cache(self):
+        class Cls(object):
+            def __init__(self):
+                self.mock = mock.Mock()
+
+            @descriptors.cached()
+            def fn(self, arg1, arg2):
+                return self.mock(arg1, arg2)
+
+        obj = Cls()
+
+        obj.mock.return_value = 'fish'
+        r = yield obj.fn(1, 2)
+        self.assertEqual(r, 'fish')
+        obj.mock.assert_called_once_with(1, 2)
+        obj.mock.reset_mock()
+
+        # a call with different params should call the mock again
+        obj.mock.return_value = 'chips'
+        r = yield obj.fn(1, 3)
+        self.assertEqual(r, 'chips')
+        obj.mock.assert_called_once_with(1, 3)
+        obj.mock.reset_mock()
+
+        # the two values should now be cached
+        r = yield obj.fn(1, 2)
+        self.assertEqual(r, 'fish')
+        r = yield obj.fn(1, 3)
+        self.assertEqual(r, 'chips')
+        obj.mock.assert_not_called()
+
+    @defer.inlineCallbacks
+    def test_cache_num_args(self):
+        """Only the first num_args arguments should matter to the cache"""
+
+        class Cls(object):
+            def __init__(self):
+                self.mock = mock.Mock()
+
+            @descriptors.cached(num_args=1)
+            def fn(self, arg1, arg2):
+                return self.mock(arg1, arg2)
+
+        obj = Cls()
+        obj.mock.return_value = 'fish'
+        r = yield obj.fn(1, 2)
+        self.assertEqual(r, 'fish')
+        obj.mock.assert_called_once_with(1, 2)
+        obj.mock.reset_mock()
+
+        # a call with different params should call the mock again
+        obj.mock.return_value = 'chips'
+        r = yield obj.fn(2, 3)
+        self.assertEqual(r, 'chips')
+        obj.mock.assert_called_once_with(2, 3)
+        obj.mock.reset_mock()
+
+        # the two values should now be cached; we should be able to vary
+        # the second argument and still get the cached result.
+        r = yield obj.fn(1, 4)
+        self.assertEqual(r, 'fish')
+        r = yield obj.fn(2, 5)
+        self.assertEqual(r, 'chips')
+        obj.mock.assert_not_called()
-- 
cgit 1.4.1


From e08f81d96a421df97c101461e19ec766ea606fb7 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 00:15:58 +0000
Subject: Add a missing yield in device key upload

(this would only very very rarely actually be a useful thing, so the main
problem was the logcontext leak...)
---
 synapse/handlers/e2e_keys.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index e40495d1ab..c02d41a74c 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -316,7 +316,7 @@ class E2eKeysHandler(object):
         # old access_token without an associated device_id. Either way, we
         # need to double-check the device is registered to avoid ending up with
         # keys without a corresponding device.
-        self.device_handler.check_device_registered(user_id, device_id)
+        yield self.device_handler.check_device_registered(user_id, device_id)
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
-- 
cgit 1.4.1


From 19b9366d73585e815e59e7a6d07d6e307af5e0fb Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 00:17:46 +0000
Subject: Fix a couple of logcontext leaks

Use preserve_fn to correctly manage the logcontexts around things we don't want
to yield on.
---
 synapse/api/auth.py        | 5 ++---
 synapse/util/retryutils.py | 5 +++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 03a215ab1b..9dbc7993df 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -23,7 +23,7 @@ from synapse import event_auth
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import AuthError, Codes
 from synapse.types import UserID
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util import logcontext
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -209,8 +209,7 @@ class Auth(object):
                 default=[""]
             )[0]
             if user and access_token and ip_addr:
-                preserve_context_over_fn(
-                    self.store.insert_client_ip,
+                logcontext.preserve_fn(self.store.insert_client_ip)(
                     user=user,
                     access_token=access_token,
                     ip=ip_addr,
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 153ef001ad..b68e8c4e9f 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import synapse.util.logcontext
 from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException
@@ -173,4 +173,5 @@ class RetryDestinationLimiter(object):
                     "Failed to store set_destination_retry_timings",
                 )
 
-        store_retry_timings()
+        # we deliberately do this in the background.
+        synapse.util.logcontext.preserve_fn(store_retry_timings)()
-- 
cgit 1.4.1


From ad8a26e361136fb1637503f42e8f3cee49b25710 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 00:27:04 +0000
Subject: MatrixFederationHttpClient: clean up

rename _create_request to _request, and push ascii-encoding of `destination`
and `path` down into it
---
 synapse/http/matrixfederationclient.py | 41 ++++++++++++++++++++--------------
 1 file changed, 24 insertions(+), 17 deletions(-)

diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 82586e3dea..f15903f862 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -103,11 +103,15 @@ class MatrixFederationHttpClient(object):
         )
 
     @defer.inlineCallbacks
-    def _create_request(self, destination, method, path_bytes,
-                        body_callback, headers_dict={}, param_bytes=b"",
-                        query_bytes=b"", retry_on_dns_fail=True,
-                        timeout=None, long_retries=False):
-        """ Creates and sends a request to the given url
+    def _request(self, destination, method, path,
+                 body_callback, headers_dict={}, param_bytes=b"",
+                 query_bytes=b"", retry_on_dns_fail=True,
+                 timeout=None, long_retries=False):
+        """ Creates and sends a request to the given server
+        Args:
+            destination (str): The remote server to send the HTTP request to.
+            method (str): HTTP method
+            path (str): The HTTP path
 
         Returns:
             Deferred: resolves with the http response object on success.
@@ -115,6 +119,9 @@ class MatrixFederationHttpClient(object):
             Fails with ``HTTPRequestException``: if we get an HTTP response
             code >= 300.
         """
+        destination = destination.encode("ascii")
+        path_bytes = path.encode("ascii")
+
         headers_dict[b"User-Agent"] = [self.version_string]
         headers_dict[b"Host"] = [destination]
 
@@ -288,10 +295,10 @@ class MatrixFederationHttpClient(object):
             producer = _JsonProducer(json_data)
             return producer
 
-        response = yield self._create_request(
-            destination.encode("ascii"),
+        response = yield self._request(
+            destination,
             "PUT",
-            path.encode("ascii"),
+            path,
             body_callback=body_callback,
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
@@ -333,10 +340,10 @@ class MatrixFederationHttpClient(object):
             )
             return _JsonProducer(data)
 
-        response = yield self._create_request(
-            destination.encode("ascii"),
+        response = yield self._request(
+            destination,
             "POST",
-            path.encode("ascii"),
+            path,
             body_callback=body_callback,
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
@@ -386,10 +393,10 @@ class MatrixFederationHttpClient(object):
             self.sign_request(destination, method, url_bytes, headers_dict)
             return None
 
-        response = yield self._create_request(
-            destination.encode("ascii"),
+        response = yield self._request(
+            destination,
             "GET",
-            path.encode("ascii"),
+            path,
             query_bytes=query_bytes,
             body_callback=body_callback,
             retry_on_dns_fail=retry_on_dns_fail,
@@ -434,10 +441,10 @@ class MatrixFederationHttpClient(object):
             self.sign_request(destination, method, url_bytes, headers_dict)
             return None
 
-        response = yield self._create_request(
-            destination.encode("ascii"),
+        response = yield self._request(
+            destination,
             "GET",
-            path.encode("ascii"),
+            path,
             query_bytes=query_bytes,
             body_callback=body_callback,
             retry_on_dns_fail=retry_on_dns_fail
-- 
cgit 1.4.1


From 4bd597d9fcb8e6c6888ee3e8fa683ba812272997 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 00:12:21 +0000
Subject: push federation retry limiter down to matrixfederationclient

rather than having to instrument everywhere we make a federation call,
make the MatrixFederationHttpClient manage the retry limiter.
---
 synapse/crypto/keyring.py               |  39 +++---
 synapse/federation/federation_client.py |  33 ++---
 synapse/federation/transaction_queue.py | 216 +++++++++++++-----------------
 synapse/federation/transport/client.py  |   1 +
 synapse/handlers/e2e_keys.py            |  32 ++---
 synapse/http/matrixfederationclient.py  | 228 ++++++++++++++++++--------------
 synapse/util/retryutils.py              |  16 ++-
 tests/handlers/test_typing.py           |   2 +
 8 files changed, 280 insertions(+), 287 deletions(-)

diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 80f27f8c53..c4bc4f4d31 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,7 +15,6 @@
 
 from synapse.crypto.keyclient import fetch_server_key
 from synapse.api.errors import SynapseError, Codes
-from synapse.util.retryutils import get_retry_limiter
 from synapse.util import unwrapFirstError
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import (
@@ -363,30 +362,24 @@ class Keyring(object):
     def get_keys_from_server(self, server_name_and_key_ids):
         @defer.inlineCallbacks
         def get_key(server_name, key_ids):
-            limiter = yield get_retry_limiter(
-                server_name,
-                self.clock,
-                self.store,
-            )
-            with limiter:
-                keys = None
-                try:
-                    keys = yield self.get_server_verify_key_v2_direct(
-                        server_name, key_ids
-                    )
-                except Exception as e:
-                    logger.info(
-                        "Unable to get key %r for %r directly: %s %s",
-                        key_ids, server_name,
-                        type(e).__name__, str(e.message),
-                    )
+            keys = None
+            try:
+                keys = yield self.get_server_verify_key_v2_direct(
+                    server_name, key_ids
+                )
+            except Exception as e:
+                logger.info(
+                    "Unable to get key %r for %r directly: %s %s",
+                    key_ids, server_name,
+                    type(e).__name__, str(e.message),
+                )
 
-                if not keys:
-                    keys = yield self.get_server_verify_key_v1_direct(
-                        server_name, key_ids
-                    )
+            if not keys:
+                keys = yield self.get_server_verify_key_v1_direct(
+                    server_name, key_ids
+                )
 
-                    keys = {server_name: keys}
+                keys = {server_name: keys}
 
             defer.returnValue(keys)
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5dcd4eecce..dc44727b36 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.events import FrozenEvent, builder
 import synapse.metrics
 
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
 
 import copy
 import itertools
@@ -234,31 +234,24 @@ class FederationClient(FederationBase):
                 continue
 
             try:
-                limiter = yield get_retry_limiter(
-                    destination,
-                    self._clock,
-                    self.store,
+                transaction_data = yield self.transport_layer.get_event(
+                    destination, event_id, timeout=timeout,
                 )
 
-                with limiter:
-                    transaction_data = yield self.transport_layer.get_event(
-                        destination, event_id, timeout=timeout,
-                    )
-
-                    logger.debug("transaction_data %r", transaction_data)
+                logger.debug("transaction_data %r", transaction_data)
 
-                    pdu_list = [
-                        self.event_from_pdu_json(p, outlier=outlier)
-                        for p in transaction_data["pdus"]
-                    ]
+                pdu_list = [
+                    self.event_from_pdu_json(p, outlier=outlier)
+                    for p in transaction_data["pdus"]
+                ]
 
-                    if pdu_list and pdu_list[0]:
-                        pdu = pdu_list[0]
+                if pdu_list and pdu_list[0]:
+                    pdu = pdu_list[0]
 
-                        # Check signatures are correct.
-                        signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
+                    # Check signatures are correct.
+                    signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
 
-                        break
+                    break
 
                 pdu_attempts[destination] = now
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c802dd67a3..d7ecefcc64 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import datetime
 
 from twisted.internet import defer
 
@@ -22,9 +22,7 @@ from .units import Transaction, Edu
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import preserve_context_over_fn
-from synapse.util.retryutils import (
-    get_retry_limiter, NotRetryingDestination,
-)
+from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
 from synapse.handlers.presence import format_user_presence_state
@@ -312,13 +310,6 @@ class TransactionQueue(object):
             yield run_on_reactor()
 
             while True:
-                limiter = yield get_retry_limiter(
-                    destination,
-                    self.clock,
-                    self.store,
-                    backoff_on_404=True,  # If we get a 404 the other side has gone
-                )
-
                 device_message_edus, device_stream_id, dev_list_id = (
                     yield self._get_new_device_messages(destination)
                 )
@@ -374,7 +365,6 @@ class TransactionQueue(object):
 
                 success = yield self._send_new_transaction(
                     destination, pending_pdus, pending_edus, pending_failures,
-                    limiter=limiter,
                 )
                 if success:
                     # Remove the acknowledged device messages from the database
@@ -392,12 +382,24 @@ class TransactionQueue(object):
                     self.last_device_list_stream_id_by_dest[destination] = dev_list_id
                 else:
                     break
-        except NotRetryingDestination:
+        except NotRetryingDestination as e:
             logger.debug(
-                "TX [%s] not ready for retry yet - "
+                "TX [%s] not ready for retry yet (next retry at %s) - "
                 "dropping transaction for now",
                 destination,
+                datetime.datetime.fromtimestamp(
+                    (e.retry_last_ts + e.retry_interval) / 1000.0
+                ),
+            )
+        except Exception as e:
+            logger.warn(
+                "TX [%s] Failed to send transaction: %s",
+                destination,
+                e,
             )
+            for p in pending_pdus:
+                logger.info("Failed to send event %s to %s", p.event_id,
+                            destination)
         finally:
             # We want to be *very* sure we delete this after we stop processing
             self.pending_transactions.pop(destination, None)
@@ -437,7 +439,7 @@ class TransactionQueue(object):
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures, limiter):
+                              pending_failures):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
@@ -447,132 +449,104 @@ class TransactionQueue(object):
 
         success = True
 
-        try:
-            logger.debug("TX [%s] _attempt_new_transaction", destination)
+        logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-            txn_id = str(self._next_txn_id)
+        txn_id = str(self._next_txn_id)
 
-            logger.debug(
-                "TX [%s] {%s} Attempting new transaction"
-                " (pdus: %d, edus: %d, failures: %d)",
-                destination, txn_id,
-                len(pdus),
-                len(edus),
-                len(failures)
-            )
+        logger.debug(
+            "TX [%s] {%s} Attempting new transaction"
+            " (pdus: %d, edus: %d, failures: %d)",
+            destination, txn_id,
+            len(pdus),
+            len(edus),
+            len(failures)
+        )
 
-            logger.debug("TX [%s] Persisting transaction...", destination)
+        logger.debug("TX [%s] Persisting transaction...", destination)
 
-            transaction = Transaction.create_new(
-                origin_server_ts=int(self.clock.time_msec()),
-                transaction_id=txn_id,
-                origin=self.server_name,
-                destination=destination,
-                pdus=pdus,
-                edus=edus,
-                pdu_failures=failures,
-            )
+        transaction = Transaction.create_new(
+            origin_server_ts=int(self.clock.time_msec()),
+            transaction_id=txn_id,
+            origin=self.server_name,
+            destination=destination,
+            pdus=pdus,
+            edus=edus,
+            pdu_failures=failures,
+        )
 
-            self._next_txn_id += 1
+        self._next_txn_id += 1
 
-            yield self.transaction_actions.prepare_to_send(transaction)
+        yield self.transaction_actions.prepare_to_send(transaction)
 
-            logger.debug("TX [%s] Persisted transaction", destination)
-            logger.info(
-                "TX [%s] {%s} Sending transaction [%s],"
-                " (PDUs: %d, EDUs: %d, failures: %d)",
-                destination, txn_id,
-                transaction.transaction_id,
-                len(pdus),
-                len(edus),
-                len(failures),
-            )
+        logger.debug("TX [%s] Persisted transaction", destination)
+        logger.info(
+            "TX [%s] {%s} Sending transaction [%s],"
+            " (PDUs: %d, EDUs: %d, failures: %d)",
+            destination, txn_id,
+            transaction.transaction_id,
+            len(pdus),
+            len(edus),
+            len(failures),
+        )
 
-            with limiter:
-                # Actually send the transaction
-
-                # FIXME (erikj): This is a bit of a hack to make the Pdu age
-                # keys work
-                def json_data_cb():
-                    data = transaction.get_dict()
-                    now = int(self.clock.time_msec())
-                    if "pdus" in data:
-                        for p in data["pdus"]:
-                            if "age_ts" in p:
-                                unsigned = p.setdefault("unsigned", {})
-                                unsigned["age"] = now - int(p["age_ts"])
-                                del p["age_ts"]
-                    return data
-
-                try:
-                    response = yield self.transport_layer.send_transaction(
-                        transaction, json_data_cb
-                    )
-                    code = 200
-
-                    if response:
-                        for e_id, r in response.get("pdus", {}).items():
-                            if "error" in r:
-                                logger.warn(
-                                    "Transaction returned error for %s: %s",
-                                    e_id, r,
-                                )
-                except HttpResponseException as e:
-                    code = e.code
-                    response = e.response
-
-                    if e.code in (401, 404, 429) or 500 <= e.code:
-                        logger.info(
-                            "TX [%s] {%s} got %d response",
-                            destination, txn_id, code
+        # Actually send the transaction
+
+        # FIXME (erikj): This is a bit of a hack to make the Pdu age
+        # keys work
+        def json_data_cb():
+            data = transaction.get_dict()
+            now = int(self.clock.time_msec())
+            if "pdus" in data:
+                for p in data["pdus"]:
+                    if "age_ts" in p:
+                        unsigned = p.setdefault("unsigned", {})
+                        unsigned["age"] = now - int(p["age_ts"])
+                        del p["age_ts"]
+            return data
+
+        try:
+            response = yield self.transport_layer.send_transaction(
+                transaction, json_data_cb
+            )
+            code = 200
+
+            if response:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "Transaction returned error for %s: %s",
+                            e_id, r,
                         )
-                        raise e
+        except HttpResponseException as e:
+            code = e.code
+            response = e.response
 
+            if e.code in (401, 404, 429) or 500 <= e.code:
                 logger.info(
                     "TX [%s] {%s} got %d response",
                     destination, txn_id, code
                 )
+                raise e
 
-                logger.debug("TX [%s] Sent transaction", destination)
-                logger.debug("TX [%s] Marking as delivered...", destination)
-
-            yield self.transaction_actions.delivered(
-                transaction, code, response
-            )
+        logger.info(
+            "TX [%s] {%s} got %d response",
+            destination, txn_id, code
+        )
 
-            logger.debug("TX [%s] Marked as delivered", destination)
+        logger.debug("TX [%s] Sent transaction", destination)
+        logger.debug("TX [%s] Marking as delivered...", destination)
 
-            if code != 200:
-                for p in pdus:
-                    logger.info(
-                        "Failed to send event %s to %s", p.event_id, destination
-                    )
-                success = False
-        except RuntimeError as e:
-            # We capture this here as there as nothing actually listens
-            # for this finishing functions deferred.
-            logger.warn(
-                "TX [%s] Problem in _attempt_transaction: %s",
-                destination,
-                e,
-            )
+        yield self.transaction_actions.delivered(
+            transaction, code, response
+        )
 
-            success = False
+        logger.debug("TX [%s] Marked as delivered", destination)
 
+        if code != 200:
             for p in pdus:
-                logger.info("Failed to send event %s to %s", p.event_id, destination)
-        except Exception as e:
-            # We capture this here as there as nothing actually listens
-            # for this finishing functions deferred.
-            logger.warn(
-                "TX [%s] Problem in _attempt_transaction: %s",
-                destination,
-                e,
-            )
-
+                logger.info(
+                    "Failed to send event %s to %s", p.event_id, destination
+                )
             success = False
 
-            for p in pdus:
-                logger.info("Failed to send event %s to %s", p.event_id, destination)
-
         defer.returnValue(success)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index f49e8a2cc4..cc9bc7f14b 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -163,6 +163,7 @@ class TransportLayerClient(object):
             data=json_data,
             json_data_callback=json_data_callback,
             long_retries=True,
+            backoff_on_404=True,  # If we get a 404 the other side has gone
         )
 
         logger.debug(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index e40495d1ab..a33135de67 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, CodeMessageException
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
 
@@ -121,15 +121,11 @@ class E2eKeysHandler(object):
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
             try:
-                limiter = yield get_retry_limiter(
-                    destination, self.clock, self.store
+                remote_result = yield self.federation.query_client_keys(
+                    destination,
+                    {"device_keys": destination_query},
+                    timeout=timeout
                 )
-                with limiter:
-                    remote_result = yield self.federation.query_client_keys(
-                        destination,
-                        {"device_keys": destination_query},
-                        timeout=timeout
-                    )
 
                 for user_id, keys in remote_result["device_keys"].items():
                     if user_id in destination_query:
@@ -239,18 +235,14 @@ class E2eKeysHandler(object):
         def claim_client_keys(destination):
             device_keys = remote_queries[destination]
             try:
-                limiter = yield get_retry_limiter(
-                    destination, self.clock, self.store
+                remote_result = yield self.federation.claim_client_keys(
+                    destination,
+                    {"one_time_keys": device_keys},
+                    timeout=timeout
                 )
-                with limiter:
-                    remote_result = yield self.federation.claim_client_keys(
-                        destination,
-                        {"one_time_keys": device_keys},
-                        timeout=timeout
-                    )
-                    for user_id, keys in remote_result["one_time_keys"].items():
-                        if user_id in device_keys:
-                            json_result[user_id] = keys
+                for user_id, keys in remote_result["one_time_keys"].items():
+                    if user_id in device_keys:
+                        json_result[user_id] = keys
             except CodeMessageException as e:
                 failures[destination] = {
                     "status": e.code, "message": e.message
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index f15903f862..b0885dc979 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import synapse.util.retryutils
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web.client import readBody, HTTPConnectionPool, Agent
@@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object):
             reactor, MatrixFederationEndpointFactory(hs), pool=pool
         )
         self.clock = hs.get_clock()
+        self._store = hs.get_datastore()
         self.version_string = hs.version_string
         self._next_id = 1
 
@@ -106,133 +106,143 @@ class MatrixFederationHttpClient(object):
     def _request(self, destination, method, path,
                  body_callback, headers_dict={}, param_bytes=b"",
                  query_bytes=b"", retry_on_dns_fail=True,
-                 timeout=None, long_retries=False):
+                 timeout=None, long_retries=False, backoff_on_404=False):
         """ Creates and sends a request to the given server
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
             Deferred: resolves with the http response object on success.
 
             Fails with ``HTTPRequestException``: if we get an HTTP response
-            code >= 300.
+                code >= 300.
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+                to retry this server.
         """
+        limiter = yield synapse.util.retryutils.get_retry_limiter(
+            destination,
+            self.clock,
+            self._store,
+            backoff_on_404=backoff_on_404,
+        )
+
         destination = destination.encode("ascii")
         path_bytes = path.encode("ascii")
+        with limiter:
+            headers_dict[b"User-Agent"] = [self.version_string]
+            headers_dict[b"Host"] = [destination]
 
-        headers_dict[b"User-Agent"] = [self.version_string]
-        headers_dict[b"Host"] = [destination]
+            url_bytes = self._create_url(
+                destination, path_bytes, param_bytes, query_bytes
+            )
 
-        url_bytes = self._create_url(
-            destination, path_bytes, param_bytes, query_bytes
-        )
+            txn_id = "%s-O-%s" % (method, self._next_id)
+            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
 
-        txn_id = "%s-O-%s" % (method, self._next_id)
-        self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+            outbound_logger.info(
+                "{%s} [%s] Sending request: %s %s",
+                txn_id, destination, method, url_bytes
+            )
 
-        outbound_logger.info(
-            "{%s} [%s] Sending request: %s %s",
-            txn_id, destination, method, url_bytes
-        )
+            # XXX: Would be much nicer to retry only at the transaction-layer
+            # (once we have reliable transactions in place)
+            if long_retries:
+                retries_left = MAX_LONG_RETRIES
+            else:
+                retries_left = MAX_SHORT_RETRIES
 
-        # XXX: Would be much nicer to retry only at the transaction-layer
-        # (once we have reliable transactions in place)
-        if long_retries:
-            retries_left = MAX_LONG_RETRIES
-        else:
-            retries_left = MAX_SHORT_RETRIES
+            http_url_bytes = urlparse.urlunparse(
+                ("", "", path_bytes, param_bytes, query_bytes, "")
+            )
 
-        http_url_bytes = urlparse.urlunparse(
-            ("", "", path_bytes, param_bytes, query_bytes, "")
-        )
+            log_result = None
+            try:
+                while True:
+                    producer = None
+                    if body_callback:
+                        producer = body_callback(method, http_url_bytes, headers_dict)
+
+                    try:
+                        def send_request():
+                            request_deferred = preserve_context_over_fn(
+                                self.agent.request,
+                                method,
+                                url_bytes,
+                                Headers(headers_dict),
+                                producer
+                            )
+
+                            return self.clock.time_bound_deferred(
+                                request_deferred,
+                                time_out=timeout / 1000. if timeout else 60,
+                            )
+
+                        response = yield preserve_context_over_fn(send_request)
+
+                        log_result = "%d %s" % (response.code, response.phrase,)
+                        break
+                    except Exception as e:
+                        if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                            logger.warn(
+                                "DNS Lookup failed to %s with %s",
+                                destination,
+                                e
+                            )
+                            log_result = "DNS Lookup failed to %s with %s" % (
+                                destination, e
+                            )
+                            raise
 
-        log_result = None
-        try:
-            while True:
-                producer = None
-                if body_callback:
-                    producer = body_callback(method, http_url_bytes, headers_dict)
-
-                try:
-                    def send_request():
-                        request_deferred = preserve_context_over_fn(
-                            self.agent.request,
+                        logger.warn(
+                            "{%s} Sending request failed to %s: %s %s: %s - %s",
+                            txn_id,
+                            destination,
                             method,
                             url_bytes,
-                            Headers(headers_dict),
-                            producer
+                            type(e).__name__,
+                            _flatten_response_never_received(e),
                         )
 
-                        return self.clock.time_bound_deferred(
-                            request_deferred,
-                            time_out=timeout / 1000. if timeout else 60,
+                        log_result = "%s - %s" % (
+                            type(e).__name__, _flatten_response_never_received(e),
                         )
 
-                    response = yield preserve_context_over_fn(send_request)
-
-                    log_result = "%d %s" % (response.code, response.phrase,)
-                    break
-                except Exception as e:
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                        logger.warn(
-                            "DNS Lookup failed to %s with %s",
-                            destination,
-                            e
-                        )
-                        log_result = "DNS Lookup failed to %s with %s" % (
-                            destination, e
-                        )
-                        raise
-
-                    logger.warn(
-                        "{%s} Sending request failed to %s: %s %s: %s - %s",
-                        txn_id,
-                        destination,
-                        method,
-                        url_bytes,
-                        type(e).__name__,
-                        _flatten_response_never_received(e),
-                    )
-
-                    log_result = "%s - %s" % (
-                        type(e).__name__, _flatten_response_never_received(e),
-                    )
-
-                    if retries_left and not timeout:
-                        if long_retries:
-                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                            delay = min(delay, 60)
-                            delay *= random.uniform(0.8, 1.4)
+                        if retries_left and not timeout:
+                            if long_retries:
+                                delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+                                delay = min(delay, 60)
+                                delay *= random.uniform(0.8, 1.4)
+                            else:
+                                delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+                                delay = min(delay, 2)
+                                delay *= random.uniform(0.8, 1.4)
+
+                            yield sleep(delay)
+                            retries_left -= 1
                         else:
-                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                            delay = min(delay, 2)
-                            delay *= random.uniform(0.8, 1.4)
-
-                        yield sleep(delay)
-                        retries_left -= 1
-                    else:
-                        raise
-        finally:
-            outbound_logger.info(
-                "{%s} [%s] Result: %s",
-                txn_id,
-                destination,
-                log_result,
-            )
+                            raise
+            finally:
+                outbound_logger.info(
+                    "{%s} [%s] Result: %s",
+                    txn_id,
+                    destination,
+                    log_result,
+                )
 
-        if 200 <= response.code < 300:
-            pass
-        else:
-            # :'(
-            # Update transactions table?
-            body = yield preserve_context_over_fn(readBody, response)
-            raise HttpResponseException(
-                response.code, response.phrase, body
-            )
+            if 200 <= response.code < 300:
+                pass
+            else:
+                # :'(
+                # Update transactions table?
+                body = yield preserve_context_over_fn(readBody, response)
+                raise HttpResponseException(
+                    response.code, response.phrase, body
+                )
 
-        defer.returnValue(response)
+            defer.returnValue(response)
 
     def sign_request(self, destination, method, url_bytes, headers_dict,
                      content=None):
@@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def put_json(self, destination, path, data={}, json_data_callback=None,
-                 long_retries=False, timeout=None):
+                 long_retries=False, timeout=None, backoff_on_404=False):
         """ Sends the specifed json data using PUT
 
         Args:
@@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object):
                 retry for a short or long time.
             timeout(int): How long to try (in ms) the destination for before
                 giving up. None indicates no timeout.
+            backoff_on_404 (bool): True if we should count a 404 response as
+                a failure of the server (and should therefore back off future
+                requests)
 
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
             CodeMessageException is raised.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         if not json_data_callback:
@@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
             timeout=timeout,
+            backoff_on_404=backoff_on_404,
         )
 
         if 200 <= response.code < 300:
@@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
             CodeMessageException is raised.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         def body_callback(method, url_bytes, headers_dict):
@@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object):
 
             The result of the deferred is a tuple of `(code, response)`,
             where `response` is a dict representing the decoded JSON body.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
         logger.debug("get_json args: %s", args)
 
@@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``HTTPRequestException`` if we get an HTTP response code
             >= 300
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         encoded_args = {}
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 153ef001ad..7e5a952584 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -124,7 +124,13 @@ class RetryDestinationLimiter(object):
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
-        if exc_type is not None and issubclass(exc_type, CodeMessageException):
+        if exc_type is None:
+            valid_err_code = True
+        elif not issubclass(exc_type, Exception):
+            # avoid treating exceptions which don't derive from Exception as
+            # failures; this is mostly so as not to catch defer._DefGen.
+            valid_err_code = True
+        elif issubclass(exc_type, CodeMessageException):
             # Some error codes are perfectly fine for some APIs, whereas other
             # APIs may expect to never received e.g. a 404. It's important to
             # handle 404 as some remote servers will return a 404 when the HS
@@ -142,11 +148,13 @@ class RetryDestinationLimiter(object):
             else:
                 valid_err_code = False
 
-        if exc_type is None or valid_err_code:
+        if valid_err_code:
             # We connected successfully.
             if not self.retry_interval:
                 return
 
+            logger.debug("Connection to %s was successful; clearing backoff",
+                         self.destination)
             retry_last_ts = 0
             self.retry_interval = 0
         else:
@@ -160,6 +168,10 @@ class RetryDestinationLimiter(object):
             else:
                 self.retry_interval = self.min_retry_interval
 
+            logger.debug(
+                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
+                self.destination, exc_type, exc_val, self.retry_interval
+            )
             retry_last_ts = int(self.clock.time_msec())
 
         @defer.inlineCallbacks
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index f88d2be7c5..dbe50383da 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -192,6 +192,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
                 ),
                 json_data_callback=ANY,
                 long_retries=True,
+                backoff_on_404=True,
             ),
             defer.succeed((200, "OK"))
         )
@@ -263,6 +264,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
                 ),
                 json_data_callback=ANY,
                 long_retries=True,
+                backoff_on_404=True,
             ),
             defer.succeed((200, "OK"))
         )
-- 
cgit 1.4.1


From b88a323ffbe165b8639a2ef5f5791ed65cde3e4b Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 12:03:46 +0000
Subject: Fix time_bound_deferred to throw the right exception

Due to a failure to instantiate DeferredTimedOutError, time_bound_deferred
would throw a CancelledError when the deferred timed out, which was rather
confusing.
---
 synapse/util/__init__.py | 10 ++++++----
 tests/util/test_clock.py | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+), 4 deletions(-)
 create mode 100644 tests/util/test_clock.py

diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 30fc480108..98a5a26ac5 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 class DeferredTimedOutError(SynapseError):
     def __init__(self):
-        super(SynapseError).__init__(504, "Timed out")
+        super(SynapseError, self).__init__(504, "Timed out")
 
 
 def unwrapFirstError(failure):
@@ -93,8 +93,10 @@ class Clock(object):
         ret_deferred = defer.Deferred()
 
         def timed_out_fn():
+            e = DeferredTimedOutError()
+
             try:
-                ret_deferred.errback(DeferredTimedOutError())
+                ret_deferred.errback(e)
             except:
                 pass
 
@@ -114,7 +116,7 @@ class Clock(object):
 
         ret_deferred.addBoth(cancel)
 
-        def sucess(res):
+        def success(res):
             try:
                 ret_deferred.callback(res)
             except:
@@ -128,7 +130,7 @@ class Clock(object):
             except:
                 pass
 
-        given_deferred.addCallbacks(callback=sucess, errback=err)
+        given_deferred.addCallbacks(callback=success, errback=err)
 
         timer = self.call_later(time_out, timed_out_fn)
 
diff --git a/tests/util/test_clock.py b/tests/util/test_clock.py
new file mode 100644
index 0000000000..9672603579
--- /dev/null
+++ b/tests/util/test_clock.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse import util
+from twisted.internet import defer
+from tests import unittest
+
+
+class ClockTestCase(unittest.TestCase):
+    @defer.inlineCallbacks
+    def test_time_bound_deferred(self):
+        # just a deferred which never resolves
+        slow_deferred = defer.Deferred()
+
+        clock = util.Clock()
+        time_bound = clock.time_bound_deferred(slow_deferred, 0.001)
+
+        try:
+            yield time_bound
+            self.fail("Expected timedout error, but got nothing")
+        except util.DeferredTimedOutError:
+            pass
-- 
cgit 1.4.1


From 5a16cb4bf036c6b1914d6c6248ed640c289b59e3 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 11:10:36 +0000
Subject: Ignore backoff history for invites, aliases, and roomdirs

Add a param to the federation client which lets us ignore historical backoff
data for federation queries, and set it for a handful of operations.
---
 synapse/federation/federation_client.py |  7 +++++--
 synapse/federation/transport/client.py  |  6 +++++-
 synapse/handlers/directory.py           |  1 +
 synapse/handlers/profile.py             |  6 ++++--
 synapse/http/matrixfederationclient.py  | 33 ++++++++++++++++++++++++++-------
 synapse/util/retryutils.py              | 13 +++++++++++--
 6 files changed, 52 insertions(+), 14 deletions(-)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index dc44727b36..deee0f4904 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -88,7 +88,7 @@ class FederationClient(FederationBase):
 
     @log_function
     def make_query(self, destination, query_type, args,
-                   retry_on_dns_fail=False):
+                   retry_on_dns_fail=False, ignore_backoff=False):
         """Sends a federation Query to a remote homeserver of the given type
         and arguments.
 
@@ -98,6 +98,8 @@ class FederationClient(FederationBase):
                 handler name used in register_query_handler().
             args (dict): Mapping of strings to strings containing the details
                 of the query request.
+            ignore_backoff (bool): true to ignore the historical backoff data
+                and try the request anyway.
 
         Returns:
             a Deferred which will eventually yield a JSON object from the
@@ -106,7 +108,8 @@ class FederationClient(FederationBase):
         sent_queries_counter.inc(query_type)
 
         return self.transport_layer.make_query(
-            destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
+            destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
+            ignore_backoff=ignore_backoff,
         )
 
     @log_function
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index cc9bc7f14b..15a03378f5 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -175,7 +175,8 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
-    def make_query(self, destination, query_type, args, retry_on_dns_fail):
+    def make_query(self, destination, query_type, args, retry_on_dns_fail,
+                   ignore_backoff=False):
         path = PREFIX + "/query/%s" % query_type
 
         content = yield self.client.get_json(
@@ -184,6 +185,7 @@ class TransportLayerClient(object):
             args=args,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=10000,
+            ignore_backoff=ignore_backoff,
         )
 
         defer.returnValue(content)
@@ -243,6 +245,7 @@ class TransportLayerClient(object):
             destination=destination,
             path=path,
             data=content,
+            ignore_backoff=True,
         )
 
         defer.returnValue(response)
@@ -270,6 +273,7 @@ class TransportLayerClient(object):
             destination=remote_server,
             path=path,
             args=args,
+            ignore_backoff=True,
         )
 
         defer.returnValue(response)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 1b5317edf5..943554ce98 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler):
                         "room_alias": room_alias.to_string(),
                     },
                     retry_on_dns_fail=False,
+                    ignore_backoff=True,
                 )
             except CodeMessageException as e:
                 logging.warn("Error retrieving alias")
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index abd1fb28cb..9bf638f818 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler):
                     args={
                         "user_id": target_user.to_string(),
                         "field": "displayname",
-                    }
+                    },
+                    ignore_backoff=True,
                 )
             except CodeMessageException as e:
                 if e.code != 404:
@@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler):
                     args={
                         "user_id": target_user.to_string(),
                         "field": "avatar_url",
-                    }
+                    },
+                    ignore_backoff=True,
                 )
             except CodeMessageException as e:
                 if e.code != 404:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b0885dc979..f9e32ef03d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -106,12 +106,16 @@ class MatrixFederationHttpClient(object):
     def _request(self, destination, method, path,
                  body_callback, headers_dict={}, param_bytes=b"",
                  query_bytes=b"", retry_on_dns_fail=True,
-                 timeout=None, long_retries=False, backoff_on_404=False):
+                 timeout=None, long_retries=False,
+                 ignore_backoff=False,
+                 backoff_on_404=False):
         """ Creates and sends a request to the given server
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            ignore_backoff (bool): true to ignore the historical backoff data
+                and try the request anyway.
             backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
@@ -127,6 +131,7 @@ class MatrixFederationHttpClient(object):
             self.clock,
             self._store,
             backoff_on_404=backoff_on_404,
+            ignore_backoff=ignore_backoff,
         )
 
         destination = destination.encode("ascii")
@@ -271,7 +276,9 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def put_json(self, destination, path, data={}, json_data_callback=None,
-                 long_retries=False, timeout=None, backoff_on_404=False):
+                 long_retries=False, timeout=None,
+                 ignore_backoff=False,
+                 backoff_on_404=False):
         """ Sends the specifed json data using PUT
 
         Args:
@@ -286,6 +293,8 @@ class MatrixFederationHttpClient(object):
                 retry for a short or long time.
             timeout(int): How long to try (in ms) the destination for before
                 giving up. None indicates no timeout.
+            ignore_backoff (bool): true to ignore the historical backoff data
+                and try the request anyway.
             backoff_on_404 (bool): True if we should count a 404 response as
                 a failure of the server (and should therefore back off future
                 requests)
@@ -319,6 +328,7 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
             timeout=timeout,
+            ignore_backoff=ignore_backoff,
             backoff_on_404=backoff_on_404,
         )
 
@@ -331,7 +341,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}, long_retries=False,
-                  timeout=None):
+                  timeout=None, ignore_backoff=False):
         """ Sends the specifed json data using POST
 
         Args:
@@ -344,7 +354,8 @@ class MatrixFederationHttpClient(object):
                 retry for a short or long time.
             timeout(int): How long to try (in ms) the destination for before
                 giving up. None indicates no timeout.
-
+            ignore_backoff (bool): true to ignore the historical backoff data and
+                try the request anyway.
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
@@ -368,6 +379,7 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
             timeout=timeout,
+            ignore_backoff=ignore_backoff,
         )
 
         if 200 <= response.code < 300:
@@ -380,7 +392,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
-                 timeout=None):
+                 timeout=None, ignore_backoff=False):
         """ GETs some json from the given host homeserver and path
 
         Args:
@@ -392,6 +404,8 @@ class MatrixFederationHttpClient(object):
             timeout (int): How long to try (in ms) the destination for before
                 giving up. None indicates no timeout and that the request will
                 be retried.
+            ignore_backoff (bool): true to ignore the historical backoff data
+                and try the request anyway.
         Returns:
             Deferred: Succeeds when we get *any* HTTP response.
 
@@ -424,6 +438,7 @@ class MatrixFederationHttpClient(object):
             body_callback=body_callback,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
+            ignore_backoff=ignore_backoff,
         )
 
         if 200 <= response.code < 300:
@@ -436,13 +451,16 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def get_file(self, destination, path, output_stream, args={},
-                 retry_on_dns_fail=True, max_size=None):
+                 retry_on_dns_fail=True, max_size=None,
+                 ignore_backoff=False):
         """GETs a file from a given homeserver
         Args:
             destination (str): The remote server to send the HTTP request to.
             path (str): The HTTP path to GET.
             output_stream (file): File to write the response body to.
             args (dict): Optional dictionary used to create the query string.
+            ignore_backoff (bool): true to ignore the historical backoff data
+                and try the request anyway.
         Returns:
             Deferred: resolves with an (int,dict) tuple of the file length and
             a dict of the response headers.
@@ -473,7 +491,8 @@ class MatrixFederationHttpClient(object):
             path,
             query_bytes=query_bytes,
             body_callback=body_callback,
-            retry_on_dns_fail=retry_on_dns_fail
+            retry_on_dns_fail=retry_on_dns_fail,
+            ignore_backoff=ignore_backoff,
         )
 
         headers = dict(response.headers.getAllRawHeaders())
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 7e5a952584..7f5299bd32 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -35,7 +35,8 @@ class NotRetryingDestination(Exception):
 
 
 @defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, **kwargs):
+def get_retry_limiter(destination, clock, store, ignore_backoff=False,
+                      **kwargs):
     """For a given destination check if we have previously failed to
     send a request there and are waiting before retrying the destination.
     If we are not ready to retry the destination, this will raise a
@@ -43,6 +44,14 @@ def get_retry_limiter(destination, clock, store, **kwargs):
     that will mark the destination as down if an exception is thrown (excluding
     CodeMessageException with code < 500)
 
+    Args:
+        destination (str): name of homeserver
+        clock (synapse.util.clock): timing source
+        store (synapse.storage.transactions.TransactionStore): datastore
+        ignore_backoff (bool): true to ignore the historical backoff data and
+            try the request anyway. We will still update the next
+            retry_interval on success/failure.
+
     Example usage:
 
         try:
@@ -66,7 +75,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
 
         now = int(clock.time_msec())
 
-        if retry_last_ts + retry_interval > now:
+        if not ignore_backoff and retry_last_ts + retry_interval > now:
             raise NotRetryingDestination(
                 retry_last_ts=retry_last_ts,
                 retry_interval=retry_interval,
-- 
cgit 1.4.1


From 0bfea9a2bec8e3379ecd521c9f40dc392246a8b4 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 13:20:08 +0000
Subject: fix tests

---
 tests/handlers/test_directory.py | 1 +
 tests/handlers/test_profile.py   | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py
index ceb9aa5765..5712773909 100644
--- a/tests/handlers/test_directory.py
+++ b/tests/handlers/test_directory.py
@@ -93,6 +93,7 @@ class DirectoryTestCase(unittest.TestCase):
                 "room_alias": "#another:remote",
             },
             retry_on_dns_fail=False,
+            ignore_backoff=True,
         )
 
     @defer.inlineCallbacks
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index 979cebf600..2a203129ca 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -119,7 +119,8 @@ class ProfileTestCase(unittest.TestCase):
         self.mock_federation.make_query.assert_called_with(
             destination="remote",
             query_type="profile",
-            args={"user_id": "@alice:remote", "field": "displayname"}
+            args={"user_id": "@alice:remote", "field": "displayname"},
+            ignore_backoff=True,
         )
 
     @defer.inlineCallbacks
-- 
cgit 1.4.1


From e56c79c114db2332a25dbf4b95c351a4d2684771 Mon Sep 17 00:00:00 2001
From: pik <alexander.maznev@gmail.com>
Date: Thu, 23 Mar 2017 11:42:07 -0300
Subject: check_valid_filter using JSONSchema  * add invalid filter tests

Signed-off-by: pik <alexander.maznev@gmail.com>
---
 synapse/api/filtering.py    | 251 ++++++++++++++++++++++++++++----------------
 tests/api/test_filtering.py |  18 +++-
 2 files changed, 175 insertions(+), 94 deletions(-)

diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 47f0cf0fa9..6acf986351 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -15,10 +15,163 @@
 from synapse.api.errors import SynapseError
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, RoomID
-
 from twisted.internet import defer
 
 import ujson as json
+import jsonschema
+
+FILTER_SCHEMA = {
+    "additionalProperties": False,
+    "type": "object",
+    "properties": {
+        "limit": {
+            "type": "number"
+        },
+        "senders": {
+            "$ref": "#/definitions/user_id_array"
+        },
+        "not_senders": {
+            "$ref": "#/definitions/user_id_array"
+        },
+        # TODO: We don't limit event type values but we probably should...
+        # check types are valid event types
+        "types": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "not_types": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
+
+ROOM_FILTER_SCHEMA = {
+    "additionalProperties": False,
+    "type": "object",
+    "properties": {
+        "not_rooms": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "rooms": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "ephemeral": {
+            "$ref": "#/definitions/room_event_filter"
+        },
+        "include_leave": {
+            "type": "boolean"
+        },
+        "state": {
+            "$ref": "#/definitions/room_event_filter"
+        },
+        "timeline": {
+            "$ref": "#/definitions/room_event_filter"
+        },
+        "accpount_data": {
+            "$ref": "#/definitions/room_event_filter"
+        },
+    }
+}
+
+ROOM_EVENT_FILTER_SCHEMA = {
+    "additionalProperties": False,
+    "type": "object",
+    "properties": {
+        "limit": {
+            "type": "number"
+        },
+        "senders": {
+            "$ref": "#/definitions/user_id_array"
+        },
+        "not_senders": {
+            "$ref": "#/definitions/user_id_array"
+        },
+        "types": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "not_types": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "rooms": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "not_rooms": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        },
+        "contains_url": {
+            "type": "boolean"
+        }
+    }
+}
+
+USER_ID_ARRAY_SCHEMA = {
+    "type": "array",
+    "items": {
+        "type": "string",
+        "pattern": "^[A-Za-z0-9_]+:[A-Za-z0-9_-\.]+$"
+    }
+}
+
+USER_FILTER_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "description": "schema for a Sync filter",
+    "type": "object",
+    "definitions": {
+        "user_id_array": USER_ID_ARRAY_SCHEMA,
+        "filter": FILTER_SCHEMA,
+        "room_filter": ROOM_FILTER_SCHEMA,
+        "room_event_filter": ROOM_EVENT_FILTER_SCHEMA
+    },
+    "properties": {
+        "presence": {
+            "$ref": "#/definitions/filter"
+        },
+        "account_data": {
+            "$ref": "#/definitions/filter"
+        },
+        "room": {
+            "$ref": "#/definitions/room_filter"
+        },
+        # "event_format": {
+        #     "type": { "enum": [ "client", "federation" ] }
+        # },
+        "event_fields": {
+            "type": "array",
+            "items": {
+                "type": "string",
+                # Don't allow '\\' in event field filters. This makes matching
+                # events a lot easier as we can then use a negative lookbehind
+                # assertion to split '\.' If we allowed \\ then it would
+                # incorrectly split '\\.' See synapse.events.utils.serialize_event
+                "pattern": "^((?!\\\).)*$"
+            }
+        }
+    },
+    "additionalProperties": False
+}
 
 
 class Filtering(object):
@@ -53,98 +206,10 @@ class Filtering(object):
         # NB: Filters are the complete json blobs. "Definitions" are an
         # individual top-level key e.g. public_user_data. Filters are made of
         # many definitions.
-
-        top_level_definitions = [
-            "presence", "account_data"
-        ]
-
-        room_level_definitions = [
-            "state", "timeline", "ephemeral", "account_data"
-        ]
-
-        for key in top_level_definitions:
-            if key in user_filter_json:
-                self._check_definition(user_filter_json[key])
-
-        if "room" in user_filter_json:
-            self._check_definition_room_lists(user_filter_json["room"])
-            for key in room_level_definitions:
-                if key in user_filter_json["room"]:
-                    self._check_definition(user_filter_json["room"][key])
-
-        if "event_fields" in user_filter_json:
-            if type(user_filter_json["event_fields"]) != list:
-                raise SynapseError(400, "event_fields must be a list of strings")
-            for field in user_filter_json["event_fields"]:
-                if not isinstance(field, basestring):
-                    raise SynapseError(400, "Event field must be a string")
-                # Don't allow '\\' in event field filters. This makes matching
-                # events a lot easier as we can then use a negative lookbehind
-                # assertion to split '\.' If we allowed \\ then it would
-                # incorrectly split '\\.' See synapse.events.utils.serialize_event
-                if r'\\' in field:
-                    raise SynapseError(
-                        400, r'The escape character \ cannot itself be escaped'
-                    )
-
-    def _check_definition_room_lists(self, definition):
-        """Check that "rooms" and "not_rooms" are lists of room ids if they
-        are present
-
-        Args:
-            definition(dict): The filter definition
-        Raises:
-            SynapseError: If there was a problem with this definition.
-        """
-        # check rooms are valid room IDs
-        room_id_keys = ["rooms", "not_rooms"]
-        for key in room_id_keys:
-            if key in definition:
-                if type(definition[key]) != list:
-                    raise SynapseError(400, "Expected %s to be a list." % key)
-                for room_id in definition[key]:
-                    RoomID.from_string(room_id)
-
-    def _check_definition(self, definition):
-        """Check if the provided definition is valid.
-
-        This inspects not only the types but also the values to make sure they
-        make sense.
-
-        Args:
-            definition(dict): The filter definition
-        Raises:
-            SynapseError: If there was a problem with this definition.
-        """
-        # NB: Filters are the complete json blobs. "Definitions" are an
-        # individual top-level key e.g. public_user_data. Filters are made of
-        # many definitions.
-        if type(definition) != dict:
-            raise SynapseError(
-                400, "Expected JSON object, not %s" % (definition,)
-            )
-
-        self._check_definition_room_lists(definition)
-
-        # check senders are valid user IDs
-        user_id_keys = ["senders", "not_senders"]
-        for key in user_id_keys:
-            if key in definition:
-                if type(definition[key]) != list:
-                    raise SynapseError(400, "Expected %s to be a list." % key)
-                for user_id in definition[key]:
-                    UserID.from_string(user_id)
-
-        # TODO: We don't limit event type values but we probably should...
-        # check types are valid event types
-        event_keys = ["types", "not_types"]
-        for key in event_keys:
-            if key in definition:
-                if type(definition[key]) != list:
-                    raise SynapseError(400, "Expected %s to be a list." % key)
-                for event_type in definition[key]:
-                    if not isinstance(event_type, basestring):
-                        raise SynapseError(400, "Event type should be a string")
+        try:
+            jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA)
+        except jsonschema.ValidationError as e:
+            raise SynapseError(400, e.message)
 
 
 class FilterCollection(object):
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index 50e8607c14..ce4116ff56 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -23,6 +23,7 @@ from tests.utils import (
 
 from synapse.api.filtering import Filter
 from synapse.events import FrozenEvent
+from synapse.api.errors import SynapseError
 
 user_localpart = "test_user"
 
@@ -34,7 +35,6 @@ def MockEvent(**kwargs):
         kwargs["type"] = "fake_type"
     return FrozenEvent(kwargs)
 
-
 class FilteringTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
@@ -54,6 +54,22 @@ class FilteringTestCase(unittest.TestCase):
 
         self.datastore = hs.get_datastore()
 
+    def test_errors_on_invalid_filters(self):
+        invalid_filters = [
+            { "boom": {} },
+            { "account_data": "Hello World" },
+            { "event_fields": ["\\foo"] },
+            { "room": { "timeline" : { "limit" : 0 }, "state": { "not_bars": ["*"]} } },
+        ]
+        for filter in invalid_filters:
+            with self.assertRaises(SynapseError) as check_filter_error:
+                self.filtering.check_valid_filter(filter)
+                self.assertIsInstance(check_filter_error.exception, SynapseError)
+
+    def test_limits_are_applied(self):
+        #TODO
+        pass
+
     def test_definition_types_works_with_literals(self):
         definition = {
             "types": ["m.room.message", "org.matrix.foo.bar"]
-- 
cgit 1.4.1


From acafcf1c5b1c40fe14054a1d5bb8277a9f16a492 Mon Sep 17 00:00:00 2001
From: pik <alexander.maznev@gmail.com>
Date: Tue, 7 Mar 2017 20:54:02 -0300
Subject: Add valid filter tests, flake8, fix typo

Signed-off-by: pik <alexander.maznev@gmail.com>
---
 synapse/api/filtering.py    | 11 ++++-----
 tests/api/test_filtering.py | 54 ++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 6acf986351..3d078e622e 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -78,7 +78,7 @@ ROOM_FILTER_SCHEMA = {
         "timeline": {
             "$ref": "#/definitions/room_event_filter"
         },
-        "accpount_data": {
+        "account_data": {
             "$ref": "#/definitions/room_event_filter"
         },
     }
@@ -131,7 +131,7 @@ USER_ID_ARRAY_SCHEMA = {
     "type": "array",
     "items": {
         "type": "string",
-        "pattern": "^[A-Za-z0-9_]+:[A-Za-z0-9_-\.]+$"
+        "pattern": "^@[A-Za-z0-9_]+:[A-Za-z0-9_\-\.]+$"
     }
 }
 
@@ -155,9 +155,10 @@ USER_FILTER_SCHEMA = {
         "room": {
             "$ref": "#/definitions/room_filter"
         },
-        # "event_format": {
-        #     "type": { "enum": [ "client", "federation" ] }
-        # },
+        "event_format": {
+            "type": "string",
+            "enum": ["client", "federation"]
+        },
         "event_fields": {
             "type": "array",
             "items": {
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index ce4116ff56..1ce1acb3cf 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -35,6 +35,7 @@ def MockEvent(**kwargs):
         kwargs["type"] = "fake_type"
     return FrozenEvent(kwargs)
 
+
 class FilteringTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
@@ -56,18 +57,61 @@ class FilteringTestCase(unittest.TestCase):
 
     def test_errors_on_invalid_filters(self):
         invalid_filters = [
-            { "boom": {} },
-            { "account_data": "Hello World" },
-            { "event_fields": ["\\foo"] },
-            { "room": { "timeline" : { "limit" : 0 }, "state": { "not_bars": ["*"]} } },
+            {"boom": {}},
+            {"account_data": "Hello World"},
+            {"event_fields": ["\\foo"]},
+            {"room": {"timeline": {"limit": 0}, "state": {"not_bars": ["*"]}}},
+            {"event_format": "other"}
         ]
         for filter in invalid_filters:
             with self.assertRaises(SynapseError) as check_filter_error:
                 self.filtering.check_valid_filter(filter)
                 self.assertIsInstance(check_filter_error.exception, SynapseError)
 
+    def test_valid_filters(self):
+        valid_filters = [
+            {
+                "room": {
+                    "timeline": {"limit": 20},
+                    "state": {"not_types": ["m.room.member"]},
+                    "ephemeral": {"limit": 0, "not_types": ["*"]},
+                    "include_leave": False,
+                    "rooms": ["#dee:pik-test"],
+                    "not_rooms": ["#gee:pik-test"],
+                    "account_data": {"limit": 0, "types": ["*"]}
+                }
+            },
+            {
+                "room": {
+                    "state": {
+                        "types": ["m.room.*"],
+                        "not_rooms": ["!726s6s6q:example.com"]
+                    },
+                    "timeline": {
+                        "limit": 10,
+                        "types": ["m.room.message"],
+                        "not_rooms": ["!726s6s6q:example.com"],
+                        "not_senders": ["@spam:example.com"]
+                    },
+                    "ephemeral": {
+                        "types": ["m.receipt", "m.typing"],
+                        "not_rooms": ["!726s6s6q:example.com"],
+                        "not_senders": ["@spam:example.com"]
+                    }
+                },
+                "presence": {
+                    "types": ["m.presence"],
+                    "not_senders": ["@alice:example.com"]
+                },
+                "event_format": "client",
+                "event_fields": ["type", "content", "sender"]
+            }
+        ]
+        for filter in valid_filters:
+            self.filtering.check_valid_filter(filter)
+
     def test_limits_are_applied(self):
-        #TODO
+        # TODO
         pass
 
     def test_definition_types_works_with_literals(self):
-- 
cgit 1.4.1


From 566641a0b5f04e444383d5e3a5493b22e551ff03 Mon Sep 17 00:00:00 2001
From: pik <alexander.maznev@gmail.com>
Date: Thu, 23 Mar 2017 11:42:41 -0300
Subject: use jsonschema.FormatChecker for RoomID and UserID strings  * use a
 valid filter in rest/client/v2_alpha test

Signed-off-by: pik <alexander.maznev@gmail.com>
---
 synapse/api/filtering.py                  | 45 ++++++++++++++++++-------------
 tests/api/test_filtering.py               | 15 ++++++++---
 tests/rest/client/v2_alpha/test_filter.py |  4 +--
 3 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 3d078e622e..83206348e5 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 
 import ujson as json
 import jsonschema
+from jsonschema import FormatChecker
 
 FILTER_SCHEMA = {
     "additionalProperties": False,
@@ -55,16 +56,10 @@ ROOM_FILTER_SCHEMA = {
     "type": "object",
     "properties": {
         "not_rooms": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            }
+            "$ref": "#/definitions/room_id_array"
         },
         "rooms": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            }
+            "$ref": "#/definitions/room_id_array"
         },
         "ephemeral": {
             "$ref": "#/definitions/room_event_filter"
@@ -110,16 +105,10 @@ ROOM_EVENT_FILTER_SCHEMA = {
             }
         },
         "rooms": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            }
+            "$ref": "#/definitions/room_id_array"
         },
         "not_rooms": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            }
+            "$ref": "#/definitions/room_id_array"
         },
         "contains_url": {
             "type": "boolean"
@@ -131,7 +120,15 @@ USER_ID_ARRAY_SCHEMA = {
     "type": "array",
     "items": {
         "type": "string",
-        "pattern": "^@[A-Za-z0-9_]+:[A-Za-z0-9_\-\.]+$"
+        "format": "matrix_user_id"
+    }
+}
+
+ROOM_ID_ARRAY_SCHEMA = {
+    "type": "array",
+    "items": {
+        "type": "string",
+        "format": "matrix_room_id"
     }
 }
 
@@ -140,6 +137,7 @@ USER_FILTER_SCHEMA = {
     "description": "schema for a Sync filter",
     "type": "object",
     "definitions": {
+        "room_id_array": ROOM_ID_ARRAY_SCHEMA,
         "user_id_array": USER_ID_ARRAY_SCHEMA,
         "filter": FILTER_SCHEMA,
         "room_filter": ROOM_FILTER_SCHEMA,
@@ -175,6 +173,16 @@ USER_FILTER_SCHEMA = {
 }
 
 
+@FormatChecker.cls_checks('matrix_room_id')
+def matrix_room_id_validator(room_id_str):
+    return RoomID.from_string(room_id_str)
+
+
+@FormatChecker.cls_checks('matrix_user_id')
+def matrix_user_id_validator(user_id_str):
+    return UserID.from_string(user_id_str)
+
+
 class Filtering(object):
 
     def __init__(self, hs):
@@ -208,7 +216,8 @@ class Filtering(object):
         # individual top-level key e.g. public_user_data. Filters are made of
         # many definitions.
         try:
-            jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA)
+            jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
+                                format_checker=FormatChecker())
         except jsonschema.ValidationError as e:
             raise SynapseError(400, e.message)
 
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index 1ce1acb3cf..dcceca7f3e 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -25,6 +25,8 @@ from synapse.api.filtering import Filter
 from synapse.events import FrozenEvent
 from synapse.api.errors import SynapseError
 
+import jsonschema
+
 user_localpart = "test_user"
 
 
@@ -61,7 +63,9 @@ class FilteringTestCase(unittest.TestCase):
             {"account_data": "Hello World"},
             {"event_fields": ["\\foo"]},
             {"room": {"timeline": {"limit": 0}, "state": {"not_bars": ["*"]}}},
-            {"event_format": "other"}
+            {"event_format": "other"},
+            {"room": {"not_rooms": ["#foo:pik-test"]}},
+            {"presence": {"senders": ["@bar;pik.test.com"]}}
         ]
         for filter in invalid_filters:
             with self.assertRaises(SynapseError) as check_filter_error:
@@ -76,8 +80,8 @@ class FilteringTestCase(unittest.TestCase):
                     "state": {"not_types": ["m.room.member"]},
                     "ephemeral": {"limit": 0, "not_types": ["*"]},
                     "include_leave": False,
-                    "rooms": ["#dee:pik-test"],
-                    "not_rooms": ["#gee:pik-test"],
+                    "rooms": ["!dee:pik-test"],
+                    "not_rooms": ["!gee:pik-test"],
                     "account_data": {"limit": 0, "types": ["*"]}
                 }
             },
@@ -108,7 +112,10 @@ class FilteringTestCase(unittest.TestCase):
             }
         ]
         for filter in valid_filters:
-            self.filtering.check_valid_filter(filter)
+            try:
+                self.filtering.check_valid_filter(filter)
+            except jsonschema.ValidationError as e:
+                self.fail(e)
 
     def test_limits_are_applied(self):
         # TODO
diff --git a/tests/rest/client/v2_alpha/test_filter.py b/tests/rest/client/v2_alpha/test_filter.py
index 3d27d03cbf..76b833e119 100644
--- a/tests/rest/client/v2_alpha/test_filter.py
+++ b/tests/rest/client/v2_alpha/test_filter.py
@@ -33,8 +33,8 @@ PATH_PREFIX = "/_matrix/client/v2_alpha"
 class FilterTestCase(unittest.TestCase):
 
     USER_ID = "@apple:test"
-    EXAMPLE_FILTER = {"type": ["m.*"]}
-    EXAMPLE_FILTER_JSON = '{"type": ["m.*"]}'
+    EXAMPLE_FILTER = {"room": {"timeline": {"types": ["m.room.message"]}}}
+    EXAMPLE_FILTER_JSON = '{"room": {"timeline": {"types": ["m.room.message"]}}}'
     TO_REGISTER = [filter]
 
     @defer.inlineCallbacks
-- 
cgit 1.4.1


From 250ce11ab9589b09f827e7f633ac340993eaac9a Mon Sep 17 00:00:00 2001
From: pik <alexander.maznev@gmail.com>
Date: Thu, 23 Mar 2017 11:42:47 -0300
Subject: Add jsonschema to python_dependencies.py

Signed-off-by: pik <alexander.maznev@gmail.com>
---
 synapse/python_dependencies.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c4777b2a2b..ed7f1c89ad 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -19,6 +19,7 @@ from distutils.version import LooseVersion
 logger = logging.getLogger(__name__)
 
 REQUIREMENTS = {
+    "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
     "frozendict>=0.4": ["frozendict"],
     "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
     "canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"],
-- 
cgit 1.4.1


From 00957d1aa4b01a199ab2c3abf30032a0ca0b1e12 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Thu, 23 Mar 2017 17:53:49 +0000
Subject: User Cursor.__iter__ instead of fetchall

This prevents unnecessary construction of lists
---
 synapse/storage/_base.py              | 13 ++++++++-----
 synapse/storage/account_data.py       |  4 ++--
 synapse/storage/deviceinbox.py        | 10 +++++-----
 synapse/storage/devices.py            |  7 +++----
 synapse/storage/end_to_end_keys.py    |  4 ++--
 synapse/storage/event_federation.py   | 13 ++++++-------
 synapse/storage/event_push_actions.py |  2 +-
 synapse/storage/events.py             |  2 +-
 synapse/storage/prepare_database.py   |  2 +-
 synapse/storage/receipts.py           |  5 ++---
 synapse/storage/registration.py       |  2 +-
 synapse/storage/room.py               |  4 ++--
 synapse/storage/signatures.py         |  2 +-
 synapse/storage/state.py              |  3 +--
 synapse/storage/tags.py               |  4 ++--
 tests/storage/test_base.py            |  4 ++--
 16 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 13b106bba1..93d9ed5d62 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -73,6 +73,9 @@ class LoggingTransaction(object):
     def __setattr__(self, name, value):
         setattr(self.txn, name, value)
 
+    def __iter__(self):
+        return self.txn.__iter__()
+
     def execute(self, sql, *args):
         self._do_execute(self.txn.execute, sql, *args)
 
@@ -357,7 +360,7 @@ class SQLBaseStore(object):
         """
         col_headers = list(intern(column[0]) for column in cursor.description)
         results = list(
-            dict(zip(col_headers, row)) for row in cursor.fetchall()
+            dict(zip(col_headers, row)) for row in cursor
         )
         return results
 
@@ -579,7 +582,7 @@ class SQLBaseStore(object):
 
         txn.execute(sql, keyvalues.values())
 
-        return [r[0] for r in txn.fetchall()]
+        return [r[0] for r in txn]
 
     def _simple_select_onecol(self, table, keyvalues, retcol,
                               desc="_simple_select_onecol"):
@@ -901,14 +904,14 @@ class SQLBaseStore(object):
 
         txn = db_conn.cursor()
         txn.execute(sql, (int(max_value),))
-        rows = txn.fetchall()
-        txn.close()
 
         cache = {
             row[0]: int(row[1])
-            for row in rows
+            for row in txn
         }
 
+        txn.close()
+
         if cache:
             min_val = min(cache.values())
         else:
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 3fa226e92d..aa84ffc2b0 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -182,7 +182,7 @@ class AccountDataStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
             global_account_data = {
-                row[0]: json.loads(row[1]) for row in txn.fetchall()
+                row[0]: json.loads(row[1]) for row in txn
             }
 
             sql = (
@@ -193,7 +193,7 @@ class AccountDataStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
             account_data_by_room = {}
-            for row in txn.fetchall():
+            for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
                 room_account_data[row[1]] = json.loads(row[2])
 
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 7925cb5f1b..2714519d21 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -178,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 )
                 txn.execute(sql, (user_id,))
                 message_json = ujson.dumps(messages_by_device["*"])
-                for row in txn.fetchall():
+                for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
                     device = row[0]
@@ -195,7 +195,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 # TODO: Maybe this needs to be done in batches if there are
                 # too many local devices for a given user.
                 txn.execute(sql, [user_id] + devices)
-                for row in txn.fetchall():
+                for row in txn:
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
@@ -251,7 +251,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 user_id, device_id, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
@@ -340,7 +340,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 " ORDER BY stream_id ASC"
             )
             txn.execute(sql, (last_pos, upper_pos))
-            rows.extend(txn.fetchall())
+            rows.extend(txn)
 
             return rows
 
@@ -384,7 +384,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 destination, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index e545b62e39..6beeff8b00 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -333,13 +333,12 @@ class DeviceStore(SQLBaseStore):
         txn.execute(
             sql, (destination, from_stream_id, now_stream_id, False)
         )
-        rows = txn.fetchall()
 
-        if not rows:
+        # maps (user_id, device_id) -> stream_id
+        query_map = {(r[0], r[1]): r[2] for r in txn}
+        if not query_map:
             return (now_stream_id, [])
 
-        # maps (user_id, device_id) -> stream_id
-        query_map = {(r[0], r[1]): r[2] for r in rows}
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
         )
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b9f1365f92..58bde65b6c 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -153,7 +153,7 @@ class EndToEndKeyStore(SQLBaseStore):
             )
             txn.execute(sql, (user_id, device_id))
             result = {}
-            for algorithm, key_count in txn.fetchall():
+            for algorithm, key_count in txn:
                 result[algorithm] = key_count
             return result
         return self.runInteraction(
@@ -174,7 +174,7 @@ class EndToEndKeyStore(SQLBaseStore):
                 user_result = result.setdefault(user_id, {})
                 device_result = user_result.setdefault(device_id, {})
                 txn.execute(sql, (user_id, device_id, algorithm))
-                for key_id, key_json in txn.fetchall():
+                for key_id, key_json in txn:
                     device_result[algorithm + ":" + key_id] = key_json
                     delete.append((user_id, device_id, algorithm, key_id))
             sql = (
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0d97de2fe7..43b5b49986 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -74,7 +74,7 @@ class EventFederationStore(SQLBaseStore):
                     base_sql % (",".join(["?"] * len(chunk)),),
                     chunk
                 )
-                new_front.update([r[0] for r in txn.fetchall()])
+                new_front.update([r[0] for r in txn])
 
             new_front -= results
 
@@ -110,7 +110,7 @@ class EventFederationStore(SQLBaseStore):
 
         txn.execute(sql, (room_id, False,))
 
-        return dict(txn.fetchall())
+        return dict(txn)
 
     def _get_oldest_events_in_room_txn(self, txn, room_id):
         return self._simple_select_onecol_txn(
@@ -152,7 +152,7 @@ class EventFederationStore(SQLBaseStore):
         txn.execute(sql, (room_id, ))
 
         results = []
-        for event_id, depth in txn.fetchall():
+        for event_id, depth in txn:
             hashes = self._get_event_reference_hashes_txn(txn, event_id)
             prev_hashes = {
                 k: encode_base64(v) for k, v in hashes.items()
@@ -334,8 +334,7 @@ class EventFederationStore(SQLBaseStore):
 
         def get_forward_extremeties_for_room_txn(txn):
             txn.execute(sql, (stream_ordering, room_id))
-            rows = txn.fetchall()
-            return [event_id for event_id, in rows]
+            return [event_id for event_id, in txn]
 
         return self.runInteraction(
             "get_forward_extremeties_for_room",
@@ -436,7 +435,7 @@ class EventFederationStore(SQLBaseStore):
                 (room_id, event_id, False, limit - len(event_results))
             )
 
-            for row in txn.fetchall():
+            for row in txn:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
@@ -482,7 +481,7 @@ class EventFederationStore(SQLBaseStore):
                     (room_id, event_id, False, limit - len(event_results))
                 )
 
-                for e_id, in txn.fetchall():
+                for e_id, in txn:
                     new_front.add(e_id)
 
             new_front -= earliest_events
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 14543b4269..d6d8723b4a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -206,7 +206,7 @@ class EventPushActionsStore(SQLBaseStore):
                 " stream_ordering >= ? AND stream_ordering <= ?"
             )
             txn.execute(sql, (min_stream_ordering, max_stream_ordering))
-            return [r[0] for r in txn.fetchall()]
+            return [r[0] for r in txn]
         ret = yield self.runInteraction("get_push_action_users_in_range", f)
         defer.returnValue(ret)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3c8393bfe8..9f2045e8d8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -834,7 +834,7 @@ class EventsStore(SQLBaseStore):
 
         have_persisted = {
             event_id: outlier
-            for event_id, outlier in txn.fetchall()
+            for event_id, outlier in txn
         }
 
         to_remove = set()
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index ed84db6b4b..6e623843d5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -356,7 +356,7 @@ def _get_or_create_schema_state(txn, database_engine):
             ),
             (current_version,)
         )
-        applied_deltas = [d for d, in txn.fetchall()]
+        applied_deltas = [d for d, in txn]
         return current_version, applied_deltas, upgraded
 
     return None
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 5cf41501ea..6b0f8c2787 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -313,10 +313,9 @@ class ReceiptsStore(SQLBaseStore):
         )
 
         txn.execute(sql, (room_id, receipt_type, user_id))
-        results = txn.fetchall()
 
-        if results and topological_ordering:
-            for to, so, _ in results:
+        if topological_ordering:
+            for to, so, _ in txn:
                 if int(to) > topological_ordering:
                     return False
                 elif int(to) == topological_ordering and int(so) >= stream_ordering:
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26be6060c3..ec2c52ab93 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -209,7 +209,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 " WHERE lower(name) = lower(?)"
             )
             txn.execute(sql, (user_id,))
-            return dict(txn.fetchall())
+            return dict(txn)
 
         return self.runInteraction("get_users_by_id_case_insensitive", f)
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 8a2fe2fdf5..e4c56cc175 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -396,7 +396,7 @@ class RoomStore(SQLBaseStore):
                     sql % ("AND appservice_id IS NULL",),
                     (stream_id,)
                 )
-            return dict(txn.fetchall())
+            return dict(txn)
         else:
             # We want to get from all lists, so we need to aggregate the results
 
@@ -422,7 +422,7 @@ class RoomStore(SQLBaseStore):
 
             results = {}
             # A room is visible if its visible on any list.
-            for room_id, visibility in txn.fetchall():
+            for room_id, visibility in txn:
                 results[room_id] = bool(visibility) or results.get(room_id, False)
 
             return results
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index e1dca927d7..67d5d9969a 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -72,7 +72,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return {k: v for k, v in txn.fetchall()}
+        return {k: v for k, v in txn}
 
     def _store_event_reference_hashes_txn(self, txn, events):
         """Store a hash for a PDU
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1b42bea07a..e8bd02e773 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -373,10 +373,9 @@ class StateStore(SQLBaseStore):
                         " WHERE state_group = ? %s" % (where_clause,),
                         args
                     )
-                    rows = txn.fetchall()
                     results[group].update({
                         (typ, state_key): event_id
-                        for typ, state_key, event_id in rows
+                        for typ, state_key, event_id in txn
                         if (typ, state_key) not in results[group]
                     })
 
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 5a2c1aa59b..bff73f3f04 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore):
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
                 tags = []
-                for tag, content in txn.fetchall():
+                for tag, content in txn:
                     tags.append(json.dumps(tag) + ":" + content)
                 tag_json = "{" + ",".join(tags) + "}"
                 results.append((stream_id, user_id, room_id, tag_json))
@@ -132,7 +132,7 @@ class TagsStore(SQLBaseStore):
                 " WHERE user_id = ? AND stream_id > ?"
             )
             txn.execute(sql, (user_id, stream_id))
-            room_ids = [row[0] for row in txn.fetchall()]
+            room_ids = [row[0] for row in txn]
             return room_ids
 
         changed = self._account_data_stream_cache.has_entity_changed(
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index afbefb2e2d..91e971190c 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -89,7 +89,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_select_one_1col(self):
         self.mock_txn.rowcount = 1
-        self.mock_txn.fetchall.return_value = [("Value",)]
+        self.mock_txn.__iter__ = Mock(return_value=iter([("Value",)]))
 
         value = yield self.datastore._simple_select_one_onecol(
             table="tablename",
@@ -136,7 +136,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_select_list(self):
         self.mock_txn.rowcount = 3
-        self.mock_txn.fetchall.return_value = ((1,), (2,), (3,))
+        self.mock_txn.__iter__ = Mock(return_value=iter([(1,), (2,), (3,)]))
         self.mock_txn.description = (
             ("colA", None, None, None, None, None, None),
         )
-- 
cgit 1.4.1


From a2dfab12c5f0d27e38595763bb44a45b91374082 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 23 Mar 2017 18:46:17 +0000
Subject: Fix token request for addition of phone numbers

---
 synapse/rest/client/v2_alpha/account.py | 2 +-
 synapse/visibility.py                   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index aac76edf1c..4990b22b9f 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -268,7 +268,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
         if existingUid is not None:
             raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE)
 
-        ret = yield self.identity_handler.requestEmailToken(**body)
+        ret = yield self.identity_handler.requestMsisdnToken(**body)
         defer.returnValue((200, ret))
 
 
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 199b16d827..31659156ae 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -134,6 +134,13 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             if prev_membership not in MEMBERSHIP_PRIORITY:
                 prev_membership = "leave"
 
+            # Always allow the user to see their own leave events, otherwise
+            # they won't see the room disappear if they reject the invite
+            if membership == "leave" and (
+                prev_membership == "join" or prev_membership == "invite"
+            ):
+                return True
+
             new_priority = MEMBERSHIP_PRIORITY.index(membership)
             old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
             if old_priority < new_priority:
-- 
cgit 1.4.1


From 86e865d7d2f22bce2c5aa72f20938129d8277a4e Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 23 Mar 2017 18:48:30 +0000
Subject: Oops, remove unintentional change

---
 synapse/visibility.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/synapse/visibility.py b/synapse/visibility.py
index 31659156ae..199b16d827 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -134,13 +134,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             if prev_membership not in MEMBERSHIP_PRIORITY:
                 prev_membership = "leave"
 
-            # Always allow the user to see their own leave events, otherwise
-            # they won't see the room disappear if they reject the invite
-            if membership == "leave" and (
-                prev_membership == "join" or prev_membership == "invite"
-            ):
-                return True
-
             new_priority = MEMBERSHIP_PRIORITY.index(membership)
             old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
             if old_priority < new_priority:
-- 
cgit 1.4.1


From e1f1784f99dbb7815250e4963b6969230ea6e8d3 Mon Sep 17 00:00:00 2001
From: David Baker <dave@matrix.org>
Date: Thu, 23 Mar 2017 18:50:31 +0000
Subject: Fix rejection of invites not reaching sync

Always allow the user to see their own leave events, otherwise
they won't see the event if they reject an invite for a room whose
history visibility is set such that they cannot see events before
joining.
---
 synapse/visibility.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/synapse/visibility.py b/synapse/visibility.py
index 199b16d827..31659156ae 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -134,6 +134,13 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             if prev_membership not in MEMBERSHIP_PRIORITY:
                 prev_membership = "leave"
 
+            # Always allow the user to see their own leave events, otherwise
+            # they won't see the room disappear if they reject the invite
+            if membership == "leave" and (
+                prev_membership == "join" or prev_membership == "invite"
+            ):
+                return True
+
             new_priority = MEMBERSHIP_PRIORITY.index(membership)
             old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
             if old_priority < new_priority:
-- 
cgit 1.4.1


From 13c8749ac959eb4efbbd0ce13d914ff3c1799d84 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 13:48:19 +0000
Subject: Add another missing yield on check_device_registered

---
 synapse/rest/client/v2_alpha/register.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index dcd13b876f..3acf4eacdd 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -537,7 +537,7 @@ class RegisterRestServlet(RestServlet):
         # we have nowhere to store it.
         device_id = synapse.api.auth.GUEST_DEVICE_ID
         initial_display_name = params.get("initial_device_display_name")
-        self.device_handler.check_device_registered(
+        yield self.device_handler.check_device_registered(
             user_id, device_id, initial_display_name
         )
 
-- 
cgit 1.4.1


From a380f041c242269e61083c6540a013e5d0878c7b Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <richard@matrix.org>
Date: Thu, 23 Mar 2017 10:03:47 +0000
Subject: try not to drop context after federation requests

preserve_context_over_fn uses a ContextPreservingDeferred, which only restores
context for the duration of its callbacks, which isn't really correct, and
means that subsequent operations in the same request can end up without their
logcontexts.
---
 synapse/http/matrixfederationclient.py | 28 ++++++++++++++++------------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index f9e32ef03d..62b4d7e93d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -21,7 +21,7 @@ from twisted.web._newclient import ResponseDone
 
 from synapse.http.endpoint import matrix_federation_endpoint
 from synapse.util.async import sleep
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util import logcontext
 import synapse.metrics
 
 from canonicaljson import encode_canonical_json
@@ -172,8 +172,7 @@ class MatrixFederationHttpClient(object):
 
                     try:
                         def send_request():
-                            request_deferred = preserve_context_over_fn(
-                                self.agent.request,
+                            request_deferred = self.agent.request(
                                 method,
                                 url_bytes,
                                 Headers(headers_dict),
@@ -185,7 +184,8 @@ class MatrixFederationHttpClient(object):
                                 time_out=timeout / 1000. if timeout else 60,
                             )
 
-                        response = yield preserve_context_over_fn(send_request)
+                        with logcontext.PreserveLoggingContext():
+                            response = yield send_request()
 
                         log_result = "%d %s" % (response.code, response.phrase,)
                         break
@@ -242,7 +242,8 @@ class MatrixFederationHttpClient(object):
             else:
                 # :'(
                 # Update transactions table?
-                body = yield preserve_context_over_fn(readBody, response)
+                with logcontext.PreserveLoggingContext():
+                    body = yield readBody(response)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -336,7 +337,8 @@ class MatrixFederationHttpClient(object):
             # We need to update the transactions table to say it was sent?
             check_content_type_is_json(response.headers)
 
-        body = yield preserve_context_over_fn(readBody, response)
+        with logcontext.PreserveLoggingContext():
+            body = yield readBody(response)
         defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
@@ -386,7 +388,8 @@ class MatrixFederationHttpClient(object):
             # We need to update the transactions table to say it was sent?
             check_content_type_is_json(response.headers)
 
-        body = yield preserve_context_over_fn(readBody, response)
+        with logcontext.PreserveLoggingContext():
+            body = yield readBody(response)
 
         defer.returnValue(json.loads(body))
 
@@ -445,7 +448,8 @@ class MatrixFederationHttpClient(object):
             # We need to update the transactions table to say it was sent?
             check_content_type_is_json(response.headers)
 
-        body = yield preserve_context_over_fn(readBody, response)
+        with logcontext.PreserveLoggingContext():
+            body = yield readBody(response)
 
         defer.returnValue(json.loads(body))
 
@@ -498,10 +502,10 @@ class MatrixFederationHttpClient(object):
         headers = dict(response.headers.getAllRawHeaders())
 
         try:
-            length = yield preserve_context_over_fn(
-                _readBodyToFile,
-                response, output_stream, max_size
-            )
+            with logcontext.PreserveLoggingContext():
+                length = yield _readBodyToFile(
+                    response, output_stream, max_size
+                )
         except:
             logger.exception("Failed to download body")
             raise
-- 
cgit 1.4.1


From e71940aa64dd678970299f4ea04ce11781e50f4e Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 24 Mar 2017 10:57:02 +0000
Subject: Use iter(items|values)

---
 synapse/storage/_base.py  | 12 +++++------
 synapse/storage/events.py | 32 ++++++++++++---------------
 synapse/storage/state.py  | 55 ++++++++++++++++++++++++-----------------------
 3 files changed, 48 insertions(+), 51 deletions(-)

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 93d9ed5d62..c659004e8d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -135,7 +135,7 @@ class PerformanceCounters(object):
 
     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.items():
+        for name, (count, cum_time) in self.current_counters.iteritems():
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -568,7 +568,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""
 
@@ -715,7 +715,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -756,7 +756,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""
 
@@ -873,7 +873,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -913,7 +913,7 @@ class SQLBaseStore(object):
         txn.close()
 
         if cache:
-            min_val = min(cache.values())
+            min_val = min(cache.itervalues())
         else:
             min_val = max_value
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9f2045e8d8..5f4fcd9832 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -217,14 +217,14 @@ class EventsStore(SQLBaseStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in partitioned.items():
+        for room_id, evs_ctxs in partitioned.iteritems():
             d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
             deferreds.append(d)
 
-        for room_id in partitioned.keys():
+        for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
         return preserve_context_over_deferred(
@@ -323,7 +323,7 @@ class EventsStore(SQLBaseStore):
                                 (event, context)
                             )
 
-                        for room_id, ev_ctx_rm in events_by_room.items():
+                        for room_id, ev_ctx_rm in events_by_room.iteritems():
                             # Work out new extremities by recursively adding and removing
                             # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -453,10 +453,10 @@ class EventsStore(SQLBaseStore):
                 missing_event_ids,
             )
 
-            groups = set(event_to_groups.values())
+            groups = set(event_to_groups.itervalues())
             group_to_state = yield self._get_state_for_groups(groups)
 
-            state_sets.extend(group_to_state.values())
+            state_sets.extend(group_to_state.itervalues())
 
         if not new_latest_event_ids:
             current_state = {}
@@ -718,7 +718,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.items():
+        for room_id, new_extrem in new_forward_extremities.iteritems():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -736,7 +736,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for ev_id in new_extrem
             ],
         )
@@ -753,7 +753,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for event_id in new_extrem
             ]
         )
@@ -807,7 +807,7 @@ class EventsStore(SQLBaseStore):
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in depth_updates.items():
+        for room_id, depth in depth_updates.iteritems():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -958,14 +958,10 @@ class EventsStore(SQLBaseStore):
             return
 
         def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
+            d = event.get_dict()
+            d.pop("redacted", None)
+            d.pop("redacted_because", None)
+            return d
 
         self._simple_insert_many_txn(
             txn,
@@ -1998,7 +1994,7 @@ class EventsStore(SQLBaseStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in curr_state.items()
+                    for key, state_id in curr_state.iteritems()
                 ],
             )
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e8bd02e773..aedd597ded 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -90,7 +90,7 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups)
 
         defer.returnValue(group_to_state)
@@ -108,17 +108,18 @@ class StateStore(SQLBaseStore):
 
         state_event_map = yield self.get_events(
             [
-                ev_id for group_ids in group_to_ids.values()
-                for ev_id in group_ids.values()
+                ev_id for group_ids in group_to_ids.itervalues()
+                for ev_id in group_ids.itervalues()
             ],
             get_prev_content=False
         )
 
         defer.returnValue({
             group: [
-                state_event_map[v] for v in event_id_map.values() if v in state_event_map
+                state_event_map[v] for v in event_id_map.itervalues()
+                if v in state_event_map
             ]
-            for group, event_id_map in group_to_ids.items()
+            for group, event_id_map in group_to_ids.iteritems()
         })
 
     def _have_persisted_state_group_txn(self, txn, state_group):
@@ -190,7 +191,7 @@ class StateStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in context.delta_ids.items()
+                        for key, state_id in context.delta_ids.iteritems()
                     ],
                 )
             else:
@@ -205,7 +206,7 @@ class StateStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in context.current_state_ids.items()
+                        for key, state_id in context.current_state_ids.iteritems()
                     ],
                 )
 
@@ -217,7 +218,7 @@ class StateStore(SQLBaseStore):
                     "state_group": state_group_id,
                     "event_id": event_id,
                 }
-                for event_id, state_group_id in state_groups.items()
+                for event_id, state_group_id in state_groups.iteritems()
             ],
         )
 
@@ -373,11 +374,11 @@ class StateStore(SQLBaseStore):
                         " WHERE state_group = ? %s" % (where_clause,),
                         args
                     )
-                    results[group].update({
-                        (typ, state_key): event_id
+                    results[group].update(
+                        ((typ, state_key), event_id)
                         for typ, state_key, event_id in txn
                         if (typ, state_key) not in results[group]
-                    })
+                    )
 
                     # If the lengths match then we must have all the types,
                     # so no need to go walk further down the tree.
@@ -414,21 +415,21 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         state_event_map = yield self.get_events(
-            [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
+            [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
             get_prev_content=False
         )
 
         event_to_state = {
             event_id: {
                 k: state_event_map[v]
-                for k, v in group_to_state[group].items()
+                for k, v in group_to_state[group].iteritems()
                 if v in state_event_map
             }
-            for event_id, group in event_to_groups.items()
+            for event_id, group in event_to_groups.iteritems()
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -451,12 +452,12 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         event_to_state = {
             event_id: group_to_state[group]
-            for event_id, group in event_to_groups.items()
+            for event_id, group in event_to_groups.iteritems()
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -568,7 +569,7 @@ class StateStore(SQLBaseStore):
         got_all = not (missing_types or types is None)
 
         return {
-            k: v for k, v in state_dict_ids.items()
+            k: v for k, v in state_dict_ids.iteritems()
             if include(k[0], k[1])
         }, missing_types, got_all
 
@@ -627,7 +628,7 @@ class StateStore(SQLBaseStore):
 
             # Now we want to update the cache with all the things we fetched
             # from the database.
-            for group, group_state_dict in group_to_state_dict.items():
+            for group, group_state_dict in group_to_state_dict.iteritems():
                 if types:
                     # We delibrately put key -> None mappings into the cache to
                     # cache absence of the key, on the assumption that if we've
@@ -642,10 +643,10 @@ class StateStore(SQLBaseStore):
                 else:
                     state_dict = results[group]
 
-                state_dict.update({
-                    (intern_string(k[0]), intern_string(k[1])): v
-                    for k, v in group_state_dict.items()
-                })
+                state_dict.update(
+                    ((intern_string(k[0]), intern_string(k[1])), v)
+                    for k, v in group_state_dict.iteritems()
+                )
 
                 self._state_group_cache.update(
                     cache_seq_num,
@@ -656,10 +657,10 @@ class StateStore(SQLBaseStore):
 
         # Remove all the entries with None values. The None values were just
         # used for bookkeeping in the cache.
-        for group, state_dict in results.items():
+        for group, state_dict in results.iteritems():
             results[group] = {
                 key: event_id
-                for key, event_id in state_dict.items()
+                for key, event_id in state_dict.iteritems()
                 if event_id
             }
 
@@ -748,7 +749,7 @@ class StateStore(SQLBaseStore):
                         # of keys
 
                         delta_state = {
-                            key: value for key, value in curr_state.items()
+                            key: value for key, value in curr_state.iteritems()
                             if prev_state.get(key, None) != value
                         }
 
@@ -788,7 +789,7 @@ class StateStore(SQLBaseStore):
                                     "state_key": key[1],
                                     "event_id": state_id,
                                 }
-                                for key, state_id in delta_state.items()
+                                for key, state_id in delta_state.iteritems()
                             ],
                         )
 
-- 
cgit 1.4.1


From d58b1ffe9424453526026e294ac9b6458d31eb9d Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 24 Mar 2017 11:07:02 +0000
Subject: Replace some calls to cursor_to_dict

cursor_to_dict can be surprisinglh expensive for large result sets, so lets
only call it when we need to.
---
 synapse/replication/slave/storage/events.py |  1 -
 synapse/storage/roommember.py               | 43 ++++++-----------------------
 synapse/storage/state.py                    |  8 +++---
 3 files changed, 13 insertions(+), 39 deletions(-)

diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a1e1e54e5b..d4db1e452e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -167,7 +167,6 @@ class SlavedEventStore(BaseSlavedStore):
     _get_rooms_for_user_where_membership_is_txn = (
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
-    _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
     _get_state_for_groups = DataStore._get_state_for_groups.__func__
     _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e38d8927bf..23127d3a95 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -132,14 +132,17 @@ class RoomMemberStore(SQLBaseStore):
     @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
-
-            rows = self._get_members_rows_txn(
-                txn,
-                room_id=room_id,
-                membership=Membership.JOIN,
+            sql = (
+                "SELECT m.user_id FROM room_memberships as m"
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id "
+                " AND m.room_id = c.room_id "
+                " AND m.user_id = c.state_key"
+                " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
             )
 
-            return [r["user_id"] for r in rows]
+            txn.execute(sql, (room_id, Membership.JOIN,))
+            return [r[0] for r in txn]
         return self.runInteraction("get_users_in_room", f)
 
     @cached()
@@ -246,34 +249,6 @@ class RoomMemberStore(SQLBaseStore):
 
         return results
 
-    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
-        where_clause = "c.room_id = ?"
-        where_values = [room_id]
-
-        if membership:
-            where_clause += " AND m.membership = ?"
-            where_values.append(membership)
-
-        if user_id:
-            where_clause += " AND m.user_id = ?"
-            where_values.append(user_id)
-
-        sql = (
-            "SELECT m.* FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
-            " AND m.user_id = c.state_key"
-            " WHERE c.type = 'm.room.member' AND %(where)s"
-        ) % {
-            "where": where_clause,
-        }
-
-        txn.execute(sql, where_values)
-        rows = self.cursor_to_dict(txn)
-
-        return rows
-
     @cachedInlineCallbacks(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
         """Returns a set of room_ids the user is currently joined to
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index aedd597ded..314216f039 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -342,10 +342,10 @@ class StateStore(SQLBaseStore):
                     args.extend(where_args)
 
                     txn.execute(sql % (where_clause,), args)
-                    rows = self.cursor_to_dict(txn)
-                    for row in rows:
-                        key = (row["type"], row["state_key"])
-                        results[group][key] = row["event_id"]
+                    for row in txn:
+                        typ, state_key, event_id = row
+                        key = (typ, state_key)
+                        results[group][key] = event_id
         else:
             if types is not None:
                 where_clause = "AND (%s)" % (
-- 
cgit 1.4.1


From 7fc1f1e2b68ad45e74bec122c31e30efd64599af Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 24 Mar 2017 11:46:22 +0000
Subject: Cache hosts in room

---
 synapse/handlers/presence.py  |  3 +--
 synapse/storage/roommember.py | 10 ++++++++++
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 059260a8aa..1ede117c79 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -575,8 +575,7 @@ class PresenceHandler(object):
                 if not local_states:
                     continue
 
-                users = yield self.store.get_users_in_room(room_id)
-                hosts = set(get_domain_from_id(u) for u in users)
+                hosts = yield self.store.get_hosts_in_room(room_id)
 
                 for host in hosts:
                     hosts_to_states.setdefault(host, []).extend(local_states)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 23127d3a95..367dbbbcf6 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -129,6 +129,16 @@ class RoomMemberStore(SQLBaseStore):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
+    @cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
+    def get_hosts_in_room(self, room_id, cache_context):
+        """Returns the set of all hosts currently in the room
+        """
+        user_ids = yield self.get_users_in_room(
+            room_id, on_invalidate=cache_context.invalidate,
+        )
+        hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
+        defer.returnValue(hosts)
+
     @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
-- 
cgit 1.4.1


From 48e76979112f9a048a173626be112e4c40a1f489 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 24 Mar 2017 12:57:46 +0000
Subject: Add slave transaction store

---
 synapse/app/client_reader.py    | 2 ++
 synapse/app/media_repository.py | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index a821a6ce62..e4ea3ab933 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -29,6 +29,7 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
@@ -63,6 +64,7 @@ class ClientReaderSlavedStore(
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    TransactionStore,
     BaseSlavedStore,
     ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 3c7984a237..1444e69a42 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -24,6 +24,7 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
@@ -59,6 +60,7 @@ logger = logging.getLogger("synapse.app.media_repository")
 class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    TransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
     ClientIpStore,
-- 
cgit 1.4.1


From 09f79aaad02ee640b22e7762f28a0b6885b9593b Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 24 Mar 2017 13:21:08 +0000
Subject: Use presence replication stream to invalidate cache

Instead of using the cache invalidation replication stream to invalidate
the _get_presence_cache, we can instead rely on the presence replication
stream. This reduces the amount of replication traffic considerably.
---
 synapse/replication/slave/storage/presence.py | 1 +
 synapse/storage/presence.py                   | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 40f6c9a386..e4a2414d78 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -57,5 +57,6 @@ class SlavedPresenceStore(BaseSlavedStore):
                 self.presence_stream_cache.entity_has_changed(
                     user_id, position
                 )
+                self._get_presence_for_user.invalidate((user_id,))
 
         return super(SlavedPresenceStore, self).process_replication(result)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4d1590d2b4..4edfe29585 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -85,8 +85,8 @@ class PresenceStore(SQLBaseStore):
                 self.presence_stream_cache.entity_has_changed,
                 state.user_id, stream_id,
             )
-            self._invalidate_cache_and_stream(
-                txn, self._get_presence_for_user, (state.user_id,)
+            txn.call_after(
+                self._get_presence_for_user, (state.user_id,)
             )
 
         # Actually insert new rows
-- 
cgit 1.4.1


From 987f4945b4dcac7ce55ddede05df4189a36936a3 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 24 Mar 2017 13:28:20 +0000
Subject: Actually call invalidate

---
 synapse/storage/presence.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4edfe29585..9e9d3c2591 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -86,7 +86,7 @@ class PresenceStore(SQLBaseStore):
                 state.user_id, stream_id,
             )
             txn.call_after(
-                self._get_presence_for_user, (state.user_id,)
+                self._get_presence_for_user.invalidate, (state.user_id,)
             )
 
         # Actually insert new rows
-- 
cgit 1.4.1