From c3eb7dd9c572cfcea831e4d5f41b5a34cb3c57e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Feb 2015 11:50:49 +0000 Subject: Add config option to set the soft fd limit on start --- synapse/app/homeserver.py | 20 +++++++++++++++++--- synapse/config/server.py | 7 +++++++ 2 files changed, 24 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index ea20de1434..07386f6f28 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -52,6 +52,7 @@ import synapse import logging import os import re +import resource import subprocess import sqlite3 import syweb @@ -269,6 +270,15 @@ def get_version_string(): return ("Synapse/%s" % (synapse.__version__,)).encode("ascii") +def change_resource_limit(soft_file_no): + try: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) + logger.info("Set file limit to: %d", soft_file_no) + except (ValueError, resource.error) as e: + logger.warn("Failed to set file limit: %s", e) + + def setup(): config = HomeServerConfig.load_config( "Synapse Homeserver", @@ -345,10 +355,11 @@ def setup(): if config.daemonize: print config.pid_file + daemon = Daemonize( app="synapse-homeserver", pid=config.pid_file, - action=run, + action=lambda: run(config), auto_close_fds=False, verbose=True, logger=logger, @@ -356,11 +367,14 @@ def setup(): daemon.start() else: - reactor.run() + run(config) -def run(): +def run(config): with LoggingContext("run"): + if config.soft_file_limit: + change_resource_limit(config.soft_file_limit) + reactor.run() diff --git a/synapse/config/server.py b/synapse/config/server.py index 31e44cc857..a3edf208e9 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -31,6 +31,7 @@ class ServerConfig(Config): self.webclient = True self.manhole = args.manhole self.no_tls = args.no_tls + self.soft_file_limit = args.soft_file_limit if not args.content_addr: host = args.server_name @@ -77,6 +78,12 @@ class ServerConfig(Config): "content repository") server_group.add_argument("--no-tls", action='store_true', help="Don't bind to the https port.") + server_group.add_argument("--soft-file-limit", type=int, default=0, + help="Set the limit on the number of file " + "descriptors synapse can use. Zero " + "is used to indicate synapse should " + "not change the limit from system " + "default.") def read_signing_key(self, signing_key_path): signing_keys = self.read_file(signing_key_path, "signing_key") -- cgit 1.5.1 From 939273c4b0936b95eb14763d8bf1027159fa4cb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Feb 2015 11:53:13 +0000 Subject: Rename resource variable so as to not shadow module import --- synapse/app/homeserver.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 07386f6f28..5f671dfd23 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -141,8 +141,8 @@ class SynapseHomeServer(HomeServer): # instead, we'll store a copy of this mapping so we can actually add # extra resources to existing nodes. See self._resource_id for the key. resource_mappings = {} - for (full_path, resource) in desired_tree: - logger.info("Attaching %s to path %s", resource, full_path) + for full_path, res in desired_tree: + logger.info("Attaching %s to path %s", res, full_path) last_resource = self.root_resource for path_seg in full_path.split('/')[1:-1]: if path_seg not in last_resource.listNames(): @@ -173,12 +173,12 @@ class SynapseHomeServer(HomeServer): child_name) child_resource = resource_mappings[child_res_id] # steal the children - resource.putChild(child_name, child_resource) + res.putChild(child_name, child_resource) # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, resource) + last_resource.putChild(last_path_seg, res) res_id = self._resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = resource + resource_mappings[res_id] = res return self.root_resource -- cgit 1.5.1 From 61959928bb4b6b0191d301f1b267af7290a61bd2 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 19 Feb 2015 14:58:07 +0000 Subject: Pull out the 'get_rooms_for_user' cache logic into a reüsable @cached decorator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- synapse/storage/roommember.py | 53 +++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 24 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 9bf608bc90..569bd55d0f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -33,6 +33,32 @@ RoomsForUser = namedtuple( ) +# TODO(paul): +# * Move this somewhere higher-level, shared; +# * more generic key management +# * export monitoring stats +# * maximum size; just evict things at random, or consider LRU? +def cached(orig): + cache = {} + + @defer.inlineCallbacks + def wrapped(self, key): + if key in cache: + defer.returnValue(cache[key]) + + ret = yield orig(self, key) + + cache[key] = ret; + defer.returnValue(ret) + + def invalidate(key): + if key in cache: + del cache[key] + + wrapped.invalidate = invalidate + return wrapped + + class RoomMemberStore(SQLBaseStore): def __init__(self, *args, **kw): @@ -103,7 +129,7 @@ class RoomMemberStore(SQLBaseStore): txn.execute(sql, (event.room_id, domain)) - self.invalidate_rooms_for_user(target_user_id) + self.get_rooms_for_user.invalidate(target_user_id) @defer.inlineCallbacks def get_room_member(self, user_id, room_id): @@ -247,33 +273,12 @@ class RoomMemberStore(SQLBaseStore): results = self._parse_events_txn(txn, rows) return results - # TODO(paul): Create a nice @cached decorator to do this - # @cached - # def get_foo(...) - # ... - # invalidate_foo = get_foo.invalidator - - @defer.inlineCallbacks + @cached def get_rooms_for_user(self, user_id): - # TODO(paul): put some performance counters in here so we can easily - # track what impact this cache is having - if user_id in self._user_rooms_cache: - defer.returnValue(self._user_rooms_cache[user_id]) - - rooms = yield self.get_rooms_for_user_where_membership_is( + return self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], ) - # TODO(paul): Consider applying a maximum size; just evict things at - # random, or consider LRU? - - self._user_rooms_cache[user_id] = rooms - defer.returnValue(rooms) - - def invalidate_rooms_for_user(self, user_id): - if user_id in self._user_rooms_cache: - del self._user_rooms_cache[user_id] - @defer.inlineCallbacks def user_rooms_intersect(self, user_id_list): """ Checks whether all the users whose IDs are given in a list share a -- cgit 1.5.1 From 077d20034278ea57c57d501de11bfb1f0c7f9603 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 19 Feb 2015 17:29:39 +0000 Subject: Move @cached decorator out into synapse.storage._base; add minimal docs --- synapse/storage/_base.py | 35 +++++++++++++++++++++++++++++++++++ synapse/storage/roommember.py | 28 +--------------------------- 2 files changed, 36 insertions(+), 27 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index be9934c66f..fd275039be 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -35,6 +35,41 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +# TODO(paul): +# * Move this somewhere higher-level, shared; +# * more generic key management +# * export monitoring stats +# * maximum size; just evict things at random, or consider LRU? +def cached(orig): + """ A method decorator that applies a memoizing cache around the function. + + The function is presumed to take one additional argument, which is used as + the key for the cache. Cache hits are served directly from the cache; + misses use the function body to generate the value. + + The wrapped function has an additional member, a callable called + "invalidate". This can be used to remove individual entries from the cache. + """ + cache = {} + + @defer.inlineCallbacks + def wrapped(self, key): + if key in cache: + defer.returnValue(cache[key]) + + ret = yield orig(self, key) + + cache[key] = ret; + defer.returnValue(ret) + + def invalidate(key): + if key in cache: + del cache[key] + + wrapped.invalidate = invalidate + return wrapped + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging to the .execute() method.""" diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 569bd55d0f..b8fcc1927e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -17,7 +17,7 @@ from twisted.internet import defer from collections import namedtuple -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.constants import Membership from synapse.types import UserID @@ -33,32 +33,6 @@ RoomsForUser = namedtuple( ) -# TODO(paul): -# * Move this somewhere higher-level, shared; -# * more generic key management -# * export monitoring stats -# * maximum size; just evict things at random, or consider LRU? -def cached(orig): - cache = {} - - @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - defer.returnValue(cache[key]) - - ret = yield orig(self, key) - - cache[key] = ret; - defer.returnValue(ret) - - def invalidate(key): - if key in cache: - del cache[key] - - wrapped.invalidate = invalidate - return wrapped - - class RoomMemberStore(SQLBaseStore): def __init__(self, *args, **kw): -- cgit 1.5.1 From ebc3db295bfe6d0c43bf45b8fcd7fa6bbc429375 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 19 Feb 2015 18:36:02 +0000 Subject: Take named arguments to @cached() decorator, add a 'max_entries' limit --- synapse/storage/_base.py | 39 +++++++++++-------- synapse/storage/roommember.py | 2 +- tests/storage/test__base.py | 89 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 16 deletions(-) create mode 100644 tests/storage/test__base.py (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index fd275039be..61657d36ed 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -39,8 +39,8 @@ transaction_logger = logging.getLogger("synapse.storage.txn") # * Move this somewhere higher-level, shared; # * more generic key management # * export monitoring stats -# * maximum size; just evict things at random, or consider LRU? -def cached(orig): +# * consider other eviction strategies - LRU? +def cached(max_entries=1000): """ A method decorator that applies a memoizing cache around the function. The function is presumed to take one additional argument, which is used as @@ -50,24 +50,33 @@ def cached(orig): The wrapped function has an additional member, a callable called "invalidate". This can be used to remove individual entries from the cache. """ - cache = {} + def wrap(orig): + cache = {} - @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - defer.returnValue(cache[key]) + @defer.inlineCallbacks + def wrapped(self, key): + if key in cache: + defer.returnValue(cache[key]) + + ret = yield orig(self, key) + + while len(cache) > max_entries: + # TODO(paul): This feels too biased. However, a random index + # would be a bit inefficient, walking the list of keys just + # to ignore most of them? + del cache[cache.keys()[0]] - ret = yield orig(self, key) + cache[key] = ret; + defer.returnValue(ret) - cache[key] = ret; - defer.returnValue(ret) + def invalidate(key): + if key in cache: + del cache[key] - def invalidate(key): - if key in cache: - del cache[key] + wrapped.invalidate = invalidate + return wrapped - wrapped.invalidate = invalidate - return wrapped + return wrap class LoggingTransaction(object): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index b8fcc1927e..33a832483e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -247,7 +247,7 @@ class RoomMemberStore(SQLBaseStore): results = self._parse_events_txn(txn, rows) return results - @cached + @cached() def get_rooms_for_user(self, user_id): return self.get_rooms_for_user_where_membership_is( user_id, membership_list=[Membership.JOIN], diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py new file mode 100644 index 0000000000..057f798640 --- /dev/null +++ b/tests/storage/test__base.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest +from twisted.internet import defer + +from synapse.storage._base import cached + + +class CacheDecoratorTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def test_passthrough(self): + @cached() + def func(self, key): + return key + + self.assertEquals((yield func(self, "foo")), "foo") + self.assertEquals((yield func(self, "bar")), "bar") + + @defer.inlineCallbacks + def test_hit(self): + callcount = [0] + + @cached() + def func(self, key): + callcount[0] += 1 + return key + + yield func(self, "foo") + + self.assertEquals(callcount[0], 1) + + self.assertEquals((yield func(self, "foo")), "foo") + self.assertEquals(callcount[0], 1) + + @defer.inlineCallbacks + def test_invalidate(self): + callcount = [0] + + @cached() + def func(self, key): + callcount[0] += 1 + return key + + yield func(self, "foo") + + self.assertEquals(callcount[0], 1) + + func.invalidate("foo") + + yield func(self, "foo") + + self.assertEquals(callcount[0], 2) + + @defer.inlineCallbacks + def test_max_entries(self): + callcount = [0] + + @cached(max_entries=10) + def func(self, key): + callcount[0] += 1 + return key + + for k in range(0,12): + yield func(self, k) + + self.assertEquals(callcount[0], 12) + + # There must have been at least 2 evictions, meaning if we calculate + # all 12 values again, we must get called at least 2 more times + for k in range(0,12): + yield func(self, k) + + self.assertTrue(callcount[0] >= 14, + msg="Expected callcount >= 14, got %d" % (callcount[0])) -- cgit 1.5.1 From 55022d6ca5bfe3e99fd3144e291906063885ce12 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 19 Feb 2015 18:38:09 +0000 Subject: Remove a TODO note --- synapse/storage/_base.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 61657d36ed..78ba5f25ea 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -36,7 +36,6 @@ transaction_logger = logging.getLogger("synapse.storage.txn") # TODO(paul): -# * Move this somewhere higher-level, shared; # * more generic key management # * export monitoring stats # * consider other eviction strategies - LRU? -- cgit 1.5.1 From 7c56210f204ef735444129c4f7b2a393eb7539ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Feb 2015 16:09:44 +0000 Subject: By default set soft limit to hard limit --- synapse/app/homeserver.py | 8 ++++++-- synapse/config/server.py | 10 +++++----- 2 files changed, 11 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 5f671dfd23..89af091f10 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -273,7 +273,12 @@ def get_version_string(): def change_resource_limit(soft_file_no): try: soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + + if not soft_file_no: + soft_file_no = hard + resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) + logger.info("Set file limit to: %d", soft_file_no) except (ValueError, resource.error) as e: logger.warn("Failed to set file limit: %s", e) @@ -372,8 +377,7 @@ def setup(): def run(config): with LoggingContext("run"): - if config.soft_file_limit: - change_resource_limit(config.soft_file_limit) + change_resource_limit(config.soft_file_limit) reactor.run() diff --git a/synapse/config/server.py b/synapse/config/server.py index a3edf208e9..4e4892d40b 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -79,11 +79,11 @@ class ServerConfig(Config): server_group.add_argument("--no-tls", action='store_true', help="Don't bind to the https port.") server_group.add_argument("--soft-file-limit", type=int, default=0, - help="Set the limit on the number of file " - "descriptors synapse can use. Zero " - "is used to indicate synapse should " - "not change the limit from system " - "default.") + help="Set the soft limit on the number of " + "file descriptors synapse can use. " + "Zero is used to indicate synapse " + "should set the soft limit to the hard" + "limit.") def read_signing_key(self, signing_key_path): signing_keys = self.read_file(signing_key_path, "signing_key") -- cgit 1.5.1 From 4631b737fdb08185d514e69f0e6860c0860768b5 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 14:38:44 +0000 Subject: Squash out the now-redundant ApplicationServicesCache object class --- synapse/storage/appservice.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d941b1f387..dc3666efd4 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -23,23 +23,11 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) -class ApplicationServiceCache(object): - """Caches ApplicationServices and provides utility functions on top. - - This class is designed to be invoked on incoming events in order to avoid - hammering the database every time to extract a list of application service - regexes. - """ - - def __init__(self): - self.services = [] - - class ApplicationServiceStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceStore, self).__init__(hs) - self.cache = ApplicationServiceCache() + self.services_cache = [] self.cache_defer = self._populate_cache() @defer.inlineCallbacks @@ -56,7 +44,7 @@ class ApplicationServiceStore(SQLBaseStore): token, ) # update cache TODO: Should this be in the txn? - for service in self.cache.services: + for service in self.services_cache: if service.token == token: service.url = None service.namespaces = None @@ -110,13 +98,13 @@ class ApplicationServiceStore(SQLBaseStore): ) # update cache TODO: Should this be in the txn? - for (index, cache_service) in enumerate(self.cache.services): + for (index, cache_service) in enumerate(self.services_cache): if service.token == cache_service.token: - self.cache.services[index] = service + self.services_cache[index] = service logger.info("Updated: %s", service) return # new entry - self.cache.services.append(service) + self.services_cache.append(service) logger.info("Updated(new): %s", service) def _update_app_service_txn(self, txn, service): @@ -160,7 +148,7 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_services(self): yield self.cache_defer # make sure the cache is ready - defer.returnValue(self.cache.services) + defer.returnValue(self.services_cache) @defer.inlineCallbacks def get_app_service_by_token(self, token, from_cache=True): @@ -176,7 +164,7 @@ class ApplicationServiceStore(SQLBaseStore): yield self.cache_defer # make sure the cache is ready if from_cache: - for service in self.cache.services: + for service in self.services_cache: if service.token == token: defer.returnValue(service) return @@ -235,7 +223,7 @@ class ApplicationServiceStore(SQLBaseStore): # TODO get last successful txn id f.e. service for service in services.values(): logger.info("Found application service: %s", service) - self.cache.services.append(ApplicationService( + self.services_cache.append(ApplicationService( token=service["token"], url=service["url"], namespaces=service["namespaces"], -- cgit 1.5.1 From 22399d3d8f5931c4302a9df37ebde94f5d1fc277 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 23 Feb 2015 15:14:56 +0000 Subject: Add RegisterFallbackResource to /_matrix/static/client/register Try to keep both forms of registration logic (native/fallback) close together for sanity. --- synapse/api/urls.py | 1 + synapse/app/homeserver.py | 2 ++ synapse/rest/client/v1/__init__.py | 8 ++++++++ synapse/rest/client/v1/register.py | 21 +++++++++++++++++++++ 4 files changed, 32 insertions(+) (limited to 'synapse') diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 9485719332..3d43674625 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -18,6 +18,7 @@ CLIENT_PREFIX = "/_matrix/client/api/v1" CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" FEDERATION_PREFIX = "/_matrix/federation/v1" +STATIC_PREFIX = "/_matrix/static" WEB_CLIENT_PREFIX = "/_matrix/client" CONTENT_REPO_PREFIX = "/_matrix/content" SERVER_KEY_PREFIX = "/_matrix/key/v1" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index ea20de1434..137293bd69 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -125,6 +125,8 @@ class SynapseHomeServer(HomeServer): (MEDIA_PREFIX, self.get_resource_for_media_repository()), (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), ] + desired_tree += self.get_resource_for_client().get_extra_resources(self) + if web_client: logger.info("Adding the web client.") desired_tree.append((WEB_CLIENT_PREFIX, diff --git a/synapse/rest/client/v1/__init__.py b/synapse/rest/client/v1/__init__.py index 21876b3487..1ebdeadca5 100644 --- a/synapse/rest/client/v1/__init__.py +++ b/synapse/rest/client/v1/__init__.py @@ -28,6 +28,14 @@ class ClientV1RestResource(JsonResource): JsonResource.__init__(self, hs) self.register_servlets(self, hs) + def get_extra_resources(self, hs): + # some parts of client v1 need to produce HTML as the output (e.g. + # fallback pages) but we can only produce JSON output. In an effort to + # keep similar logic close together, we'll call through to any servlet + # which requires HTML output. + register_resources = register.get_prefixes_and_resources(hs) + return register_resources + @staticmethod def register_servlets(client_resource, hs): room.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index f5acfb945f..eee567a583 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -18,10 +18,12 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.api.constants import LoginType +from synapse.api.urls import STATIC_PREFIX from base import ClientV1RestServlet, client_path_pattern import synapse.util.stringutils as stringutils from synapse.util.async import run_on_reactor +from twisted.web.resource import Resource from hashlib import sha1 import hmac @@ -305,6 +307,16 @@ class RegisterRestServlet(ClientV1RestServlet): }) +class RegisterFallbackResource(Resource): + + def __init__(self, hs): + Resource.__init__(self) # Resource is an old-style class :( + self.hs = hs + + def render_GET(self, request): + return "NOT_YET_IMPLEMENTED" + + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -315,5 +327,14 @@ def _parse_json(request): raise SynapseError(400, "Content not JSON.") +def get_prefixes_and_resources(hs): + return [ + ( + STATIC_PREFIX + "/client/register", + RegisterFallbackResource(hs) + ) + ] + + def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) -- cgit 1.5.1 From 0696dfd94b5d32679009368508d26ae1a6630994 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 23 Feb 2015 15:35:09 +0000 Subject: Actually treat this as static content, not random Resources. --- synapse/app/homeserver.py | 8 ++++++-- synapse/rest/client/v1/__init__.py | 8 -------- synapse/rest/client/v1/register.py | 21 --------------------- synapse/server.py | 1 + 4 files changed, 7 insertions(+), 31 deletions(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 137293bd69..688bf8d967 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -36,7 +36,8 @@ from synapse.http.server_key_resource import LocalKey from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.api.urls import ( CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX + SERVER_KEY_PREFIX, MEDIA_PREFIX, CLIENT_V2_ALPHA_PREFIX, APP_SERVICE_PREFIX, + STATIC_PREFIX ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory @@ -81,6 +82,9 @@ class SynapseHomeServer(HomeServer): webclient_path = os.path.join(syweb_path, "webclient") return File(webclient_path) # TODO configurable? + def build_resource_for_static_content(self): + return File("static") + def build_resource_for_content_repo(self): return ContentRepoResource( self, self.upload_dir, self.auth, self.content_addr @@ -124,8 +128,8 @@ class SynapseHomeServer(HomeServer): (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), (MEDIA_PREFIX, self.get_resource_for_media_repository()), (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), + (STATIC_PREFIX, self.get_resource_for_static_content()) ] - desired_tree += self.get_resource_for_client().get_extra_resources(self) if web_client: logger.info("Adding the web client.") diff --git a/synapse/rest/client/v1/__init__.py b/synapse/rest/client/v1/__init__.py index 1ebdeadca5..21876b3487 100644 --- a/synapse/rest/client/v1/__init__.py +++ b/synapse/rest/client/v1/__init__.py @@ -28,14 +28,6 @@ class ClientV1RestResource(JsonResource): JsonResource.__init__(self, hs) self.register_servlets(self, hs) - def get_extra_resources(self, hs): - # some parts of client v1 need to produce HTML as the output (e.g. - # fallback pages) but we can only produce JSON output. In an effort to - # keep similar logic close together, we'll call through to any servlet - # which requires HTML output. - register_resources = register.get_prefixes_and_resources(hs) - return register_resources - @staticmethod def register_servlets(client_resource, hs): room.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index eee567a583..f5acfb945f 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -18,12 +18,10 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.api.constants import LoginType -from synapse.api.urls import STATIC_PREFIX from base import ClientV1RestServlet, client_path_pattern import synapse.util.stringutils as stringutils from synapse.util.async import run_on_reactor -from twisted.web.resource import Resource from hashlib import sha1 import hmac @@ -307,16 +305,6 @@ class RegisterRestServlet(ClientV1RestServlet): }) -class RegisterFallbackResource(Resource): - - def __init__(self, hs): - Resource.__init__(self) # Resource is an old-style class :( - self.hs = hs - - def render_GET(self, request): - return "NOT_YET_IMPLEMENTED" - - def _parse_json(request): try: content = json.loads(request.content.read()) @@ -327,14 +315,5 @@ def _parse_json(request): raise SynapseError(400, "Content not JSON.") -def get_prefixes_and_resources(hs): - return [ - ( - STATIC_PREFIX + "/client/register", - RegisterFallbackResource(hs) - ) - ] - - def register_servlets(hs, http_server): RegisterRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index ba2b2593f1..cb8610a1b4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -73,6 +73,7 @@ class BaseHomeServer(object): 'resource_for_client', 'resource_for_client_v2_alpha', 'resource_for_federation', + 'resource_for_static_content', 'resource_for_web_client', 'resource_for_content_repo', 'resource_for_server_key', -- cgit 1.5.1 From e76d485e29498fce7412423e7a5b6ac6bc287ec3 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 15:41:54 +0000 Subject: Allow @cached-wrapped functions to have a prefill method for setting entries --- synapse/storage/_base.py | 23 +++++++++++++++-------- tests/storage/test__base.py | 14 ++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 78ba5f25ea..4b1ec687c9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -48,24 +48,30 @@ def cached(max_entries=1000): The wrapped function has an additional member, a callable called "invalidate". This can be used to remove individual entries from the cache. + + The wrapped function has another additional callable, called "prefill", + which can be used to insert values into the cache specifically, without + calling the calculation function. """ def wrap(orig): cache = {} - @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - defer.returnValue(cache[key]) - - ret = yield orig(self, key) - + def prefill(key, value): while len(cache) > max_entries: # TODO(paul): This feels too biased. However, a random index # would be a bit inefficient, walking the list of keys just # to ignore most of them? del cache[cache.keys()[0]] - cache[key] = ret; + cache[key] = value + + @defer.inlineCallbacks + def wrapped(self, key): + if key in cache: + defer.returnValue(cache[key]) + + ret = yield orig(self, key) + prefill(key, ret) defer.returnValue(ret) def invalidate(key): @@ -73,6 +79,7 @@ def cached(max_entries=1000): del cache[key] wrapped.invalidate = invalidate + wrapped.prefill = prefill return wrapped return wrap diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 057f798640..fb306cb784 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -87,3 +87,17 @@ class CacheDecoratorTestCase(unittest.TestCase): self.assertTrue(callcount[0] >= 14, msg="Expected callcount >= 14, got %d" % (callcount[0])) + + @defer.inlineCallbacks + def test_prefill(self): + callcount = [0] + + @cached() + def func(self, key): + callcount[0] += 1 + return key + + func.prefill("foo", 123) + + self.assertEquals((yield func(self, "foo")), 123) + self.assertEquals(callcount[0], 0) -- cgit 1.5.1 From 357fba2c24067796ce89f25636a2541bc9a10752 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 15:57:41 +0000 Subject: RoomMemberStore no longer needs a _user_rooms_cache member --- synapse/storage/roommember.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 33a832483e..58aa376c20 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,11 +35,6 @@ RoomsForUser = namedtuple( class RoomMemberStore(SQLBaseStore): - def __init__(self, *args, **kw): - super(RoomMemberStore, self).__init__(*args, **kw) - - self._user_rooms_cache = {} - def _store_room_member_txn(self, txn, event): """Store a room member in the database. """ -- cgit 1.5.1 From 044d813ef7bb67e7b680087d82f931e4c780218f Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 16:04:40 +0000 Subject: Use the @cached decorator to implement the destination_retry_timings cache --- synapse/storage/transactions.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index e06ef35690..6cac8d01ac 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore, Table, cached from collections import namedtuple @@ -28,10 +28,6 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - # a write-through cache of DestinationsTable.EntryType indexed by - # destination string - destination_retry_cache = {} - def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -211,6 +207,7 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + @cached() def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. @@ -221,9 +218,6 @@ class TransactionStore(SQLBaseStore): None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ - if destination in self.destination_retry_cache: - return defer.succeed(self.destination_retry_cache[destination]) - return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) @@ -250,7 +244,9 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ - self.destination_retry_cache[destination] = ( + # As this is the new value, we might as well prefill the cache + self.get_destination_retry_timings.prefill( + destination, DestinationsTable.EntryType( destination, retry_last_ts, -- cgit 1.5.1 From a09e59a69830ba99d65f54b2385c3cab341accb0 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 16:55:57 +0000 Subject: Pull the _get_event_cache.setdefault() call out of the try block, as it doesn't need to be there and is confusing --- synapse/storage/_base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4b1ec687c9..84f222b3db 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -636,8 +636,9 @@ class SQLBaseStore(object): start_time = time.time() * 1000 update_counter = self._get_event_counters.update + cache = self._get_event_cache.setdefault(event_id, {}) + try: - cache = self._get_event_cache.setdefault(event_id, {}) # Separate cache entries for each way to invoke _get_event_txn return cache[(check_redacted, get_prev_content, allow_rejected)] except KeyError: -- cgit 1.5.1 From 28d8614f4879a4e5276c1c14190ad1b4ca069350 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 23 Feb 2015 17:36:37 +0000 Subject: Trailing comma --- synapse/app/homeserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 688bf8d967..2d6f6ac89d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -128,7 +128,7 @@ class SynapseHomeServer(HomeServer): (SERVER_KEY_PREFIX, self.get_resource_for_server_key()), (MEDIA_PREFIX, self.get_resource_for_media_repository()), (APP_SERVICE_PREFIX, self.get_resource_for_app_services()), - (STATIC_PREFIX, self.get_resource_for_static_content()) + (STATIC_PREFIX, self.get_resource_for_static_content()), ] if web_client: -- cgit 1.5.1 From 74048bdd41108e0c98e034b6ebc5890fd3bcf92b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 23 Feb 2015 18:17:43 +0000 Subject: Remove unused import --- synapse/storage/transactions.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6cac8d01ac..0b8a3b7a07 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,8 +17,6 @@ from ._base import SQLBaseStore, Table, cached from collections import namedtuple -from twisted.internet import defer - import logging logger = logging.getLogger(__name__) -- cgit 1.5.1 From 27080698e7e9c7d8bb8dfe473f888abdfc48fee7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 23 Feb 2015 18:19:13 +0000 Subject: Fix code style warning --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 84f222b3db..42edb56c36 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -79,7 +79,7 @@ def cached(max_entries=1000): del cache[key] wrapped.invalidate = invalidate - wrapped.prefill = prefill + wrapped.prefill = prefill return wrapped return wrap -- cgit 1.5.1 From f53fcbce9789186c1c42fe2f93ba46e3d8720b1b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 18:29:26 +0000 Subject: Use cache.pop() instead of a separate membership test + del [] --- synapse/storage/_base.py | 3 +-- tests/storage/test__base.py | 7 +++++++ 2 files changed, 8 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 42edb56c36..da698cb3b8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -75,8 +75,7 @@ def cached(max_entries=1000): defer.returnValue(ret) def invalidate(key): - if key in cache: - del cache[key] + cache.pop(key, None) wrapped.invalidate = invalidate wrapped.prefill = prefill diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index fb306cb784..55d22f665a 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -66,6 +66,13 @@ class CacheDecoratorTestCase(unittest.TestCase): self.assertEquals(callcount[0], 2) + def test_invalidate_missing(self): + @cached() + def func(self, key): + return key + + func.invalidate("what") + @defer.inlineCallbacks def test_max_entries(self): callcount = [0] -- cgit 1.5.1 From 9640510de206d673e4c0c9cbd1cef219bcd488b2 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Feb 2015 18:41:58 +0000 Subject: Use OrderedDict for @cached backing store, so we can evict the oldest key unbiased --- synapse/storage/_base.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index da698cb3b8..c98dd36aed 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -23,7 +23,7 @@ from synapse.util.lrucache import LruCache from twisted.internet import defer -import collections +from collections import namedtuple, OrderedDict import simplejson as json import sys import time @@ -54,14 +54,11 @@ def cached(max_entries=1000): calling the calculation function. """ def wrap(orig): - cache = {} + cache = OrderedDict() def prefill(key, value): while len(cache) > max_entries: - # TODO(paul): This feels too biased. However, a random index - # would be a bit inefficient, walking the list of keys just - # to ignore most of them? - del cache[cache.keys()[0]] + cache.popitem(last=False) cache[key] = value @@ -836,7 +833,7 @@ class JoinHelper(object): for table in self.tables: res += [f for f in table.fields if f not in res] - self.EntryType = collections.namedtuple("JoinHelperEntry", res) + self.EntryType = namedtuple("JoinHelperEntry", res) def get_fields(self, **prefixes): """Get a string representing a list of fields for use in SELECT -- cgit 1.5.1 From 443ba4eecc59e3dbddfaf5b925408b40e3581800 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 24 Feb 2015 15:00:12 +0000 Subject: %s for strings otherwise you end up sending 'u"foo"' --- synapse/handlers/directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 20ab9e269c..11d20a5d2d 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -160,7 +160,7 @@ class DirectoryHandler(BaseHandler): if not room_id: raise SynapseError( 404, - "Room alias %r not found" % (room_alias.to_string(),), + "Room alias %s not found" % (room_alias.to_string(),), Codes.NOT_FOUND ) -- cgit 1.5.1 From 255f989c7bc20d17ec37fed00cb2d107553d6f73 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 24 Feb 2015 20:57:58 +0000 Subject: turns uris config options should append since it's a list --- synapse/config/voip.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/config/voip.py b/synapse/config/voip.py index a2b822719f..65162d21b7 100644 --- a/synapse/config/voip.py +++ b/synapse/config/voip.py @@ -28,7 +28,7 @@ class VoipConfig(Config): super(VoipConfig, cls).add_arguments(parser) group = parser.add_argument_group("voip") group.add_argument( - "--turn-uris", type=str, default=None, + "--turn-uris", type=str, default=None, action='append', help="The public URIs of the TURN server to give to clients" ) group.add_argument( -- cgit 1.5.1 From a025055643638ddb612ac16cf8deff8ae19ab1e0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 25 Feb 2015 14:02:38 +0000 Subject: SYWEB-278 Don't allow rules with no rule_id. --- synapse/rest/client/v1/push_rule.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index b012f31084..73ba0494e6 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -214,7 +214,7 @@ def _rule_spec_from_path(path): template = path[0] path = path[1:] - if len(path) == 0: + if len(path) == 0 or len(path[0]) == 0: raise UnrecognizedRequestError() rule_id = path[0] -- cgit 1.5.1 From 2d20466f9a1349c97d5a3822eb4ee64f19bbdf27 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 15:00:59 +0000 Subject: Add stub functions and work out execution flow to implement AS event stream polling. --- synapse/handlers/events.py | 3 --- synapse/handlers/room.py | 34 +++++++++++++++++++++++++--------- synapse/storage/appservice.py | 19 +++++++++++++++++++ synapse/storage/stream.py | 21 +++++++++++++++++++++ 4 files changed, 65 insertions(+), 12 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 025e7e7e62..8d5f5c8499 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - if pagin_config.from_token is None: - pagin_config.from_token = None - rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 914742d913..a8b0c95636 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler): def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]): """Returns a list of roomids that the user has any of the given membership states in.""" - rooms = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user.to_string(), membership_list=membership_list + + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + else: + rooms = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user.to_string(), membership_list=membership_list + ) # For some reason the list of events contains duplicates # TODO(paul): work out why because I really don't think it should @@ -559,13 +566,22 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - events, end_key = yield self.store.get_room_events_stream( - user_id=user.to_string(), - from_key=from_key, - to_key=to_key, - room_id=None, - limit=limit, - ) + app_service = self.store.get_app_service_by_user_id(user.to_string()) + if app_service: + events, end_key = yield self.store.get_appservice_room_stream( + service=app_service, + from_key=from_key, + to_key=to_key, + limit=limit, + ) + else: + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_key, + to_key=to_key, + room_id=None, + limit=limit, + ) defer.returnValue((events, end_key)) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index dc3666efd4..435ccfd6fc 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.appservice import ApplicationService +from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -150,6 +151,16 @@ class ApplicationServiceStore(SQLBaseStore): yield self.cache_defer # make sure the cache is ready defer.returnValue(self.services_cache) + @defer.inlineCallbacks + def get_app_service_by_user_id(self, user_id): + yield self.cache_defer # make sure the cache is ready + + for service in self.services_cache: + if service.sender == user_id: + defer.returnValue(service) + return + defer.returnValue(None) + @defer.inlineCallbacks def get_app_service_by_token(self, token, from_cache=True): """Get the application service with the given token. @@ -173,6 +184,14 @@ class ApplicationServiceStore(SQLBaseStore): # TODO: The from_cache=False impl # TODO: This should be JOINed with the application_services_regex table. + @defer.inlineCallbacks + def get_app_service_rooms(self, service): + logger.info("get_app_service_rooms -> %s", service) + + # TODO stub + yield self.cache_defer + defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) + @defer.inlineCallbacks def _populate_cache(self): """Populates the ApplicationServiceCache from the database.""" diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3ccb6f8a61..aa3c9f8c9c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -127,6 +127,27 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + + def get_appservice_room_stream(self, service, from_key, to_key, limit=0): + # NB this lives here instead of appservice.py so we can reuse the + # 'private' StreamToken class in this file. + logger.info("get_appservice_room_stream -> %s", service) + + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) + + if from_key == to_key: + return defer.succeed(([], to_key)) + + # TODO stub + return defer.succeed(([], to_key)) + @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): -- cgit 1.5.1 From 2b8ca84296b228b7cef09244605e4f2760349538 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:15:25 +0000 Subject: Add support for extracting matching room_ids and room_aliases for a given AS. --- synapse/storage/appservice.py | 50 +++++++++++++++++++++++++++++++++++++++++-- synapse/storage/directory.py | 23 ++++++++++++++++++++ synapse/storage/room.py | 11 ++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 435ccfd6fc..c8f0ce44f4 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -153,6 +153,19 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): + """Retrieve an application service from their user ID. + + All application services have associated with them a particular user ID. + There is no distinguishing feature on the user ID which indicates it + represents an application service. This function allows you to map from + a user ID to an application service. + + Args: + user_id(str): The user ID to see if it is an application service. + Returns: + synapse.appservice.ApplicationService or None. + """ + yield self.cache_defer # make sure the cache is ready for service in self.services_cache: @@ -163,7 +176,7 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_service_by_token(self, token, from_cache=True): - """Get the application service with the given token. + """Get the application service with the given appservice token. Args: token (str): The application service token. @@ -186,10 +199,43 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_service_rooms(self, service): - logger.info("get_app_service_rooms -> %s", service) + """Get a list of RoomsForUser for this application service. + + Application services may be "interested" in lots of rooms depending on + the room ID, the room aliases, or the members in the room. This function + takes all of these into account and returns a list of RoomsForUser which + represent the entire list of room IDs that this application service + wants to know about. + + Args: + service: The application service to get a room list for. + Returns: + A list of RoomsForUser. + """ + # FIXME: This is assuming that this store has methods from + # RoomStore, DirectoryStore, which is a bad assumption to + # make as it makes testing trickier and coupling less obvious. + + # get all rooms matching the room ID regex. + room_entries = yield self.get_all_rooms() # RoomEntry list + matching_room_id_list = [ + r.room_id for r in room_entries if + service.is_interested_in_room(r.room_id) + ] + + # resolve room IDs for matching room alias regex. + room_alias_mappings = yield self.get_all_associations() + matching_alias_list = [ + r.room_id for r in room_alias_mappings if + service.is_interested_in_alias(r.room_alias) + ] + + # get all rooms for every user for this AS. # TODO stub yield self.cache_defer + + defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) @defer.inlineCallbacks diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d59693..e13b336934 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -134,6 +134,29 @@ class DirectoryStore(SQLBaseStore): return room_id + @defer.inlineCallbacks + def get_all_associations(self): + """Retrieve the entire list of room alias -> room ID pairings. + + Returns: + A list of RoomAliasMappings. + """ + results = self._simple_select_list( + "room_aliases", + None, + ["room_alias", "room_id"] + ) + # TODO(kegan): It feels wrong to be specifying no servers here, but + # equally this function isn't required to obtain all servers so + # retrieving them "just for the sake of it" also seems wrong, but we + # want to conform to passing Objects around and not dicts.. + return [ + RoomAliasMapping( + room_id=r["room_id"], room_alias=r["room_alias"], servers="" + ) for r in results + ] + + def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 750b17a45f..3a64693404 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -71,6 +71,17 @@ class RoomStore(SQLBaseStore): RoomsTable.decode_single_result, query, room_id, ) + def get_all_rooms(self): + """Retrieve all the rooms. + + Returns: + A list of namedtuples containing the room information. + """ + query = RoomsTable.select_statement() + return self._execute( + RoomsTable.decode_results, query, + ) + @defer.inlineCallbacks def get_rooms(self, is_public): """Retrieve a list of all public rooms. -- cgit 1.5.1 From 2c79c4dc7f638f1cb823903a2f8bb1005fda4a2c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:37:14 +0000 Subject: Fix alias query. --- synapse/storage/directory.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e13b336934..70c8c8ccd3 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -141,20 +141,19 @@ class DirectoryStore(SQLBaseStore): Returns: A list of RoomAliasMappings. """ - results = self._simple_select_list( - "room_aliases", - None, - ["room_alias", "room_id"] + results = yield self._execute_and_decode( + "SELECT room_id, room_alias FROM room_aliases" ) + # TODO(kegan): It feels wrong to be specifying no servers here, but # equally this function isn't required to obtain all servers so # retrieving them "just for the sake of it" also seems wrong, but we # want to conform to passing Objects around and not dicts.. - return [ + defer.returnValue([ RoomAliasMapping( room_id=r["room_id"], room_alias=r["room_alias"], servers="" ) for r in results - ] + ]) def get_aliases_for_room(self, room_id): -- cgit 1.5.1 From 978ce87c86e60fd49f078d5bea79715abea6d236 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:37:48 +0000 Subject: Comment unused variables. --- synapse/storage/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index aa3c9f8c9c..3c8f3320f1 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -139,8 +139,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _StreamToken.parse_stream_token(from_key) - to_id = _StreamToken.parse_stream_token(to_key) + # from_id = _StreamToken.parse_stream_token(from_key) + # to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: return defer.succeed(([], to_key)) -- cgit 1.5.1 From 29267cf9d7fbacdfcccaaef9160657f24b9aca14 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:42:28 +0000 Subject: PEP8 and pyflakes --- synapse/storage/appservice.py | 8 +++----- synapse/storage/directory.py | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c8f0ce44f4..017b6d1e86 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -229,12 +229,10 @@ class ApplicationServiceStore(SQLBaseStore): r.room_id for r in room_alias_mappings if service.is_interested_in_alias(r.room_alias) ] + logging.debug(matching_alias_list) + logging.debug(matching_room_id_list) - # get all rooms for every user for this AS. - - # TODO stub - yield self.cache_defer - + # TODO get all rooms for every user for this AS. defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 70c8c8ccd3..e391239a3c 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -155,7 +155,6 @@ class DirectoryStore(SQLBaseStore): ) for r in results ]) - def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", -- cgit 1.5.1 From 92478e96d6f6992146102599ca96b8dcacbf3895 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 14:35:28 +0000 Subject: Finish impl to extract all room IDs an AS may be interested in when polling the event stream. --- synapse/storage/appservice.py | 35 +++++++++++++++++++++++++++++------ synapse/storage/registration.py | 7 +++++++ 2 files changed, 36 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 017b6d1e86..d0632d55d1 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -213,8 +213,9 @@ class ApplicationServiceStore(SQLBaseStore): A list of RoomsForUser. """ # FIXME: This is assuming that this store has methods from - # RoomStore, DirectoryStore, which is a bad assumption to - # make as it makes testing trickier and coupling less obvious. + # RoomStore, DirectoryStore, RegistrationStore, RoomMemberStore which is + # a bad assumption to make as it makes testing trickier and coupling + # less obvious. # get all rooms matching the room ID regex. room_entries = yield self.get_all_rooms() # RoomEntry list @@ -229,12 +230,34 @@ class ApplicationServiceStore(SQLBaseStore): r.room_id for r in room_alias_mappings if service.is_interested_in_alias(r.room_alias) ] - logging.debug(matching_alias_list) - logging.debug(matching_room_id_list) + room_ids_matching_alias_or_id = set( + matching_room_id_list + matching_alias_list + ) - # TODO get all rooms for every user for this AS. + # get all rooms for every user for this AS. This is scoped to users on + # this HS only. + user_list = yield self.get_all_users() + user_list = [ + u["name"] for u in user_list if + service.is_interested_in_user(u["name"]) + ] + rooms_for_user_matching_user_id = [] # RoomsForUser list + for user_id in user_list: + rooms_for_user = yield self.get_rooms_for_user(user_id) + rooms_for_user_matching_user_id += rooms_for_user + rooms_for_user_matching_user_id = set(rooms_for_user_matching_user_id) + + # make RoomsForUser tuples for room ids and aliases which are not in the + # main rooms_for_user_list - e.g. they are rooms which do not have AS + # registered users in it. + known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id] + missing_rooms_for_user = [ + RoomsForUser(r, service.sender, "join") for r in + room_ids_matching_alias_or_id if r not in known_room_ids + ] + rooms_for_user_matching_user_id |= set(missing_rooms_for_user) - defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) + defer.returnValue(rooms_for_user_matching_user_id) @defer.inlineCallbacks def _populate_cache(self): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 029b07cc66..7aff3dbd33 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -92,6 +92,13 @@ class RegistrationStore(SQLBaseStore): query, user_id ) + def get_all_users(self): + query = ("SELECT users.name FROM users") + return self._execute( + self.cursor_to_dict, + query + ) + def get_user_by_token(self, token): """Get a user from the given access token. -- cgit 1.5.1 From 93d90765c4b09bc870fc91c6ddcf21fd4389659d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Feb 2015 16:15:26 +0000 Subject: Initial implementation of federation server rate limiting --- synapse/federation/transport/__init__.py | 12 ++- synapse/federation/transport/server.py | 175 ++++++++++++++++++++++++++++++- 2 files changed, 182 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 6800ac46c5..7028ca6947 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -21,7 +21,7 @@ support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ -from .server import TransportLayerServer +from .server import TransportLayerServer, FederationRateLimiter from .client import TransportLayerClient @@ -55,8 +55,18 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): send requests """ self.keyring = homeserver.get_keyring() + self.clock = homeserver.get_clock() self.server_name = server_name self.server = server self.client = client self.request_handler = None self.received_handler = None + + self.ratelimiter = FederationRateLimiter( + self.clock, + window_size=10000, + sleep_limit=10, + sleep_msec=500, + reject_limit=50, + concurrent_requests=3, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa18..a9e625f127 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,9 +16,11 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, LimitExceededError +from synapse.util.async import sleep from synapse.util.logutils import log_function +import collections import logging import simplejson as json import re @@ -27,6 +29,163 @@ import re logger = logging.getLogger(__name__) +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + return self.ratelimiters.setdefault( + host, + PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + def ratelimit(self): + request_id = object() + + def on_enter(): + return self._on_enter(request_id) + + def on_exit(exc_type, exc_val, exc_tb): + return self._on_exit(request_id) + + return ContextManagerFunction(on_enter, on_exit) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + + if len(self.request_times) > self.sleep_limit: + logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass + + +class ContextManagerFunction(object): + def __init__(self, on_enter, on_exit): + self.on_enter = on_enter + self.on_exit = on_exit + + def __enter__(self): + if self.on_enter: + return self.on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.on_exit: + return self.on_exit(exc_type, exc_val, exc_tb) + + class TransportLayerServer(object): """Handles incoming federation HTTP requests""" @@ -98,15 +257,23 @@ class TransportLayerServer(object): def new_handler(request, *args, **kwargs): try: (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + with self.ratelimiter.ratelimit(origin) as d: + yield d + response = yield handler( + origin, content, request.args, *args, **kwargs + ) except: logger.exception("_authenticate_request failed") raise defer.returnValue(response) return new_handler + def rate_limit_origin(self, handler): + def new_handler(origin, *args, **kwargs): + response = yield handler(origin, *args, **kwargs) + defer.returnValue(response) + return new_handler() + @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. -- cgit 1.5.1 From dcec7175dc30754603618351b59bc72ff41d305b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 16:23:01 +0000 Subject: Finish impl to get new events for AS. ASes should now be able to poll /events --- synapse/handlers/room.py | 4 ++- synapse/storage/stream.py | 62 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 58 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a8b0c95636..80f7ee3f12 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -566,7 +566,9 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - app_service = self.store.get_app_service_by_user_id(user.to_string()) + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() + ) if app_service: events, end_key = yield self.store.get_appservice_room_stream( service=app_service, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3c8f3320f1..6946e9fe70 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the # 'private' StreamToken class in this file. - logger.info("get_appservice_room_stream -> %s", service) - if limit: limit = max(limit, MAX_STREAM_SIZE) else: limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - # from_id = _StreamToken.parse_stream_token(from_key) - # to_id = _StreamToken.parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: - return defer.succeed(([], to_key)) + defer.returnValue(([], to_key)) + return + + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is used + # for the Notifier listener rooms). We can't reasonably make a SQL + # query for these room IDs, so we'll pull all the events between from/to + # and filter in python. + rooms_for_as = yield self.get_app_service_rooms(service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + # select all the events between from/to with a sensible limit + sql = ( + "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " + "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "limit": limit + } + + def f(txn): + txn.execute(sql, (from_id.stream, to_id.stream,)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + # apply the filter on the room id list + [ + r["event_id"] for r in rows + if r["room_id"] in room_ids_for_as + ], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + return ret, key - # TODO stub - return defer.succeed(([], to_key)) + results = yield self.runInteraction("get_appservice_room_stream", f) + defer.returnValue(results) @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, -- cgit 1.5.1 From f0995436e7951448d8be3c372f4002845a111a7d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 17:21:17 +0000 Subject: Check for membership invite events correctly. --- synapse/storage/stream.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6946e9fe70..5d01ecf200 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,12 +36,14 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore +from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function from collections import namedtuple import logging +import simplejson logger = logging.getLogger(__name__) @@ -159,13 +161,30 @@ class StreamStore(SQLBaseStore): # select all the events between from/to with a sensible limit sql = ( - "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " + "SELECT e.event_id, e.room_id, e.type, e.unrecognized_keys, " + "e.stream_ordering FROM events AS e " "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "limit": limit } + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + # load up the content to inspect if some user the AS is + # interested in was invited to a room. We'll be passing this + # through _get_events_txn later, so ignore the fact that this + # may be a redacted event. + event_content = simplejson.loads(row["unrecognized_keys"]) + if (service.is_interested_in_user( + event_content.get("state_key"))): + return True + return False + def f(txn): txn.execute(sql, (from_id.stream, to_id.stream,)) @@ -176,7 +195,7 @@ class StreamStore(SQLBaseStore): # apply the filter on the room id list [ r["event_id"] for r in rows - if r["room_id"] in room_ids_for_as + if app_service_interested(r) ], get_prev_content=True ) -- cgit 1.5.1 From 1cc77145d483c4a6cea75fafbe83a1661b8b61cd Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 27 Feb 2015 09:39:12 +0000 Subject: Notify appservices of invites mid-poll. This requires the notifier to have knowledge of appservice listeners so it can do the regex checks on incoming invites to see if the state_key matches. It isn't enough to just rely on the room listeners and store.get_app_service_rooms as the room will initially not exist or won't be on the ASes radar due to having none of its users in the room. --- synapse/notifier.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index 2475f3ffbe..09d23e79b8 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -36,8 +36,10 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred): + def __init__(self, user, rooms, from_token, limit, timeout, deferred, + appservice=None): self.user = user + self.appservice = appservice self.from_token = from_token self.limit = limit self.timeout = timeout @@ -65,6 +67,10 @@ class _NotificationListener(object): lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: + notifier.appservice_to_listeners.get( + self.appservice, set() + ).discard(self) class Notifier(object): @@ -79,6 +85,7 @@ class Notifier(object): self.rooms_to_listeners = {} self.user_to_listeners = {} + self.appservice_to_listeners = {} self.event_sources = hs.get_event_sources() @@ -114,6 +121,17 @@ class Notifier(object): for user in extra_users: listeners |= self.user_to_listeners.get(user, set()).copy() + for appservice in self.appservice_to_listeners: + # TODO (kegan): Redundant appservice listener checks? + # App services will already be in the rooms_to_listeners set, but + # that isn't enough. They need to be checked here in order to + # receive *invites* for users they are interested in. Does this + # make the rooms_to_listeners check somewhat obselete? + if appservice.is_interested(event): + listeners |= self.appservice_to_listeners.get( + appservice, set() + ).copy() + logger.debug("on_new_room_event listeners %s", listeners) # TODO (erikj): Can we make this more efficient by hitting the @@ -280,6 +298,10 @@ class Notifier(object): if not from_token: from_token = yield self.event_sources.get_current_token() + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) + listener = _NotificationListener( user, rooms, @@ -287,6 +309,7 @@ class Notifier(object): limit, timeout, deferred, + appservice=appservice ) def _timeout_listener(): @@ -319,6 +342,11 @@ class Notifier(object): self.user_to_listeners.setdefault(listener.user, set()).add(listener) + if listener.appservice: + self.appservice_to_listeners.setdefault( + listener.appservice, set() + ).add(listener) + @defer.inlineCallbacks @log_function def _check_for_updates(self, listener): -- cgit 1.5.1 From 806a6c886aaa695a7dbfd35f71b9cc59941b8366 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 27 Feb 2015 09:48:57 +0000 Subject: PEP8 --- synapse/storage/stream.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5d01ecf200..09417bd147 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -169,7 +169,6 @@ class StreamStore(SQLBaseStore): "limit": limit } - def app_service_interested(row): if row["room_id"] in room_ids_for_as: return True -- cgit 1.5.1 From 9dc9118e552bfddaa7579b4ded8b1a0da7eff0e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Feb 2015 15:16:47 +0000 Subject: Document FederationRateLimiter --- synapse/federation/transport/server.py | 59 +++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a9e625f127..390e54b9fb 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -32,6 +32,21 @@ logger = logging.getLogger(__name__) class FederationRateLimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ self.clock = clock self.window_size = window_size @@ -43,9 +58,23 @@ class FederationRateLimiter(object): self.ratelimiters = {} def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ return self.ratelimiters.setdefault( host, - PerHostRatelimiter( + _PerHostRatelimiter( clock=self.clock, window_size=self.window_size, sleep_limit=self.sleep_limit, @@ -56,7 +85,7 @@ class FederationRateLimiter(object): ).ratelimit() -class PerHostRatelimiter(object): +class _PerHostRatelimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): self.clock = clock @@ -123,17 +152,25 @@ class PerHostRatelimiter(object): else: return defer.succeed(None) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) ret_defer = sleep(self.sleep_msec/1000.0) self.sleeping_requests.add(request_id) def on_wait_finished(_): - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -143,7 +180,10 @@ class PerHostRatelimiter(object): ret_defer = queue_request() def on_start(r): - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) self.current_processing.add(request_id) return r @@ -162,7 +202,10 @@ class PerHostRatelimiter(object): return ret_defer def _on_exit(self, request_id): - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) self.current_processing.discard(request_id) try: request_id, deferred = self.ready_request_queue.popitem() -- cgit 1.5.1 From 0554d0708225afe13d141bd00e3aaca2509f3f78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Feb 2015 15:41:52 +0000 Subject: Move federation rate limiting out of transport layer --- synapse/federation/transport/__init__.py | 4 +- synapse/federation/transport/server.py | 204 +--------------------------- synapse/util/ratelimitutils.py | 226 +++++++++++++++++++++++++++++++ 3 files changed, 230 insertions(+), 204 deletions(-) create mode 100644 synapse/util/ratelimitutils.py (limited to 'synapse') diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 7028ca6947..f0283b5105 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -21,9 +21,11 @@ support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ -from .server import TransportLayerServer, FederationRateLimiter +from .server import TransportLayerServer from .client import TransportLayerClient +from synapse.util.ratelimitutils import FederationRateLimiter + class TransportLayer(TransportLayerServer, TransportLayerClient): """This is a basic implementation of the transport layer that translates diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 390e54b9fb..fce9c0195e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,11 +16,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError, LimitExceededError -from synapse.util.async import sleep +from synapse.api.errors import Codes, SynapseError from synapse.util.logutils import log_function -import collections import logging import simplejson as json import re @@ -29,206 +27,6 @@ import re logger = logging.getLogger(__name__) -class FederationRateLimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): - """ - Args: - clock (Clock) - window_size (int): The window size in milliseconds. - sleep_limit (int): The number of requests received in the last - `window_size` milliseconds before we artificially start - delaying processing of requests. - sleep_msec (int): The number of milliseconds to delay processing - of incoming requests by. - reject_limit (int): The maximum number of requests that are can be - queued for processing before we start rejecting requests with - a 429 Too Many Requests response. - concurrent_requests (int): The number of concurrent requests to - process. - """ - self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - - self.ratelimiters = {} - - def ratelimit(self, host): - """Used to ratelimit an incoming request from given host - - Example usage: - - with rate_limiter.ratelimit(origin) as wait_deferred: - yield wait_deferred - # Handle request ... - - Args: - host (str): Origin of incoming request. - - Returns: - _PerHostRatelimiter - """ - return self.ratelimiters.setdefault( - host, - _PerHostRatelimiter( - clock=self.clock, - window_size=self.window_size, - sleep_limit=self.sleep_limit, - sleep_msec=self.sleep_msec, - reject_limit=self.reject_limit, - concurrent_requests=self.concurrent_requests, - ) - ).ratelimit() - - -class _PerHostRatelimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): - self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - - self.sleeping_requests = set() - self.ready_request_queue = collections.OrderedDict() - self.current_processing = set() - self.request_times = [] - - def is_empty(self): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - return not ( - self.ready_request_queue - or self.sleeping_requests - or self.current_processing - or self.request_times - ) - - def ratelimit(self): - request_id = object() - - def on_enter(): - return self._on_enter(request_id) - - def on_exit(exc_type, exc_val, exc_tb): - return self._on_exit(request_id) - - return ContextManagerFunction(on_enter, on_exit) - - def _on_enter(self, request_id): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) - if queue_size > self.reject_limit: - raise LimitExceededError( - retry_after_ms=int( - self.window_size / self.sleep_limit - ), - ) - - self.request_times.append(time_now) - - def queue_request(): - if len(self.current_processing) > self.concurrent_requests: - logger.debug("Ratelimit [%s]: Queue req", id(request_id)) - queue_defer = defer.Deferred() - self.ready_request_queue[request_id] = queue_defer - return queue_defer - else: - return defer.succeed(None) - - logger.debug( - "Ratelimit [%s]: len(self.request_times)=%d", - id(request_id), len(self.request_times), - ) - - if len(self.request_times) > self.sleep_limit: - logger.debug( - "Ratelimit [%s]: sleeping req", - id(request_id), - ) - ret_defer = sleep(self.sleep_msec/1000.0) - - self.sleeping_requests.add(request_id) - - def on_wait_finished(_): - logger.debug( - "Ratelimit [%s]: Finished sleeping", - id(request_id), - ) - self.sleeping_requests.discard(request_id) - queue_defer = queue_request() - return queue_defer - - ret_defer.addBoth(on_wait_finished) - else: - ret_defer = queue_request() - - def on_start(r): - logger.debug( - "Ratelimit [%s]: Processing req", - id(request_id), - ) - self.current_processing.add(request_id) - return r - - def on_err(r): - self.current_processing.discard(request_id) - return r - - def on_both(r): - # Ensure that we've properly cleaned up. - self.sleeping_requests.discard(request_id) - self.ready_request_queue.pop(request_id, None) - return r - - ret_defer.addCallbacks(on_start, on_err) - ret_defer.addBoth(on_both) - return ret_defer - - def _on_exit(self, request_id): - logger.debug( - "Ratelimit [%s]: Processed req", - id(request_id), - ) - self.current_processing.discard(request_id) - try: - request_id, deferred = self.ready_request_queue.popitem() - self.current_processing.add(request_id) - deferred.callback(None) - except KeyError: - pass - - -class ContextManagerFunction(object): - def __init__(self, on_enter, on_exit): - self.on_enter = on_enter - self.on_exit = on_exit - - def __enter__(self): - if self.on_enter: - return self.on_enter() - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.on_exit: - return self.on_exit(exc_type, exc_val, exc_tb) - - class TransportLayerServer(object): """Handles incoming federation HTTP requests""" diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py new file mode 100644 index 0000000000..259d5f6f88 --- /dev/null +++ b/synapse/util/ratelimitutils.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import LimitExceededError + +from synapse.util.async import sleep + +import collections +import logging + + +logger = logging.getLogger(__name__) + + +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ + return self.ratelimiters.setdefault( + host, + _PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class _PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + def ratelimit(self): + request_id = object() + + def on_enter(): + return self._on_enter(request_id) + + def on_exit(exc_type, exc_val, exc_tb): + return self._on_exit(request_id) + + return ContextManagerFunction(on_enter, on_exit) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) + + if len(self.request_times) > self.sleep_limit: + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass + + +class ContextManagerFunction(object): + def __init__(self, on_enter, on_exit): + self.on_enter = on_enter + self.on_exit = on_exit + + def __enter__(self): + if self.on_enter: + return self.on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.on_exit: + return self.on_exit(exc_type, exc_val, exc_tb) -- cgit 1.5.1 From ebc48306662cf8719a0ea64e2955bf8d5e037a8e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 09:53:00 +0000 Subject: PR tweaks: set earlier on and use 'as json' for compat --- synapse/storage/appservice.py | 18 +++++++----------- synapse/storage/registration.py | 2 +- synapse/storage/stream.py | 8 ++++---- 3 files changed, 12 insertions(+), 16 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d0632d55d1..c6ca2ab04e 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -219,20 +219,17 @@ class ApplicationServiceStore(SQLBaseStore): # get all rooms matching the room ID regex. room_entries = yield self.get_all_rooms() # RoomEntry list - matching_room_id_list = [ + matching_room_list = set([ r.room_id for r in room_entries if service.is_interested_in_room(r.room_id) - ] + ]) # resolve room IDs for matching room alias regex. room_alias_mappings = yield self.get_all_associations() - matching_alias_list = [ + matching_room_list |= set([ r.room_id for r in room_alias_mappings if service.is_interested_in_alias(r.room_alias) - ] - room_ids_matching_alias_or_id = set( - matching_room_id_list + matching_alias_list - ) + ]) # get all rooms for every user for this AS. This is scoped to users on # this HS only. @@ -241,11 +238,10 @@ class ApplicationServiceStore(SQLBaseStore): u["name"] for u in user_list if service.is_interested_in_user(u["name"]) ] - rooms_for_user_matching_user_id = [] # RoomsForUser list + rooms_for_user_matching_user_id = set() # RoomsForUser list for user_id in user_list: rooms_for_user = yield self.get_rooms_for_user(user_id) - rooms_for_user_matching_user_id += rooms_for_user - rooms_for_user_matching_user_id = set(rooms_for_user_matching_user_id) + rooms_for_user_matching_user_id |= set(rooms_for_user) # make RoomsForUser tuples for room ids and aliases which are not in the # main rooms_for_user_list - e.g. they are rooms which do not have AS @@ -253,7 +249,7 @@ class ApplicationServiceStore(SQLBaseStore): known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id] missing_rooms_for_user = [ RoomsForUser(r, service.sender, "join") for r in - room_ids_matching_alias_or_id if r not in known_room_ids + matching_room_list if r not in known_room_ids ] rooms_for_user_matching_user_id |= set(missing_rooms_for_user) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7aff3dbd33..9c92575c7f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,7 +93,7 @@ class RegistrationStore(SQLBaseStore): ) def get_all_users(self): - query = ("SELECT users.name FROM users") + query = "SELECT users.name FROM users" return self._execute( self.cursor_to_dict, query diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09417bd147..bad427288d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ from synapse.util.logutils import log_function from collections import namedtuple import logging -import simplejson +import simplejson as json logger = logging.getLogger(__name__) @@ -178,7 +178,7 @@ class StreamStore(SQLBaseStore): # interested in was invited to a room. We'll be passing this # through _get_events_txn later, so ignore the fact that this # may be a redacted event. - event_content = simplejson.loads(row["unrecognized_keys"]) + event_content = json.loads(row["unrecognized_keys"]) if (service.is_interested_in_user( event_content.get("state_key"))): return True @@ -202,7 +202,7 @@ class StreamStore(SQLBaseStore): self._set_before_and_after(ret, rows) if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) + key = "s%d" % max(r["stream_ordering"] for r in rows) else: # Assume we didn't get anything because there was nothing to @@ -271,7 +271,7 @@ class StreamStore(SQLBaseStore): self._set_before_and_after(ret, rows) if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) + key = "s%d" % max(r["stream_ordering"] for r in rows) else: # Assume we didn't get anything because there was nothing to -- cgit 1.5.1 From 3d73383d185b41b9986366da8123255e3a8ce1e0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 10:16:24 +0000 Subject: Modify _simple_select_list to allow an empty WHERE clause. Use it for get_all_rooms and get_all_users. --- synapse/storage/_base.py | 22 +++++++++++++++------- synapse/storage/appservice.py | 4 ++-- synapse/storage/registration.py | 7 ++----- synapse/storage/room.py | 5 ++--- 4 files changed, 21 insertions(+), 17 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c98dd36aed..3725c9795d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -450,7 +450,8 @@ class SQLBaseStore(object): Args: table : string giving the table name - keyvalues : dict of column names and values to select the rows with + keyvalues : dict of column names and values to select the rows with, + or None to not apply a WHERE clause. retcols : list of strings giving the names of the columns to return """ return self.runInteraction( @@ -469,13 +470,20 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the rows with retcols : list of strings giving the names of the columns to return """ - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k, ) for k in keyvalues) - ) + if keyvalues: + sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k, ) for k in keyvalues) + ) + txn.execute(sql, keyvalues.values()) + else: + sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table + ) + txn.execute(sql) - txn.execute(sql, keyvalues.values()) return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c6ca2ab04e..0e3eab9422 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -220,8 +220,8 @@ class ApplicationServiceStore(SQLBaseStore): # get all rooms matching the room ID regex. room_entries = yield self.get_all_rooms() # RoomEntry list matching_room_list = set([ - r.room_id for r in room_entries if - service.is_interested_in_room(r.room_id) + r["room_id"] for r in room_entries if + service.is_interested_in_room(r["room_id"]) ]) # resolve room IDs for matching room alias regex. diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9c92575c7f..54cd15bc0e 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,11 +93,8 @@ class RegistrationStore(SQLBaseStore): ) def get_all_users(self): - query = "SELECT users.name FROM users" - return self._execute( - self.cursor_to_dict, - query - ) + return self._simple_select_list( + table="users", keyvalues=None, retcols=["name"]) def get_user_by_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3a64693404..6bd0b22ae5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -77,9 +77,8 @@ class RoomStore(SQLBaseStore): Returns: A list of namedtuples containing the room information. """ - query = RoomsTable.select_statement() - return self._execute( - RoomsTable.decode_results, query, + return self._simple_select_list( + table="rooms", keyvalues=None, retcols=["room_id"] ) @defer.inlineCallbacks -- cgit 1.5.1 From b216b3689248094989168c340b60f500c93772a7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 10:41:35 +0000 Subject: JOIN state_events rather than parsing unrecognized_keys to pull out member state_keys --- synapse/storage/appservice.py | 2 +- synapse/storage/stream.py | 14 ++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 0e3eab9422..3a267d0442 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -218,7 +218,7 @@ class ApplicationServiceStore(SQLBaseStore): # less obvious. # get all rooms matching the room ID regex. - room_entries = yield self.get_all_rooms() # RoomEntry list + room_entries = yield self.get_all_rooms() matching_room_list = set([ r["room_id"] for r in room_entries if service.is_interested_in_room(r["room_id"]) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bad427288d..865cb13e9e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,6 @@ from synapse.util.logutils import log_function from collections import namedtuple import logging -import simplejson as json logger = logging.getLogger(__name__) @@ -161,8 +160,9 @@ class StreamStore(SQLBaseStore): # select all the events between from/to with a sensible limit sql = ( - "SELECT e.event_id, e.room_id, e.type, e.unrecognized_keys, " - "e.stream_ordering FROM events AS e " + "SELECT e.event_id, e.room_id, e.type, s.state_key, " + "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON " + "e.event_id = s.event_id " "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { @@ -174,13 +174,7 @@ class StreamStore(SQLBaseStore): return True if row["type"] == EventTypes.Member: - # load up the content to inspect if some user the AS is - # interested in was invited to a room. We'll be passing this - # through _get_events_txn later, so ignore the fact that this - # may be a redacted event. - event_content = json.loads(row["unrecognized_keys"]) - if (service.is_interested_in_user( - event_content.get("state_key"))): + if service.is_interested_in_user(row.get("state_key")): return True return False -- cgit 1.5.1 From 377ae369c1275fabdac46fa00c0b2ba238467435 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 11:20:51 +0000 Subject: Wrap all of get_app_service_rooms in a txn. --- synapse/storage/appservice.py | 38 +++++++++++++++++++++++----------- synapse/storage/directory.py | 21 ------------------- synapse/storage/registration.py | 4 ---- synapse/storage/room.py | 10 --------- synapse/storage/roommember.py | 36 +++++++++++++++++--------------- synapse/storage/stream.py | 46 ++++++++++++++++++++--------------------- 6 files changed, 67 insertions(+), 88 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 3a267d0442..97481d113b 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -15,6 +15,7 @@ import logging from twisted.internet import defer +from synapse.api.constants import Membership from synapse.api.errors import StoreError from synapse.appservice import ApplicationService from synapse.storage.roommember import RoomsForUser @@ -197,7 +198,6 @@ class ApplicationServiceStore(SQLBaseStore): # TODO: The from_cache=False impl # TODO: This should be JOINed with the application_services_regex table. - @defer.inlineCallbacks def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -212,35 +212,49 @@ class ApplicationServiceStore(SQLBaseStore): Returns: A list of RoomsForUser. """ - # FIXME: This is assuming that this store has methods from - # RoomStore, DirectoryStore, RegistrationStore, RoomMemberStore which is - # a bad assumption to make as it makes testing trickier and coupling - # less obvious. + return self.runInteraction( + "get_app_service_rooms", + self._get_app_service_rooms_txn, + service, + ) + def _get_app_service_rooms_txn(self, txn, service): # get all rooms matching the room ID regex. - room_entries = yield self.get_all_rooms() + room_entries = self._simple_select_list_txn( + txn=txn, table="rooms", keyvalues=None, retcols=["room_id"] + ) matching_room_list = set([ r["room_id"] for r in room_entries if service.is_interested_in_room(r["room_id"]) ]) # resolve room IDs for matching room alias regex. - room_alias_mappings = yield self.get_all_associations() + room_alias_mappings = self._simple_select_list_txn( + txn=txn, table="room_aliases", keyvalues=None, + retcols=["room_id", "room_alias"] + ) matching_room_list |= set([ - r.room_id for r in room_alias_mappings if - service.is_interested_in_alias(r.room_alias) + r["room_id"] for r in room_alias_mappings if + service.is_interested_in_alias(r["room_alias"]) ]) # get all rooms for every user for this AS. This is scoped to users on # this HS only. - user_list = yield self.get_all_users() + user_list = self._simple_select_list_txn( + txn=txn, table="users", keyvalues=None, retcols=["name"] + ) user_list = [ u["name"] for u in user_list if service.is_interested_in_user(u["name"]) ] rooms_for_user_matching_user_id = set() # RoomsForUser list for user_id in user_list: - rooms_for_user = yield self.get_rooms_for_user(user_id) + # FIXME: This assumes this store is linked with RoomMemberStore :( + rooms_for_user = self._get_rooms_for_user_where_membership_is_txn( + txn=txn, + user_id=user_id, + membership_list=[Membership.JOIN] + ) rooms_for_user_matching_user_id |= set(rooms_for_user) # make RoomsForUser tuples for room ids and aliases which are not in the @@ -253,7 +267,7 @@ class ApplicationServiceStore(SQLBaseStore): ] rooms_for_user_matching_user_id |= set(missing_rooms_for_user) - defer.returnValue(rooms_for_user_matching_user_id) + return rooms_for_user_matching_user_id @defer.inlineCallbacks def _populate_cache(self): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e391239a3c..68b7d59693 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -134,27 +134,6 @@ class DirectoryStore(SQLBaseStore): return room_id - @defer.inlineCallbacks - def get_all_associations(self): - """Retrieve the entire list of room alias -> room ID pairings. - - Returns: - A list of RoomAliasMappings. - """ - results = yield self._execute_and_decode( - "SELECT room_id, room_alias FROM room_aliases" - ) - - # TODO(kegan): It feels wrong to be specifying no servers here, but - # equally this function isn't required to obtain all servers so - # retrieving them "just for the sake of it" also seems wrong, but we - # want to conform to passing Objects around and not dicts.. - defer.returnValue([ - RoomAliasMapping( - room_id=r["room_id"], room_alias=r["room_alias"], servers="" - ) for r in results - ]) - def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 54cd15bc0e..029b07cc66 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -92,10 +92,6 @@ class RegistrationStore(SQLBaseStore): query, user_id ) - def get_all_users(self): - return self._simple_select_list( - table="users", keyvalues=None, retcols=["name"]) - def get_user_by_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 6bd0b22ae5..750b17a45f 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -71,16 +71,6 @@ class RoomStore(SQLBaseStore): RoomsTable.decode_single_result, query, room_id, ) - def get_all_rooms(self): - """Retrieve all the rooms. - - Returns: - A list of namedtuples containing the room information. - """ - return self._simple_select_list( - table="rooms", keyvalues=None, retcols=["room_id"] - ) - @defer.inlineCallbacks def get_rooms(self, is_public): """Retrieve a list of all public rooms. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 58aa376c20..3d0172d09b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore): if not membership_list: return defer.succeed(None) + return self.runInteraction( + "get_rooms_for_user_where_membership_is", + self._get_rooms_for_user_where_membership_is_txn, + user_id, membership_list + ) + + def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, + membership_list): where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) @@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore): args = [user_id] args.extend(membership_list) - def f(txn): - sql = ( - "SELECT m.room_id, m.sender, m.membership" - " FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE %s" - ) % (where_clause,) - - txn.execute(sql, args) - return [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] + sql = ( + "SELECT m.room_id, m.sender, m.membership" + " FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %s" + ) % (where_clause,) - return self.runInteraction( - "get_rooms_for_user_where_membership_is", - f - ) + txn.execute(sql, args) + return [ + RoomsForUser(**r) for r in self.cursor_to_dict(txn) + ] def get_joined_hosts_for_room(self, room_id): return self._simple_select_onecol( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 865cb13e9e..09bc522210 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -146,18 +146,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], to_key)) return - # Logic: - # - We want ALL events which match the AS room_id regex - # - We want ALL events which match the rooms represented by the AS - # room_alias regex - # - We want ALL events for rooms that AS users have joined. - # This is currently supported via get_app_service_rooms (which is used - # for the Notifier listener rooms). We can't reasonably make a SQL - # query for these room IDs, so we'll pull all the events between from/to - # and filter in python. - rooms_for_as = yield self.get_app_service_rooms(service) - room_ids_for_as = [r.room_id for r in rooms_for_as] - # select all the events between from/to with a sensible limit sql = ( "SELECT e.event_id, e.room_id, e.type, s.state_key, " @@ -169,20 +157,32 @@ class StreamStore(SQLBaseStore): "limit": limit } - def app_service_interested(row): - if row["room_id"] in room_ids_for_as: - return True - - if row["type"] == EventTypes.Member: - if service.is_interested_in_user(row.get("state_key")): - return True - return False - def f(txn): + # pull out all the events between the tokens txn.execute(sql, (from_id.stream, to_id.stream,)) - rows = self.cursor_to_dict(txn) + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is + # used for the Notifier listener rooms). We can't reasonably make a + # SQL query for these room IDs, so we'll pull all the events between + # from/to and filter in python. + rooms_for_as = self._get_app_service_rooms_txn(txn, service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + if service.is_interested_in_user(row.get("state_key")): + return True + return False + ret = self._get_events_txn( txn, # apply the filter on the room id list @@ -197,7 +197,6 @@ class StreamStore(SQLBaseStore): if rows: key = "s%d" % max(r["stream_ordering"] for r in rows) - else: # Assume we didn't get anything because there was nothing to # get. @@ -266,7 +265,6 @@ class StreamStore(SQLBaseStore): if rows: key = "s%d" % max(r["stream_ordering"] for r in rows) - else: # Assume we didn't get anything because there was nothing to # get. -- cgit 1.5.1 From cb97ea3ec236c23c745e59c3a857503dd8dc3410 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 11:23:46 +0000 Subject: PEP8 --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3d0172d09b..65ffb4627f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -187,7 +187,7 @@ class RoomMemberStore(SQLBaseStore): ) def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, - membership_list): + membership_list): where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) -- cgit 1.5.1 From 9d9b230501915c326136567349b0995623c48a21 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:33:45 +0000 Subject: Make the federation server ratelimiting configurable. --- synapse/config/ratelimiting.py | 36 ++++++++++++++++++++++++++++++++ synapse/federation/transport/__init__.py | 10 ++++----- 2 files changed, 41 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 17c7e64ce7..862c07ef8c 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -22,6 +22,12 @@ class RatelimitConfig(Config): self.rc_messages_per_second = args.rc_messages_per_second self.rc_message_burst_count = args.rc_message_burst_count + self.federation_rc_window_size = args.federation_rc_window_size + self.federation_rc_sleep_limit = args.federation_rc_sleep_limit + self.federation_rc_sleep_delay = args.federation_rc_sleep_delay + self.federation_rc_reject_limit = args.federation_rc_reject_limit + self.federation_rc_concurrent = args.federation_rc_concurrent + @classmethod def add_arguments(cls, parser): super(RatelimitConfig, cls).add_arguments(parser) @@ -34,3 +40,33 @@ class RatelimitConfig(Config): "--rc-message-burst-count", type=float, default=10, help="number of message a client can send before being throttled" ) + + rc_group.add_argument( + "--federation-rc-window-size", type=int, default=10000, + help="The federation window size in milliseconds", + ) + + rc_group.add_argument( + "--federation-rc-sleep-limit", type=int, default=10, + help="The number of federation requests from a single server" + " in a window before the server will delay processing the" + " request.", + ) + + rc_group.add_argument( + "--federation-rc-sleep-delay", type=int, default=500, + help="The duration in milliseconds to delay processing events from" + " remote servers by if they go over the sleep limit.", + ) + + rc_group.add_argument( + "--federation-rc-reject-limit", type=int, default=50, + help="The maximum number of concurrent federation requests allowed" + " from a single server", + ) + + rc_group.add_argument( + "--federation-rc-concurrent", type=int, default=3, + help="The number of federation requests to concurrently process" + " from a single server", + ) diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index f0283b5105..2a671b9aec 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -66,9 +66,9 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): self.ratelimiter = FederationRateLimiter( self.clock, - window_size=10000, - sleep_limit=10, - sleep_msec=500, - reject_limit=50, - concurrent_requests=3, + window_size=homeserver.config.federation_rc_window_size, + sleep_limit=homeserver.config.federation_rc_sleep_limit, + sleep_msec=homeserver.config.federation_rc_sleep_delay, + reject_limit=homeserver.config.federation_rc_reject_limit, + concurrent_requests=homeserver.config.federation_rc_concurrent, ) -- cgit 1.5.1 From 23d9bd1d745a037202bb9a134cdb848eb65a01e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:39:40 +0000 Subject: Process transactions serially. Since the events received in a transaction are ordered, later events might depend on earlier events and so we shouldn't blindly process them in parellel. --- synapse/federation/federation_server.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b9663831..7ee37fb34d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -112,17 +112,23 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): - dl = [] + results = [] + for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) def handle_failure(failure): failure.trap(FederationError) self.send_failure(failure.value, transaction.origin) + return failure d.addErrback(handle_failure) - dl.append(d) + try: + yield d + results.append({}) + except Exception as e: + results.append({"error": str(e)}) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -135,21 +141,11 @@ class FederationServer(FederationBase): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - results = yield defer.DeferredList(dl, consumeErrors=True) - - ret = [] - for r in results: - if r[0]: - ret.append({}) - else: - logger.exception(r[1]) - ret.append({"error": str(r[1].value)}) - - logger.debug("Returning: %s", str(ret)) + logger.debug("Returning: %s", str(results)) response = { "pdus": dict(zip( - (p.event_id for p in pdu_list), ret + (p.event_id for p in pdu_list), results )), } -- cgit 1.5.1 From 29481690c5b296a1c8aee3068d32ef083ef227f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:50:43 +0000 Subject: If we're yielding don't add errback --- synapse/federation/federation_server.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ee37fb34d..bc9bac809a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -117,16 +117,12 @@ class FederationServer(FederationBase): for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) - def handle_failure(failure): - failure.trap(FederationError) - self.send_failure(failure.value, transaction.origin) - return failure - - d.addErrback(handle_failure) - try: yield d results.append({}) + except FederationError as e: + self.send_failure(e, transaction.origin) + results.append({"error": str(e)}) except Exception as e: results.append({"error": str(e)}) -- cgit 1.5.1 From 3077cb291590a7ba3b24a3d1a9985d65980924fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 13:32:44 +0000 Subject: Use contextlib.contextmanager instead of a custom class --- synapse/util/ratelimitutils.py | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) (limited to 'synapse') diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 259d5f6f88..d4457af950 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -20,6 +20,7 @@ from synapse.api.errors import LimitExceededError from synapse.util.async import sleep import collections +import contextlib import logging @@ -112,16 +113,19 @@ class _PerHostRatelimiter(object): or self.request_times ) + @contextlib.contextmanager def ratelimit(self): - request_id = object() - - def on_enter(): - return self._on_enter(request_id) - - def on_exit(exc_type, exc_val, exc_tb): - return self._on_exit(request_id) + # `contextlib.contextmanager` takes a generator and turns it into a + # context manager. The generator should only yield once with a value + # to be returned by manager. + # Exceptions will be reraised at the yield. - return ContextManagerFunction(on_enter, on_exit) + request_id = object() + ret = self._on_enter(request_id) + try: + yield ret + finally: + self._on_exit(request_id) def _on_enter(self, request_id): time_now = self.clock.time_msec() @@ -210,17 +214,3 @@ class _PerHostRatelimiter(object): deferred.callback(None) except KeyError: pass - - -class ContextManagerFunction(object): - def __init__(self, on_enter, on_exit): - self.on_enter = on_enter - self.on_exit = on_exit - - def __enter__(self): - if self.on_enter: - return self.on_enter() - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.on_exit: - return self.on_exit(exc_type, exc_val, exc_tb) -- cgit 1.5.1