From 0ac2a79faa918280767c18e4db7ec29d7d3a3afb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Feb 2015 17:24:14 +0000 Subject: Initial stab at implementing a batched get_missing_pdus request --- synapse/handlers/federation.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0eb2ff95ca..26bdc6d1a7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -581,12 +581,13 @@ class FederationHandler(BaseHandler): defer.returnValue(event) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id): + def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): yield run_on_reactor() - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") + if do_auth: + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") state_groups = yield self.store.get_state_groups( [event_id] -- cgit 1.5.1 From db215b7e0007a207b8775d78c6693153e16f2731 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Feb 2015 13:58:02 +0000 Subject: Implement and use new batched get missing pdu --- synapse/federation/federation_client.py | 19 ++++ synapse/federation/federation_server.py | 150 +++++++++++--------------------- synapse/federation/transaction_queue.py | 2 +- synapse/federation/transport/client.py | 19 ++++ synapse/federation/transport/server.py | 31 +++++++ synapse/handlers/federation.py | 23 +++++ 6 files changed, 144 insertions(+), 100 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index cd3c962d50..ca89a0787c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -439,6 +439,25 @@ class FederationClient(FederationBase): defer.returnValue(ret) + @defer.inlineCallbacks + def get_missing_events(self, destination, room_id, earliest_events, + latest_events, limit, min_depth): + content = yield self.transport_layer.get_missing_events( + destination, room_id, earliest_events, latest_events, limit, + min_depth, + ) + + events = [ + self.event_from_pdu_json(e) + for e in content.get("events", []) + ] + + signed_events = yield self._check_sigs_and_hash_and_fetch( + destination, events, outlier=True + ) + + defer.returnValue(signed_events) + def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( pdu_json diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34bc397e8a..f74e16abd5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -142,7 +142,15 @@ class FederationServer(FederationBase): if r[0]: ret.append({}) else: - logger.exception(r[1]) + failure = r[1] + logger.error( + "Failed to handle PDU", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) @@ -306,75 +314,17 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - def get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): - limit = max(limit, 50) - min_depth = max(min_depth, 0) - - missing_events = yield self.store.get_missing_events( - room_id=room_id, - earliest_events=earliest_events, - latest_events=latest_events, - limit=limit, - min_depth=min_depth, + @log_function + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth ) - known_ids = {e.event_id for e in missing_events} | {earliest_events} - - back_edges = { - e for e in missing_events - if {i for i, h in e.prev_events.items()} <= known_ids - } - - decoded_auth_events = set() - state = {} - auth_events = set() - auth_and_state = {} - for event in back_edges: - state_pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event.event_id, - do_auth=False, - ) - - state[event.event_id] = [s.event_id for s in state_pdus] - - auth_and_state.update({ - s.event_id: s for s in state_pdus - }) - - state_ids = {pdu.event_id for pdu in state_pdus} - prev_ids = {i for i, h in event.prev_events.items()} - partial_auth_chain = yield self.store.get_auth_chain( - state_ids | prev_ids, have_ids=decoded_auth_events.keys() - ) - - for p in partial_auth_chain: - p.signatures.update( - compute_event_signature( - p, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - - auth_events.update( - a.event_id for a in partial_auth_chain - ) - - auth_and_state.update({ - a.event_id: a for a in partial_auth_chain - }) - time_now = self._clock.time_msec() defer.returnValue({ "events": [ev.get_pdu_json(time_now) for ev in missing_events], - "state_for_events": state, - "auth_events": auth_events, - "event_map": { - k: ev.get_pdu_json(time_now) - for k, ev in auth_and_state.items() - }, }) @log_function @@ -403,7 +353,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, origin, pdu, max_recursion=10): + def _handle_new_pdu(self, origin, pdu, get_missing=True): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu( origin, pdu.event_id, do_auth=False @@ -455,48 +405,50 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this # message, to work around the fact that some events will # reference really really old events we really don't want to # send to the clients. pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth and max_recursion > 0: - for event_id, hashes in pdu.prev_events: - if event_id not in have_seen: - logger.debug( - "_handle_new_pdu requesting pdu %s", - event_id + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + latest_tuples = yield self.store.get_latest_events_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(e_id for e_id, _, _ in latest_tuples) + latest |= seen + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events=list(latest), + latest_events=[pdu.event_id], + limit=10, + min_depth=min_depth, + ) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False ) - try: - new_pdu = yield self.federation_client.get_pdu( - [origin, pdu.origin], - event_id=event_id, - ) - - if new_pdu: - yield self._handle_new_pdu( - origin, - new_pdu, - max_recursion=max_recursion-1 - ) - - logger.debug("Processed pdu %s", event_id) - else: - logger.warn("Failed to get PDU %s", event_id) - fetch_state = True - except: - # TODO(erikj): Do some more intelligent retries. - logger.exception("Failed to get PDU") - fetch_state = True - else: - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - fetch_state = True - else: - fetch_state = True + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True if fetch_state: # We need to get the state at this event, since we haven't diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d30c924d1..8f1acbe590 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -287,7 +287,7 @@ class TransactionQueue(object): code = 200 if response: - for e_id, r in getattr(response, "pdus", {}).items(): + for e_id, r in response.get("pdus", {}).items(): if "error" in r: logger.warn( "Transaction returned error for %s: %s", diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 8b137e7128..80d03012b7 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -219,3 +219,22 @@ class TransportLayerClient(object): ) defer.returnValue(content) + + @defer.inlineCallbacks + @log_function + def get_missing_events(self, destination, room_id, earliest_events, + latest_events, limit, min_depth): + path = PREFIX + "/get_missing_events/%s" % (room_id,) + + content = yield self.client.post_json( + destination=destination, + path=path, + data={ + "limit": int(limit), + "min_depth": int(min_depth), + "earliest_events": earliest_events, + "latest_events": latest_events, + } + ) + + defer.returnValue(content) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa18..ad75c8ddb7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -234,6 +234,7 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( "POST", re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), @@ -245,6 +246,17 @@ class TransportLayerServer(object): ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"), + self._with_authentication( + lambda origin, content, query, room_id: + self._get_missing_events( + origin, content, room_id, + ) + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -344,3 +356,22 @@ class TransportLayerServer(object): ) defer.returnValue((200, new_content)) + + @defer.inlineCallbacks + @log_function + def _get_missing_events(self, origin, content, room_id): + limit = int(content.get("limit", 10)) + min_depth = int(content.get("min_depth", 0)) + earliest_events = content.get("earliest_events", []) + latest_events = content.get("latest_events", []) + + content = yield self.request_handler.on_get_missing_events( + origin, + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + min_depth=min_depth, + limit=limit, + ) + + defer.returnValue((200, content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 26bdc6d1a7..628e62f8b1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -789,6 +789,29 @@ class FederationHandler(BaseHandler): defer.returnValue(ret) + @defer.inlineCallbacks + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + in_room = yield self.auth.check_host_in_room( + room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + + limit = min(limit, 20) + min_depth = max(min_depth, 0) + + missing_events = yield self.store.get_missing_events( + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + limit=limit, + min_depth=min_depth, + ) + + defer.returnValue(missing_events) + @defer.inlineCallbacks @log_function def do_auth(self, origin, event, context, auth_events): -- cgit 1.5.1 From 443ba4eecc59e3dbddfaf5b925408b40e3581800 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 24 Feb 2015 15:00:12 +0000 Subject: %s for strings otherwise you end up sending 'u"foo"' --- synapse/handlers/directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 20ab9e269c..11d20a5d2d 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -160,7 +160,7 @@ class DirectoryHandler(BaseHandler): if not room_id: raise SynapseError( 404, - "Room alias %r not found" % (room_alias.to_string(),), + "Room alias %s not found" % (room_alias.to_string(),), Codes.NOT_FOUND ) -- cgit 1.5.1 From 2d20466f9a1349c97d5a3822eb4ee64f19bbdf27 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 15:00:59 +0000 Subject: Add stub functions and work out execution flow to implement AS event stream polling. --- synapse/handlers/events.py | 3 --- synapse/handlers/room.py | 34 +++++++++++++++++++++++++--------- synapse/storage/appservice.py | 19 +++++++++++++++++++ synapse/storage/stream.py | 21 +++++++++++++++++++++ 4 files changed, 65 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 025e7e7e62..8d5f5c8499 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - if pagin_config.from_token is None: - pagin_config.from_token = None - rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 914742d913..a8b0c95636 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler): def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]): """Returns a list of roomids that the user has any of the given membership states in.""" - rooms = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user.to_string(), membership_list=membership_list + + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + else: + rooms = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user.to_string(), membership_list=membership_list + ) # For some reason the list of events contains duplicates # TODO(paul): work out why because I really don't think it should @@ -559,13 +566,22 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - events, end_key = yield self.store.get_room_events_stream( - user_id=user.to_string(), - from_key=from_key, - to_key=to_key, - room_id=None, - limit=limit, - ) + app_service = self.store.get_app_service_by_user_id(user.to_string()) + if app_service: + events, end_key = yield self.store.get_appservice_room_stream( + service=app_service, + from_key=from_key, + to_key=to_key, + limit=limit, + ) + else: + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_key, + to_key=to_key, + room_id=None, + limit=limit, + ) defer.returnValue((events, end_key)) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index dc3666efd4..435ccfd6fc 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.appservice import ApplicationService +from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -150,6 +151,16 @@ class ApplicationServiceStore(SQLBaseStore): yield self.cache_defer # make sure the cache is ready defer.returnValue(self.services_cache) + @defer.inlineCallbacks + def get_app_service_by_user_id(self, user_id): + yield self.cache_defer # make sure the cache is ready + + for service in self.services_cache: + if service.sender == user_id: + defer.returnValue(service) + return + defer.returnValue(None) + @defer.inlineCallbacks def get_app_service_by_token(self, token, from_cache=True): """Get the application service with the given token. @@ -173,6 +184,14 @@ class ApplicationServiceStore(SQLBaseStore): # TODO: The from_cache=False impl # TODO: This should be JOINed with the application_services_regex table. + @defer.inlineCallbacks + def get_app_service_rooms(self, service): + logger.info("get_app_service_rooms -> %s", service) + + # TODO stub + yield self.cache_defer + defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) + @defer.inlineCallbacks def _populate_cache(self): """Populates the ApplicationServiceCache from the database.""" diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3ccb6f8a61..aa3c9f8c9c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -127,6 +127,27 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + + def get_appservice_room_stream(self, service, from_key, to_key, limit=0): + # NB this lives here instead of appservice.py so we can reuse the + # 'private' StreamToken class in this file. + logger.info("get_appservice_room_stream -> %s", service) + + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) + + if from_key == to_key: + return defer.succeed(([], to_key)) + + # TODO stub + return defer.succeed(([], to_key)) + @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): -- cgit 1.5.1 From dcec7175dc30754603618351b59bc72ff41d305b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 16:23:01 +0000 Subject: Finish impl to get new events for AS. ASes should now be able to poll /events --- synapse/handlers/room.py | 4 ++- synapse/storage/stream.py | 62 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 58 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a8b0c95636..80f7ee3f12 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -566,7 +566,9 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - app_service = self.store.get_app_service_by_user_id(user.to_string()) + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() + ) if app_service: events, end_key = yield self.store.get_appservice_room_stream( service=app_service, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3c8f3320f1..6946e9fe70 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the # 'private' StreamToken class in this file. - logger.info("get_appservice_room_stream -> %s", service) - if limit: limit = max(limit, MAX_STREAM_SIZE) else: limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - # from_id = _StreamToken.parse_stream_token(from_key) - # to_id = _StreamToken.parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: - return defer.succeed(([], to_key)) + defer.returnValue(([], to_key)) + return + + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is used + # for the Notifier listener rooms). We can't reasonably make a SQL + # query for these room IDs, so we'll pull all the events between from/to + # and filter in python. + rooms_for_as = yield self.get_app_service_rooms(service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + # select all the events between from/to with a sensible limit + sql = ( + "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " + "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "limit": limit + } + + def f(txn): + txn.execute(sql, (from_id.stream, to_id.stream,)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + # apply the filter on the room id list + [ + r["event_id"] for r in rows + if r["room_id"] in room_ids_for_as + ], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + return ret, key - # TODO stub - return defer.succeed(([], to_key)) + results = yield self.runInteraction("get_appservice_room_stream", f) + defer.returnValue(results) @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, -- cgit 1.5.1 From 58ff0660642e6ef8f9fe5672fb50e5d75a569479 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 27 Feb 2015 13:51:41 +0000 Subject: Implement exclusive namespace checks. --- synapse/handlers/directory.py | 14 ++++++++++++-- synapse/handlers/register.py | 11 ++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 11d20a5d2d..f76febee8f 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -232,13 +232,23 @@ class DirectoryHandler(BaseHandler): @defer.inlineCallbacks def can_modify_alias(self, alias, user_id=None): + # Any application service "interested" in an alias they are regexing on + # can modify the alias. + # Users can only modify the alias if ALL the interested services have + # non-exclusive locks on the alias (or there are no interested services) services = yield self.store.get_app_services() interested_services = [ s for s in services if s.is_interested_in_alias(alias.to_string()) ] + for service in interested_services: if user_id == service.sender: - # this user IS the app service + # this user IS the app service so they can do whatever they like defer.returnValue(True) return - defer.returnValue(len(interested_services) == 0) + elif service.is_exclusive_alias(alias.to_string()): + # another service has an exclusive lock on this alias. + defer.returnValue(False) + return + # either no interested services, or no service with an exclusive lock + defer.returnValue(True) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 516a936cee..cda4a8502a 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -201,11 +201,12 @@ class RegistrationHandler(BaseHandler): interested_services = [ s for s in services if s.is_interested_in_user(user_id) ] - if len(interested_services) > 0: - raise SynapseError( - 400, "This user ID is reserved by an application service.", - errcode=Codes.EXCLUSIVE - ) + for service in interested_services: + if service.is_exclusive_user(user_id): + raise SynapseError( + 400, "This user ID is reserved by an application service.", + errcode=Codes.EXCLUSIVE + ) def _generate_token(self, user_id): # urlsafe variant uses _ and - so use . as the separator and replace -- cgit 1.5.1 From 130df8fb01bce1bf244940131d85d1c1a54ed5ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Mar 2015 10:25:36 +0000 Subject: Add some randomness to the user specified timeout on event streams to mitigate against thundering herds problems --- synapse/handlers/events.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 8d5f5c8499..d3297b7292 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -23,6 +23,7 @@ from synapse.events.utils import serialize_event from ._base import BaseHandler import logging +import random logger = logging.getLogger(__name__) @@ -72,6 +73,14 @@ class EventStreamHandler(BaseHandler): rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) + if timeout: + # If they've set a timeout set a minimum limit. + timeout = max(timeout, 500) + + # Add some randomness to this value to try and mitigate against + # thundering herds on restart. + timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) + with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout -- cgit 1.5.1 From 9ccccd4874fe827baa2583829602663d1936b0ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Mar 2015 16:24:05 +0000 Subject: When setting display name more graciously handle failures to update room state. --- synapse/handlers/profile.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 03b2159c53..2ddf9d5378 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -212,10 +212,16 @@ class ProfileHandler(BaseHandler): ) msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_event({ - "type": EventTypes.Member, - "room_id": j.room_id, - "state_key": user.to_string(), - "content": content, - "sender": user.to_string() - }, ratelimit=False) + try: + yield msg_handler.create_and_send_event({ + "type": EventTypes.Member, + "room_id": j.room_id, + "state_key": user.to_string(), + "content": content, + "sender": user.to_string() + }, ratelimit=False) + except Exception as e: + logger.warn( + "Failed to update join event for room %s - %s", + j.room_id, str(e.message) + ) -- cgit 1.5.1