From d073cb7ead62314ff14c59a9aaeac4f5f8470dd6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Nov 2016 16:29:51 +0000 Subject: Add Limiter: limit concurrent access to resource --- synapse/util/async.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 347fb1e380..2a680fff5e 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -197,6 +197,51 @@ class Linearizer(object): defer.returnValue(_ctx_manager()) +class Limiter(object): + """Limits concurrent access to resources based on a key. Useful to ensure + only a few thing happen at a time on a given resource. + + Example: + + with (yield limiter.queue("test_key")): + # do some work. + + """ + def __init__(self, max_count): + """ + Args: + max_count(int): The maximum number of concurrent access + """ + self.max_count = max_count + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + entry = self.key_to_defer.setdefault(key, [0, []]) + + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1].append(new_defer) + with PreserveLoggingContext(): + yield new_defer + + entry[0] += 1 + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + entry[0] -= 1 + try: + entry[1].pop(0).callback(None) + except IndexError: + if entry[0] == 0: + self.key_to_defer.pop(key, None) + + defer.returnValue(_ctx_manager()) + + class ReadWriteLock(object): """A deferred style read write lock. -- cgit 1.5.1 From 64038b806cff6fea0a190b3d310225ede8abc5af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Nov 2016 10:42:08 +0000 Subject: Comments --- synapse/handlers/message.py | 2 ++ synapse/util/async.py | 13 +++++++++++++ 2 files changed, 15 insertions(+) (limited to 'synapse/util') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 90519e33e3..4d0515ddfb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,6 +50,8 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. self.limiter = Limiter(max_count=5) @defer.inlineCallbacks diff --git a/synapse/util/async.py b/synapse/util/async.py index 2a680fff5e..16ed183d4c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -213,12 +213,21 @@ class Limiter(object): max_count(int): The maximum number of concurrent access """ self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing + # the second element is a list of deferreds for the things blocked from + # executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): entry = self.key_to_defer.setdefault(key, [0, []]) + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() entry[1].append(new_defer) @@ -232,10 +241,14 @@ class Limiter(object): try: yield finally: + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them entry[0] -= 1 try: entry[1].pop(0).callback(None) except IndexError: + # If nothing else is executing for this key then remove it + # from the map if entry[0] == 0: self.key_to_defer.pop(key, None) -- cgit 1.5.1 From d56c39cf24e1650f0513196bf245c29e5163a836 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 Nov 2016 13:03:19 +0000 Subject: Use external ldap auth pacakge --- synapse/config/password_auth_providers.py | 20 +- synapse/handlers/auth.py | 2 + synapse/python_dependencies.py | 4 +- synapse/util/ldap_auth_provider.py | 369 ------------------------------ 4 files changed, 17 insertions(+), 378 deletions(-) delete mode 100644 synapse/util/ldap_auth_provider.py (limited to 'synapse/util') diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index 1f438d2bb3..83762d089a 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -27,17 +27,23 @@ class PasswordAuthProviderConfig(Config): ldap_config = config.get("ldap_config", {}) self.ldap_enabled = ldap_config.get("enabled", False) if self.ldap_enabled: - from synapse.util.ldap_auth_provider import LdapAuthProvider + from ldap_auth_provider import LdapAuthProvider parsed_config = LdapAuthProvider.parse_config(ldap_config) self.password_providers.append((LdapAuthProvider, parsed_config)) providers = config.get("password_providers", []) for provider in providers: - # We need to import the module, and then pick the class out of - # that, so we split based on the last dot. - module, clz = provider['module'].rsplit(".", 1) - module = importlib.import_module(module) - provider_class = getattr(module, clz) + # This is for backwards compat when the ldap auth provider resided + # in this package. + if provider['module'] == "synapse.util.ldap_auth_provider.LdapAuthProvider": + from ldap_auth_provider import LdapAuthProvider + provider_class = LdapAuthProvider + else: + # We need to import the module, and then pick the class out of + # that, so we split based on the last dot. + module, clz = provider['module'].rsplit(".", 1) + module = importlib.import_module(module) + provider_class = getattr(module, clz) try: provider_config = provider_class.parse_config(provider["config"]) @@ -50,7 +56,7 @@ class PasswordAuthProviderConfig(Config): def default_config(self, **kwargs): return """\ # password_providers: - # - module: "synapse.util.ldap_auth_provider.LdapAuthProvider" + # - module: "ldap_auth_provider.LdapAuthProvider" # config: # enabled: true # uri: "ldap://ldap.example.com:389" diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3851b35889..a2866af431 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -61,6 +61,8 @@ class AuthHandler(BaseHandler): for module, config in hs.config.password_providers ] + logger.info("Extra password_providers: %r", self.password_providers) + self.hs = hs # FIXME better possibility to access registrationHandler later? self.device_handler = hs.get_device_handler() diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 59bc084b11..3742a25b37 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -49,8 +49,8 @@ CONDITIONAL_REQUIREMENTS = { "Jinja2>=2.8": ["Jinja2>=2.8"], "bleach>=1.4.2": ["bleach>=1.4.2"], }, - "ldap": { - "ldap3>=1.0": ["ldap3>=1.0"], + "matrix-synapse-ldap3": { + "matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"], }, "psutil": { "psutil>=2.0.0": ["psutil>=2.0.0"], diff --git a/synapse/util/ldap_auth_provider.py b/synapse/util/ldap_auth_provider.py deleted file mode 100644 index 1b989248fb..0000000000 --- a/synapse/util/ldap_auth_provider.py +++ /dev/null @@ -1,369 +0,0 @@ - -from twisted.internet import defer - -from synapse.config._base import ConfigError -from synapse.types import UserID - -import ldap3 -import ldap3.core.exceptions - -import logging - -try: - import ldap3 - import ldap3.core.exceptions -except ImportError: - ldap3 = None - pass - - -logger = logging.getLogger(__name__) - - -class LDAPMode(object): - SIMPLE = "simple", - SEARCH = "search", - - LIST = (SIMPLE, SEARCH) - - -class LdapAuthProvider(object): - __version__ = "0.1" - - def __init__(self, config, account_handler): - self.account_handler = account_handler - - if not ldap3: - raise RuntimeError( - 'Missing ldap3 library. This is required for LDAP Authentication.' - ) - - self.ldap_mode = config.mode - self.ldap_uri = config.uri - self.ldap_start_tls = config.start_tls - self.ldap_base = config.base - self.ldap_attributes = config.attributes - if self.ldap_mode == LDAPMode.SEARCH: - self.ldap_bind_dn = config.bind_dn - self.ldap_bind_password = config.bind_password - self.ldap_filter = config.filter - - @defer.inlineCallbacks - def check_password(self, user_id, password): - """ Attempt to authenticate a user against an LDAP Server - and register an account if none exists. - - Returns: - True if authentication against LDAP was successful - """ - localpart = UserID.from_string(user_id).localpart - - try: - server = ldap3.Server(self.ldap_uri) - logger.debug( - "Attempting LDAP connection with %s", - self.ldap_uri - ) - - if self.ldap_mode == LDAPMode.SIMPLE: - result, conn = self._ldap_simple_bind( - server=server, localpart=localpart, password=password - ) - logger.debug( - 'LDAP authentication method simple bind returned: %s (conn: %s)', - result, - conn - ) - if not result: - defer.returnValue(False) - elif self.ldap_mode == LDAPMode.SEARCH: - result, conn = self._ldap_authenticated_search( - server=server, localpart=localpart, password=password - ) - logger.debug( - 'LDAP auth method authenticated search returned: %s (conn: %s)', - result, - conn - ) - if not result: - defer.returnValue(False) - else: - raise RuntimeError( - 'Invalid LDAP mode specified: {mode}'.format( - mode=self.ldap_mode - ) - ) - - try: - logger.info( - "User authenticated against LDAP server: %s", - conn - ) - except NameError: - logger.warn( - "Authentication method yielded no LDAP connection, aborting!" - ) - defer.returnValue(False) - - # check if user with user_id exists - if (yield self.account_handler.check_user_exists(user_id)): - # exists, authentication complete - conn.unbind() - defer.returnValue(True) - - else: - # does not exist, fetch metadata for account creation from - # existing ldap connection - query = "({prop}={value})".format( - prop=self.ldap_attributes['uid'], - value=localpart - ) - - if self.ldap_mode == LDAPMode.SEARCH and self.ldap_filter: - query = "(&{filter}{user_filter})".format( - filter=query, - user_filter=self.ldap_filter - ) - logger.debug( - "ldap registration filter: %s", - query - ) - - conn.search( - search_base=self.ldap_base, - search_filter=query, - attributes=[ - self.ldap_attributes['name'], - self.ldap_attributes['mail'] - ] - ) - - if len(conn.response) == 1: - attrs = conn.response[0]['attributes'] - mail = attrs[self.ldap_attributes['mail']][0] - name = attrs[self.ldap_attributes['name']][0] - - # create account - user_id, access_token = ( - yield self.account_handler.register(localpart=localpart) - ) - - # TODO: bind email, set displayname with data from ldap directory - - logger.info( - "Registration based on LDAP data was successful: %d: %s (%s, %)", - user_id, - localpart, - name, - mail - ) - - defer.returnValue(True) - else: - if len(conn.response) == 0: - logger.warn("LDAP registration failed, no result.") - else: - logger.warn( - "LDAP registration failed, too many results (%s)", - len(conn.response) - ) - - defer.returnValue(False) - - defer.returnValue(False) - - except ldap3.core.exceptions.LDAPException as e: - logger.warn("Error during ldap authentication: %s", e) - defer.returnValue(False) - - @staticmethod - def parse_config(config): - class _LdapConfig(object): - pass - - ldap_config = _LdapConfig() - - ldap_config.enabled = config.get("enabled", False) - - ldap_config.mode = LDAPMode.SIMPLE - - # verify config sanity - _require_keys(config, [ - "uri", - "base", - "attributes", - ]) - - ldap_config.uri = config["uri"] - ldap_config.start_tls = config.get("start_tls", False) - ldap_config.base = config["base"] - ldap_config.attributes = config["attributes"] - - if "bind_dn" in config: - ldap_config.mode = LDAPMode.SEARCH - _require_keys(config, [ - "bind_dn", - "bind_password", - ]) - - ldap_config.bind_dn = config["bind_dn"] - ldap_config.bind_password = config["bind_password"] - ldap_config.filter = config.get("filter", None) - - # verify attribute lookup - _require_keys(config['attributes'], [ - "uid", - "name", - "mail", - ]) - - return ldap_config - - def _ldap_simple_bind(self, server, localpart, password): - """ Attempt a simple bind with the credentials - given by the user against the LDAP server. - - Returns True, LDAP3Connection - if the bind was successful - Returns False, None - if an error occured - """ - - try: - # bind with the the local users ldap credentials - bind_dn = "{prop}={value},{base}".format( - prop=self.ldap_attributes['uid'], - value=localpart, - base=self.ldap_base - ) - conn = ldap3.Connection(server, bind_dn, password, - authentication=ldap3.AUTH_SIMPLE) - logger.debug( - "Established LDAP connection in simple bind mode: %s", - conn - ) - - if self.ldap_start_tls: - conn.start_tls() - logger.debug( - "Upgraded LDAP connection in simple bind mode through StartTLS: %s", - conn - ) - - if conn.bind(): - # GOOD: bind okay - logger.debug("LDAP Bind successful in simple bind mode.") - return True, conn - - # BAD: bind failed - logger.info( - "Binding against LDAP failed for '%s' failed: %s", - localpart, conn.result['description'] - ) - conn.unbind() - return False, None - - except ldap3.core.exceptions.LDAPException as e: - logger.warn("Error during LDAP authentication: %s", e) - return False, None - - def _ldap_authenticated_search(self, server, localpart, password): - """ Attempt to login with the preconfigured bind_dn - and then continue searching and filtering within - the base_dn - - Returns (True, LDAP3Connection) - if a single matching DN within the base was found - that matched the filter expression, and with which - a successful bind was achieved - - The LDAP3Connection returned is the instance that was used to - verify the password not the one using the configured bind_dn. - Returns (False, None) - if an error occured - """ - - try: - conn = ldap3.Connection( - server, - self.ldap_bind_dn, - self.ldap_bind_password - ) - logger.debug( - "Established LDAP connection in search mode: %s", - conn - ) - - if self.ldap_start_tls: - conn.start_tls() - logger.debug( - "Upgraded LDAP connection in search mode through StartTLS: %s", - conn - ) - - if not conn.bind(): - logger.warn( - "Binding against LDAP with `bind_dn` failed: %s", - conn.result['description'] - ) - conn.unbind() - return False, None - - # construct search_filter like (uid=localpart) - query = "({prop}={value})".format( - prop=self.ldap_attributes['uid'], - value=localpart - ) - if self.ldap_filter: - # combine with the AND expression - query = "(&{query}{filter})".format( - query=query, - filter=self.ldap_filter - ) - logger.debug( - "LDAP search filter: %s", - query - ) - conn.search( - search_base=self.ldap_base, - search_filter=query - ) - - if len(conn.response) == 1: - # GOOD: found exactly one result - user_dn = conn.response[0]['dn'] - logger.debug('LDAP search found dn: %s', user_dn) - - # unbind and simple bind with user_dn to verify the password - # Note: do not use rebind(), for some reason it did not verify - # the password for me! - conn.unbind() - return self._ldap_simple_bind(server, localpart, password) - else: - # BAD: found 0 or > 1 results, abort! - if len(conn.response) == 0: - logger.info( - "LDAP search returned no results for '%s'", - localpart - ) - else: - logger.info( - "LDAP search returned too many (%s) results for '%s'", - len(conn.response), localpart - ) - conn.unbind() - return False, None - - except ldap3.core.exceptions.LDAPException as e: - logger.warn("Error during LDAP authentication: %s", e) - return False, None - - -def _require_keys(config, required): - missing = [key for key in required if key not in config] - if missing: - raise ConfigError( - "LDAP enabled but missing required config values: {}".format( - ", ".join(missing) - ) - ) -- cgit 1.5.1 From f8ee66250a16cb9dd3af01fb1150ff18cfebbc39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Nov 2016 15:46:44 +0000 Subject: Handle sending events and device messages over federation --- synapse/app/federation_sender.py | 31 ++++++++++-------- synapse/federation/send_queue.py | 38 ++++++++++++++++++---- synapse/federation/transaction_queue.py | 32 ++++++++++++++++++ synapse/handlers/message.py | 13 +------- synapse/notifier.py | 2 ++ synapse/replication/resource.py | 2 +- synapse/replication/slave/storage/deviceinbox.py | 15 ++++++--- synapse/replication/slave/storage/events.py | 11 +++++++ synapse/replication/slave/storage/transactions.py | 4 +-- synapse/storage/deviceinbox.py | 26 ++++++++------- synapse/storage/prepare_database.py | 2 +- .../delta/39/device_federation_stream_idx.sql | 16 +++++++++ synapse/storage/stream.py | 31 ++++++++++++++++++ synapse/util/jsonobject.py | 17 ++++++++-- 14 files changed, 185 insertions(+), 55 deletions(-) create mode 100644 synapse/storage/schema/delta/39/device_federation_stream_idx.sql (limited to 'synapse/util') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 7a4fec4a66..32113c175c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -127,13 +127,6 @@ class FederationSenderServer(HomeServer): replication_url = self.config.worker_replication_url send_handler = self._get_send_handler() - def replicate(results): - stream = results.get("events") - if stream: - # max_stream_id = stream["position"] - # TODO - pass - while True: try: args = store.stream_positions() @@ -142,7 +135,6 @@ class FederationSenderServer(HomeServer): result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) send_handler.process_replication(result) - replicate(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(30) @@ -242,16 +234,17 @@ class FederationSenderHandler(object): return {"federation": self._latest_room_serial} def process_replication(self, result): - stream = result.get("federation") - if stream: - self._latest_room_serial = int(stream["position"]) + fed_stream = result.get("federation") + if fed_stream: + self._latest_room_serial = int(fed_stream["position"]) presence_to_send = {} keyed_edus = {} edus = {} failures = {} + device_destinations = set() - for row in stream["rows"]: + for row in fed_stream["rows"]: position, typ, content_js = row content = json.loads(content_js) @@ -264,7 +257,9 @@ class FederationSenderHandler(object): key = content["key"] edu = Edu(**content["edu"]) - keyed_edus.setdefault(edu.destination, {})[key] = edu + keyed_edus.setdefault( + edu.destination, {} + )[(edu.destination, tuple(key))] = edu elif typ == send_queue.EDU_TYPE: edu = Edu(**content) @@ -274,6 +269,8 @@ class FederationSenderHandler(object): failure = content["failure"] failures.setdefault(destination, []).append(failure) + elif typ == send_queue.DEVICE_MESSAGE_TYPE: + device_destinations.add(content["destination"]) else: raise Exception("Unrecognised federation type: %r", typ) @@ -296,6 +293,14 @@ class FederationSenderHandler(object): for failure in failure_list: self.federation_sender.send_failure(destination, failure) + for destination in device_destinations: + self.federation_sender.send_device_messages(destination) + + event_stream = result.get("events") + if event_stream: + latest_pos = event_stream["position"] + self.federation_sender.notify_new_events(latest_pos) + if __name__ == '__main__': with LoggingContext("main"): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index d439be050a..3fc625c4dd 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -23,11 +23,13 @@ PRESENCE_TYPE = "p" KEYED_EDU_TYPE = "k" EDU_TYPE = "e" FAILURE_TYPE = "f" +DEVICE_MESSAGE_TYPE = "d" class FederationRemoteSendQueue(object): def __init__(self, hs): + self.server_name = hs.hostname self.clock = hs.get_clock() # TODO: Add metrics for size of lists below @@ -45,6 +47,8 @@ class FederationRemoteSendQueue(object): self.pos = 1 self.pos_time = sorteddict() + self.device_messages = sorteddict() + self.clock.looping_call(self._clear_queue, 30 * 1000) def _next_pos(self): @@ -111,6 +115,15 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.failures[key] + # Delete things out of device map + keys = self.device_messages.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.device_messages[key] + + def notify_new_events(self, current_id): + pass + def send_edu(self, destination, edu_type, content, key=None): pos = self._next_pos() @@ -122,6 +135,7 @@ class FederationRemoteSendQueue(object): ) if key: + assert isinstance(key, tuple) self.keyed_edu[(destination, key)] = edu self.keyed_edu_changed[pos] = (destination, key) else: @@ -148,9 +162,9 @@ class FederationRemoteSendQueue(object): # This gets sent down a separate path pass - def notify_new_device_message(self, destination): - # TODO - pass + def send_device_messages(self, destination): + pos = self._next_pos() + self.device_messages[pos] = destination def get_current_token(self): return self.pos - 1 @@ -188,11 +202,11 @@ class FederationRemoteSendQueue(object): i = keys.bisect_right(token) keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) - for (pos, edu_key) in keyed_edus: + for (pos, (destination, edu_key)) in keyed_edus: rows.append( (pos, KEYED_EDU_TYPE, ujson.dumps({ "key": edu_key, - "edu": self.keyed_edu[edu_key].get_dict(), + "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), })) ) @@ -202,7 +216,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict()))) + rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() @@ -210,11 +224,21 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:]) for (pos, (destination, failure)) in failures: - rows.append((pos, None, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, ujson.dumps({ "destination": destination, "failure": failure, }))) + # Fetch changed device messages + keys = self.device_messages.keys() + i = keys.bisect_right(token) + device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + + for (pos, destination) in device_messages: + rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + "destination": destination, + }))) + # Sort rows based on pos rows.sort() diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5d4f244377..aa664beead 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state import synapse.metrics @@ -54,6 +55,7 @@ class TransactionQueue(object): self.server_name = hs.hostname self.store = hs.get_datastore() + self.state = hs.get_state_handler() self.transaction_actions = TransactionActions(self.store) self.transport_layer = hs.get_federation_transport_client() @@ -103,6 +105,9 @@ class TransactionQueue(object): self._order = 1 + self._is_processing = False + self._last_token = 0 + def can_send_to(self, destination): """Can we send messages to the given server? @@ -123,6 +128,33 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") + @defer.inlineCallbacks + def notify_new_events(self, current_id): + if self._is_processing: + return + + try: + self._is_processing = True + while True: + self._last_token, events = yield self.store.get_all_new_events_stream( + self._last_token, current_id, limit=20, + ) + + if not events: + break + + for event in events: + users_in_room = yield self.state.get_current_user_in_room( + event.room_id, latest_event_ids=[event.event_id], + ) + + destinations = [ + get_domain_from_id(user_id) for user_id in users_in_room + ] + self.send_pdu(event, destinations) + finally: + self._is_processing = False + def send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 81df45177a..fd09397226 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.push.action_generator import ActionGenerator from synapse.types import ( - UserID, RoomAlias, RoomStreamToken, get_domain_from_id + UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock from synapse.util.logcontext import preserve_fn @@ -599,13 +599,6 @@ class MessageHandler(BaseHandler): event_stream_id, max_stream_id ) - users_in_room = yield self.store.get_joined_users_from_context(event, context) - - destinations = [ - get_domain_from_id(user_id) for user_id in users_in_room - if not self.hs.is_mine_id(user_id) - ] - @defer.inlineCallbacks def _notify(): yield run_on_reactor() @@ -618,7 +611,3 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - - preserve_fn(federation_handler.handle_new_event)( - event, destinations=destinations, - ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48653ae843..d528d1c1e0 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -143,6 +143,7 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.federation_sender = hs.get_federation_sender() self.state_handler = hs.get_state_handler() self.clock.looping_call( @@ -219,6 +220,7 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. self.appservice_handler.notify_interested_services(room_stream_id) + self.federation_sender.notify_new_events(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a77312ae34..e708811326 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -453,7 +453,7 @@ class ReplicationResource(Resource): ) upto_token = _position_from_rows(to_device_rows, current_position) writer.write_header_and_rows("to_device", to_device_rows, ( - "position", "user_id", "device_id", "message_json" + "position", "entity", ), position=upto_token) @defer.inlineCallbacks diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 373212d42d..cc860f9f9b 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -38,6 +38,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore): get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__ delete_messages_for_device = DataStore.delete_messages_for_device.__func__ + delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__ def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() @@ -50,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen.advance(int(stream["position"])) for row in stream["rows"]: stream_id = row[0] - user_id = row[1] - self._device_inbox_stream_cache.entity_has_changed( - user_id, stream_id - ) + entity = row[1] + + if entity.startswith("@"): + self._device_inbox_stream_cache.entity_has_changed( + entity, stream_id + ) + else: + self._device_federation_outbox_stream_cache.entity_has_changed( + entity, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 0c26e96e98..ef8713b55d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache import ujson as json +import logging + + +logger = logging.getLogger(__name__) + # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the @@ -180,6 +185,8 @@ class SlavedEventStore(BaseSlavedStore): EventFederationStore.__dict__["_get_forward_extremeties_for_room"] ) + get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() @@ -194,6 +201,10 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("events") if stream: self._stream_id_gen.advance(int(stream["position"])) + + if stream["rows"]: + logger.info("Got %d event rows", len(stream["rows"])) + for row in stream["rows"]: self._process_replication_row( row, backfilled=False, state_resets=state_resets diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index c459301b76..d92cea4ab1 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -25,8 +25,8 @@ class TransactionStore(BaseSlavedStore): ].orig _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ - def prep_send_transaction(self, transaction_id, destination, origin_server_ts): - return [] + prep_send_transaction = DataStore.prep_send_transaction.__func__ + delivered_txn = DataStore.delivered_txn.__func__ # For now, don't record the destination rety timings def set_destination_retry_timings(*args, **kwargs): diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" + "SELECT stream_id, user_id" + " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() sql = ( - "SELECT stream_id, user_id, device_id, message_json" - " FROM device_inbox" + "SELECT stream_id, destination" + " FROM device_federation_outbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6576a30098..e46ae6502e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 38 +SCHEMA_VERSION = 39 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql new file mode 100644 index 0000000000..00be801e90 --- /dev/null +++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 888b1cb35d..f34cb78f9a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -765,3 +765,34 @@ class StreamStore(SQLBaseStore): "token": end_token, }, } + + @defer.inlineCallbacks + def get_all_new_events_stream(self, from_id, current_id, limit): + """Get all new events""" + + def get_all_new_events_stream_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e" + " WHERE" + " ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (from_id, current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_all_new_events_stream", get_all_new_events_stream_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index 3fd5c3d9fd..d668e5a6b8 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -76,15 +76,26 @@ class JsonEncodedObject(object): d.update(self.unrecognized_keys) return d + def get_internal_dict(self): + d = { + k: _encode(v, internal=True) for (k, v) in self.__dict__.items() + if k in self.valid_keys + } + d.update(self.unrecognized_keys) + return d + def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) -def _encode(obj): +def _encode(obj, internal=False): if type(obj) is list: - return [_encode(o) for o in obj] + return [_encode(o, internal=internal) for o in obj] if isinstance(obj, JsonEncodedObject): - return obj.get_dict() + if internal: + return obj.get_internal_dict() + else: + return obj.get_dict() return obj -- cgit 1.5.1 From 90565d015e97a494f516cc6f06596ca5c6d490ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Nov 2016 17:45:44 +0000 Subject: Invalidate retry cache in both directions --- synapse/replication/expire_cache.py | 60 +++++++++++++++++++++++ synapse/replication/resource.py | 2 + synapse/replication/slave/storage/_base.py | 19 +++++++ synapse/replication/slave/storage/transactions.py | 9 ++-- synapse/storage/transactions.py | 48 +++++++++++++----- synapse/util/retryutils.py | 21 ++++---- 6 files changed, 132 insertions(+), 27 deletions(-) create mode 100644 synapse/replication/expire_cache.py (limited to 'synapse/util') diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py new file mode 100644 index 0000000000..c05a50d7a6 --- /dev/null +++ b/synapse/replication/expire_cache.py @@ -0,0 +1,60 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET + + +class ExpireCacheResource(Resource): + """ + HTTP endpoint for expiring storage caches. + + POST /_synapse/replication/expire_cache HTTP/1.1 + Content-Type: application/json + + { + "invalidate": [ + { + "name": "func_name", + "keys": ["key1", "key2"] + } + ] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.store = hs.get_datastore() + self.version_string = hs.version_string + self.clock = hs.get_clock() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + for row in content["invalidate"]: + name = row["name"] + keys = tuple(row["keys"]) + + getattr(self.store, name).invalidate(keys) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index e708811326..b05ca62710 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource from synapse.replication.presence_resource import PresenceResource +from synapse.replication.expire_cache import ExpireCacheResource from synapse.api.errors import SynapseError from twisted.web.resource import Resource @@ -124,6 +125,7 @@ class ReplicationResource(Resource): self.putChild("remove_pushers", PusherResource(hs)) self.putChild("syncing_users", PresenceResource(hs)) + self.putChild("expire_cache", ExpireCacheResource(hs)) def render_GET(self, request): self._async_render_GET(request) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index f19540d6bb..18076e0f3b 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore): else: self._cache_id_gen = None + self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache" + self.http_client = hs.get_simple_http_client() + def stream_positions(self): pos = {} if self._cache_id_gen: @@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore): logger.info("Got unexpected cache_func: %r", cache_func) self._cache_id_gen.advance(int(stream["position"])) return defer.succeed(None) + + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + txn.call_after(cache_func.invalidate, keys) + txn.call_after(self._send_invalidation_poke, cache_func, keys) + + @defer.inlineCallbacks + def _send_invalidation_poke(self, cache_func, keys): + try: + yield self.http_client.post_json_get_json(self.expire_cache_url, { + "invalidate": [{ + "name": cache_func.__name__, + "keys": list(keys), + }] + }) + except: + logger.exception("Failed to poke on expire_cache") diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index d92cea4ab1..fbb58f35da 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore @@ -22,12 +21,10 @@ from synapse.storage.transactions import TransactionStore class TransactionStore(BaseSlavedStore): get_destination_retry_timings = TransactionStore.__dict__[ "get_destination_retry_timings" - ].orig + ] _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__ + _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__ prep_send_transaction = DataStore.prep_send_transaction.__func__ delivered_txn = DataStore.delivered_txn.__func__ - - # For now, don't record the destination rety timings - def set_destination_retry_timings(*args, **kwargs): - return defer.succeed(None) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..ee2efb0d36 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import cached from twisted.internet import defer @@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 49527f4d21..46ef5a8ec7 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -121,12 +121,6 @@ class RetryDestinationLimiter(object): pass def __exit__(self, exc_type, exc_val, exc_tb): - def err(failure): - logger.exception( - "Failed to store set_destination_retry_timings", - failure.value - ) - valid_err_code = False if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 @@ -151,6 +145,15 @@ class RetryDestinationLimiter(object): retry_last_ts = int(self.clock.time_msec()) - self.store.set_destination_retry_timings( - self.destination, retry_last_ts, self.retry_interval - ).addErrback(err) + @defer.inlineCallbacks + def store_retry_timings(): + try: + yield self.store.set_destination_retry_timings( + self.destination, retry_last_ts, self.retry_interval + ) + except: + logger.exception( + "Failed to store set_destination_retry_timings", + ) + + store_retry_timings() -- cgit 1.5.1 From aaecffba3a3d111d05a6d4310653168113385947 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Nov 2016 15:04:49 +0000 Subject: Correctly handle 500's and 429 on federation --- synapse/federation/transaction_queue.py | 7 +++++++ synapse/util/retryutils.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c94c74a67e..51b656d74a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -462,6 +462,13 @@ class TransactionQueue(object): code = e.code response = e.response + if e.code == 429 or 500 <= e.code: + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code + ) + raise e + logger.info( "TX [%s] {%s} got %d response", destination, txn_id, code diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 46ef5a8ec7..218029e8a8 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -123,7 +123,7 @@ class RetryDestinationLimiter(object): def __exit__(self, exc_type, exc_val, exc_tb): valid_err_code = False if exc_type is not None and issubclass(exc_type, CodeMessageException): - valid_err_code = 0 <= exc_val.code < 500 + valid_err_code = valid_err_code != 429 and 0 <= exc_val.code < 500 if exc_type is None or valid_err_code: # We connected successfully. -- cgit 1.5.1 From 11bfe438a2000bd6be3ac4c607b5455e1036be88 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Nov 2016 15:26:53 +0000 Subject: Use correct var --- synapse/util/retryutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 218029e8a8..e2de7fce91 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -123,7 +123,7 @@ class RetryDestinationLimiter(object): def __exit__(self, exc_type, exc_val, exc_tb): valid_err_code = False if exc_type is not None and issubclass(exc_type, CodeMessageException): - valid_err_code = valid_err_code != 429 and 0 <= exc_val.code < 500 + valid_err_code = exc_val.code != 429 and 0 <= exc_val.code < 500 if exc_type is None or valid_err_code: # We connected successfully. -- cgit 1.5.1 From fbaf868f621c2ecb6ea10679eb435f9adffa3b2a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Dec 2016 16:30:29 +0000 Subject: Correctly handle timeout errors --- synapse/notifier.py | 20 ++++++++++++++------ synapse/util/__init__.py | 7 ++++++- 2 files changed, 20 insertions(+), 7 deletions(-) (limited to 'synapse/util') diff --git a/synapse/notifier.py b/synapse/notifier.py index 40baa6969a..acbd4bb5ae 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -320,6 +321,8 @@ class Notifier(object): listener.deferred, time_out=(end_time - now) / 1000. ) + except DeferredTimedOutError: + break except defer.CancelledError: break else: @@ -490,22 +493,27 @@ class Notifier(object): """ listener = _NotificationListener(None) - def timed_out(): - listener.deferred.cancel() + end_time = self.clock.time_msec() + timeout - timer = self.clock.call_later(timeout / 1000., timed_out) while True: listener.deferred = self.replication_deferred.observe() result = yield callback() if result: break + now = self.clock.time_msec() + if end_time <= now: + break + try: with PreserveLoggingContext(): - yield listener.deferred + yield self.clock.time_bound_deferred( + listener.deferred, + time_out=(end_time - now) / 1000. + ) + except DeferredTimedOutError: + break except defer.CancelledError: break - self.clock.cancel_call_later(timer, ignore_errs=True) - defer.returnValue(result) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c05b9450be..30fc480108 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -24,6 +24,11 @@ import logging logger = logging.getLogger(__name__) +class DeferredTimedOutError(SynapseError): + def __init__(self): + super(SynapseError).__init__(504, "Timed out") + + def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. failure.trap(defer.FirstError) @@ -89,7 +94,7 @@ class Clock(object): def timed_out_fn(): try: - ret_deferred.errback(SynapseError(504, "Timed out")) + ret_deferred.errback(DeferredTimedOutError()) except: pass -- cgit 1.5.1