Diffstat (limited to 'synapse/handlers')
-rw-r--r--   synapse/handlers/appservice.py    |  42
-rw-r--r--   synapse/handlers/e2e_keys.py      |  10
-rw-r--r--   synapse/handlers/federation.py    | 141
-rw-r--r--   synapse/handlers/initial_sync.py  |  12
-rw-r--r--   synapse/handlers/message.py       | 230
-rw-r--r--   synapse/handlers/presence.py      |  19
-rw-r--r--   synapse/handlers/receipts.py      |  61
-rw-r--r--   synapse/handlers/room_list.py     |  43
-rw-r--r--   synapse/handlers/room_member.py   |  13
-rw-r--r--   synapse/handlers/sync.py          |  25
-rw-r--r--   synapse/handlers/typing.py        |  50
11 files changed, 398 insertions(+), 248 deletions(-)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..6cc2388306 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+    make_deferred_yieldable, preserve_fn, run_in_background,
+)
 
 import logging
 
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
                 if not events:
                     break
 
+                events_by_room = {}
                 for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Gather interested services
                     services = yield self._get_services_for_event(event)
                     if len(services) == 0:
-                        continue  # no services need notifying
+                        return  # no services need notifying
 
                     # Do we know this user exists? If not, poke the user
                     # query API for all services which match that user regex.
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
                             service, event
                         )
 
-                events_processed_counter.inc_by(len(events))
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                yield make_deferred_yieldable(defer.gatherResults([
+                    run_in_background(handle_room_events, evs)
+                    for evs in events_by_room.itervalues()
+                ], consumeErrors=True))
 
                 yield self.store.set_appservice_last_pos(upper_bound)
+
+                now = self.clock.time_msec()
+                ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                synapse.metrics.event_processing_positions.set(
+                    upper_bound, "appservice_sender",
+                )
+
+                events_processed_counter.inc_by(len(events))
+
+                synapse.metrics.event_processing_lag.set(
+                    now - ts, "appservice_sender",
+                )
+                synapse.metrics.event_processing_last_ts.set(
+                    ts, "appservice_sender",
+                )
         finally:
             self.is_processing = False
 
@@ -167,7 +198,10 @@ class ApplicationServicesHandler(object):
         services = yield self._get_services_for_3pn(protocol)
 
         results = yield make_deferred_yieldable(defer.DeferredList([
-            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+            run_in_background(
+                self.appservice_api.query_3pe,
+                service, kind, protocol, fields,
+            )
             for service in services
         ], consumeErrors=True))
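The appservice change above partitions the fetched batch by room and then processes rooms concurrently, while still handling each room's events in order. The same fan-out shape can be sketched with plain Twisted primitives; this is an illustrative reduction (process_by_room and handle_event are made-up names, not part of the commit), and it omits the logcontext bookkeeping that run_in_background and make_deferred_yieldable perform in synapse itself:

    from collections import defaultdict

    from twisted.internet import defer


    @defer.inlineCallbacks
    def process_by_room(events, handle_event):
        """Handle events concurrently per room, sequentially within a room.

        `handle_event` is assumed to return a Deferred.
        """
        # group the batch by room: ordering only matters within one room
        events_by_room = defaultdict(list)
        for event in events:
            events_by_room[event.room_id].append(event)

        @defer.inlineCallbacks
        def handle_room(room_events):
            # events for a single room are handled strictly in order
            for event in room_events:
                yield handle_event(event)

        # calling an inlineCallbacks function starts it immediately, so all
        # rooms make progress; gatherResults waits for every room to finish.
        # consumeErrors=True stops one room's failure from also being
        # reported as an unhandled error in a Deferred.
        yield defer.gatherResults(
            [handle_room(evs) for evs in events_by_room.values()],
            consumeErrors=True,
        )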
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 325c0c4a9f..25aec624af 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
     SynapseError, CodeMessageException, FederationDeniedError,
 )
 from synapse.types import get_domain_from_id, UserID
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -139,9 +139,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(do_remote_query)(destination)
+            run_in_background(do_remote_query, destination)
             for destination in remote_queries_not_in_cache
-        ]))
+        ], consumeErrors=True))
 
         defer.returnValue({
             "device_keys": results, "failures": failures,
@@ -242,9 +242,9 @@ class E2eKeysHandler(object):
                 failures[destination] = _exception_to_failure(e)
 
         yield make_deferred_yieldable(defer.gatherResults([
-            preserve_fn(claim_client_keys)(destination)
+            run_in_background(claim_client_keys, destination)
             for destination in remote_queries
-        ]))
+        ], consumeErrors=True))
 
         logger.info(
             "Claimed one-time-keys: %s",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..bab0db2904 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,16 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+
+import httplib
+import itertools
+import logging
+import sys
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
+import six
+from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
@@ -43,10 +51,6 @@ from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.util.distributor import user_joined_room
 
-from twisted.internet import defer
-
-import itertools
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -115,6 +119,19 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            raise FederationError(
+                "ERROR",
+                err.code,
+                err.msg,
+                affected=pdu.event_id,
+            )
+
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if pdu.room_id in self.room_queues:
@@ -149,10 +166,6 @@ class FederationHandler(BaseHandler):
 
         auth_chain = []
 
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
         fetch_state = False
 
         # Get missing pdus if necessary.
@@ -168,7 +181,7 @@ class FederationHandler(BaseHandler):
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
+            seen = yield self.store.have_seen_events(prevs)
 
             if min_depth and pdu.depth < min_depth:
                 # This is so that we don't notify the user about this
@@ -196,8 +209,7 @@ class FederationHandler(BaseHandler):
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
-                        have_seen = yield self.store.have_events(prevs)
-                        seen = set(have_seen.iterkeys())
+                        seen = yield self.store.have_seen_events(prevs)
 
                         if not prevs - seen:
                             logger.info(
@@ -248,8 +260,7 @@ class FederationHandler(BaseHandler):
             min_depth (int): Minimum depth of events to return.
         """
         # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
+        seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
@@ -361,9 +372,7 @@ class FederationHandler(BaseHandler):
         if auth_chain:
             event_ids |= {e.event_id for e in auth_chain}
 
-        seen_ids = set(
-            (yield self.store.have_events(event_ids)).keys()
-        )
+        seen_ids = yield self.store.have_seen_events(event_ids)
 
         if state and auth_chain is not None:
             # If we have any state or auth_chain given to us by the replication
@@ -527,9 +536,16 @@ class FederationHandler(BaseHandler):
     def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
 
-        This will attempt to get more events from the remote. This may return
-        be successfull and still return no events if the other side has no new
-        events to offer.
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this will return an empty list.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
         """
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +557,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # ideally we'd sanity check the events here for excess prev_events etc,
+        # but it's hard to reject events at this point without completely
+        # breaking backfill in the same way that it is currently broken by
+        # events whose signature we cannot verify (#3121).
+        #
+        # So for now we accept the events anyway. #3124 tracks this.
+        #
+        # for ev in events:
+        #     self._sanity_check_event(ev)
+
         # Don't bother processing events we already have.
         seen_events = yield self.store.have_events_in_timeline(
             set(e.event_id for e in events)
@@ -613,7 +639,8 @@ class FederationHandler(BaseHandler):
 
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
-                        logcontext.preserve_fn(self.replication_layer.get_pdu)(
+                        logcontext.run_in_background(
+                            self.replication_layer.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -633,7 +660,7 @@ class FederationHandler(BaseHandler):
 
         failed_to_fetch = missing_auth - set(auth_events)
 
-        seen_events = yield self.store.have_events(
+        seen_events = yield self.store.have_seen_events(
             set(auth_events.keys()) | set(state_events.keys())
         )
 
@@ -843,6 +870,38 @@ class FederationHandler(BaseHandler):
 
         defer.returnValue(False)
 
+    def _sanity_check_event(self, ev):
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev (synapse.events.EventBase): event to be checked
+
+        Returns: None
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_events) > 20:
+            logger.warn("Rejecting event %s which has %i prev_events",
+                        ev.event_id, len(ev.prev_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many prev_events",
+            )
+
+        if len(ev.auth_events) > 10:
+            logger.warn("Rejecting event %s which has %i auth_events",
+                        ev.event_id, len(ev.auth_events))
+            raise SynapseError(
+                httplib.BAD_REQUEST,
+                "Too many auth_events",
+            )
+
     @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -967,7 +1026,7 @@ class FederationHandler(BaseHandler):
         # lots of requests for missing prev_events which we do actually
         # have. Hence we fire off the deferred, but don't wait for it.
 
-        logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
+        logcontext.run_in_background(self._handle_queued_pdus, room_queue)
 
         defer.returnValue(True)
 
@@ -1457,18 +1516,21 @@ class FederationHandler(BaseHandler):
                 backfilled=backfilled,
             )
         except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area
-            logcontext.preserve_fn(
-                self.store.remove_push_actions_from_staging
-            )(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            logcontext.run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
             # and don't need to wait for it.
-            logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
-                event_stream_id, max_stream_id
+            logcontext.run_in_background(
+                self.pusher_pool.on_new_notifications,
+                event_stream_id, max_stream_id,
             )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1482,7 +1544,8 @@ class FederationHandler(BaseHandler):
         """
         contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                logcontext.preserve_fn(self._prep_event)(
+                logcontext.run_in_background(
+                    self._prep_event,
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
@@ -1736,7 +1799,8 @@ class FederationHandler(BaseHandler):
             event_key = None
 
         if event_auth_events - current_state:
-            have_events = yield self.store.have_events(
+            # TODO: can we use store.have_seen_events here instead?
+            have_events = yield self.store.get_seen_events_with_rejections(
                 event_auth_events - current_state
             )
         else:
@@ -1759,12 +1823,12 @@ class FederationHandler(BaseHandler):
                     origin, event.room_id, event.event_id
                 )
 
-                seen_remotes = yield self.store.have_events(
+                seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
-                    if e.event_id in seen_remotes.keys():
+                    if e.event_id in seen_remotes:
                         continue
 
                     if e.event_id == event.event_id:
@@ -1791,7 +1855,7 @@ class FederationHandler(BaseHandler):
                 except AuthError:
                     pass
 
-        have_events = yield self.store.have_events(
+        have_events = yield self.store.get_seen_events_with_rejections(
             [e_id for e_id, _ in event.auth_events]
         )
         seen_events = set(have_events.keys())
@@ -1810,7 +1874,8 @@ class FederationHandler(BaseHandler):
 
         different_events = yield logcontext.make_deferred_yieldable(
             defer.gatherResults([
-                logcontext.preserve_fn(self.store.get_event)(
+                logcontext.run_in_background(
+                    self.store.get_event,
                     d,
                     allow_none=True,
                     allow_rejected=False,
@@ -1876,13 +1941,13 @@ class FederationHandler(BaseHandler):
                     local_auth_chain,
                 )
 
-                seen_remotes = yield self.store.have_events(
+                seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in result["auth_chain"]]
                 )
 
                 # 3. Process any remote auth chain events we haven't seen.
                 for ev in result["auth_chain"]:
-                    if ev.event_id in seen_remotes.keys():
+                    if ev.event_id in seen_remotes:
                         continue
 
                     if ev.event_id == event.event_id:
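federation.py (and message.py below) replace a bare `raise` after kicking off background cleanup with sys.exc_info() plus six.reraise(). The motivation: under Python 2, running more code between catching an exception and re-raising it can clobber the interpreter's exception state, so the exception is captured first and re-raised explicitly with its original traceback. The shape of the pattern in isolation, where do_work and cleanup are hypothetical stand-ins:

    import sys

    import six


    def cleanup():
        # stand-in for the background cleanup kicked off in the diff,
        # e.g. removing push actions from the staging area
        pass


    def run_with_cleanup_on_error(do_work):
        try:
            return do_work()
        except Exception:
            # capture the exception state *before* anything else runs: if
            # cleanup() raised and handled an exception internally, a bare
            # `raise` here could re-raise the wrong thing on Python 2
            tp, value, tb = sys.exc_info()
            cleanup()
            six.reraise(tp, value, tb)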
for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): + if ev.event_id in seen_remotes: continue if ev.event_id == event.event_id: diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..cd33a86599 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6de6e13b7b..23502eda70 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,6 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import simplejson +import sys + +from canonicaljson import encode_canonical_json +import six from twisted.internet import defer, reactor from twisted.python.failure import Failure @@ -25,7 +31,7 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn, run_in_background +from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.stringutils import random_string @@ -34,12 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master from ._base import BaseHandler -from canonicaljson import encode_canonical_json - -import logging -import random -import simplejson - logger = logging.getLogger(__name__) @@ -433,7 +433,7 @@ class EventCreationHandler(object): @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): + prev_events_and_hashes=None): """ Given a dict from a client, create a new event. @@ -447,47 +447,52 @@ class EventCreationHandler(object): event_dict (dict): An entire event token_id (str) txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6de6e13b7b..23502eda70 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,6 +13,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import simplejson
+import sys
+
+from canonicaljson import encode_canonical_json
+import six
 
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
@@ -25,7 +31,7 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn, run_in_background
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.stringutils import random_string
@@ -34,12 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
-from canonicaljson import encode_canonical_json
-
-import logging
-import random
-import simplejson
-
 logger = logging.getLogger(__name__)
 
 
@@ -433,7 +433,7 @@ class EventCreationHandler(object):
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_event_ids=None):
+                     prev_events_and_hashes=None):
         """
         Given a dict from a client, create a new event.
 
@@ -447,47 +447,52 @@ class EventCreationHandler(object):
             event_dict (dict): An entire event
             token_id (str)
             txn_id (str)
-            prev_event_ids (list): The prev event ids to use when creating the event
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
 
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
 
-            event, context = yield self.create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_events_and_hashes=prev_events_and_hashes,
+        )
 
         defer.returnValue((event, context))
 
@@ -557,64 +562,76 @@ class EventCreationHandler(object):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
 
-        spam_error = self.spam_checker.check_event_for_spam(event)
-        if spam_error:
-            if not isinstance(spam_error, basestring):
-                spam_error = "Spam is not permitted here"
-            raise SynapseError(
-                403, spam_error, Codes.FORBIDDEN
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
 
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, basestring):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+            )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
+    def create_new_client_event(self, builder, requester=None,
+                                prev_events_and_hashes=None):
+        """Create a new event for a local client
+
+        Args:
+            builder (EventBuilder):
+
+            requester (synapse.types.Requester|None):
+
+            prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+                the forward extremities to use as the prev_events for the
+                new event. For each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+
+                If None, they will be requested from the database.
+
+        Returns:
+            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+        """
+
+        if prev_events_and_hashes is not None:
+            assert len(prev_events_and_hashes) <= 10, \
+                "Attempting to create an event with %i prev_events" % (
+                    len(prev_events_and_hashes),
             )
+        else:
+            prev_events_and_hashes = \
+                yield self.store.get_prev_events_for_room(builder.room_id)
 
-            # We want to limit the max number of prev events we point to in our
-            # new event
-            if len(latest_ret) > 10:
-                # Sort by reverse depth, so we point to the most recent.
-                latest_ret.sort(key=lambda a: -a[2])
-                new_latest_ret = latest_ret[:5]
-
-                # We also randomly point to some of the older events, to make
-                # sure that we don't completely ignore the older events.
-                if latest_ret[5:]:
-                    sample_size = min(5, len(latest_ret[5:]))
-                    new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
-                latest_ret = new_latest_ret
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
+        if prev_events_and_hashes:
+            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+        else:
+            depth = 1
 
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in prev_events_and_hashes
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -713,8 +730,14 @@ class EventCreationHandler(object):
         except:  # noqa: E722, as we reraise the exception this is fine.
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
-            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
-            raise
+            tp, value, tb = sys.exc_info()
+
+            run_in_background(
+                self.store.remove_push_actions_from_staging,
+                event.event_id,
+            )
+
+            six.reraise(tp, value, tb)
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -834,22 +857,33 @@ class EventCreationHandler(object):
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.pusher_pool.on_new_notifications)(
+        run_in_background(
+            self.pusher_pool.on_new_notifications,
             event_stream_id, max_stream_id
         )
 
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+            try:
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )
+            except Exception:
+                logger.exception("Error notifying about new room event")
 
-        preserve_fn(_notify)()
+        run_in_background(_notify)
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
             # We don't want to block sending messages on any presence code. This
             # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(requester.user)
+            run_in_background(self._bump_active_time, requester.user)
+
+    @defer.inlineCallbacks
+    def _bump_active_time(self, user):
+        try:
+            presence = self.hs.get_presence_handler()
+            yield presence.bump_presence_active_time(user)
+        except Exception:
+            logger.exception("Error bumping presence active time")
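The new prev_events_and_hashes argument threads (event_id, hashes, depth) tuples through event creation, and create_new_client_event derives both the new event's depth and its prev_events list from them. A worked example with invented event IDs and hashes:

    # each tuple is (event_id, hashes, depth), per the new docstring
    prev_events_and_hashes = [
        ("$a:example.org", {"sha256": "aaaa"}, 5),
        ("$b:example.org", {"sha256": "bbbb"}, 7),
    ]

    # the new event sits one level deeper than its deepest prev event
    depth = max(d for _, _, d in prev_events_and_hashes) + 1   # == 8

    # the builder only needs (event_id, hashes); the depth is dropped
    prev_events = [
        (event_id, hashes)
        for event_id, hashes, _ in prev_events_and_hashes
    ]

The random sampling of extremities that used to live in this handler moves into the store's get_prev_events_for_room (not shown in this diff), which is why the handler can now simply assert that it was handed at most 10.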
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a5e501897c..585f3e4da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.async import Linearizer
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -255,6 +255,14 @@ class PresenceHandler(object):
         logger.info("Finished _persist_unpersisted_changes")
 
     @defer.inlineCallbacks
+    def _update_states_and_catch_exception(self, new_states):
+        try:
+            res = yield self._update_states(new_states)
+            defer.returnValue(res)
+        except Exception:
+            logger.exception("Error updating presence")
+
+    @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts.
         Pokes the notifier and federation if and only if the changed presence state
@@ -364,7 +372,7 @@ class PresenceHandler(object):
                     now=now,
                 )
 
-            preserve_fn(self._update_states)(changes)
+            run_in_background(self._update_states_and_catch_exception, changes)
         except Exception:
             logger.exception("Exception in _handle_timeouts loop")
 
@@ -422,20 +430,23 @@ class PresenceHandler(object):
 
         @defer.inlineCallbacks
         def _end():
-            if affect_presence:
+            try:
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
                 yield self._update_states([prev_state.copy_and_replace(
                     last_user_sync_ts=self.clock.time_msec(),
                 )])
+            except Exception:
+                logger.exception("Error updating presence after sync")
 
         @contextmanager
         def _user_syncing():
             try:
                 yield
             finally:
-                preserve_fn(_end)()
+                if affect_presence:
+                    run_in_background(_end)
 
         defer.returnValue(_user_syncing())
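A pattern repeated in presence (and in receipts and typing below): work launched with run_in_background has no caller waiting on its Deferred, so an exception would otherwise surface only as an unhandled-error warning, or be lost entirely. Each such entry point therefore gains a try/except that logs via logger.exception. A runnable reduction, in which run_in_background is a simplified stand-in for synapse's logcontext-aware helper:

    import logging

    from twisted.internet import defer

    logger = logging.getLogger(__name__)


    def run_in_background(f, *args, **kwargs):
        # simplified stand-in: the real helper also saves and restores
        # the logging context around the call
        return defer.maybeDeferred(f, *args, **kwargs)


    @defer.inlineCallbacks
    def update_states(new_states):
        # stand-in for the real work, e.g. PresenceHandler._update_states
        yield defer.succeed(new_states)


    @defer.inlineCallbacks
    def update_states_and_catch_exception(new_states):
        try:
            yield update_states(new_states)
        except Exception:
            # nobody yields on this Deferred, so log rather than lose it
            logger.exception("Error updating presence")


    # fire and forget: the caller deliberately does not wait for the result
    run_in_background(update_states_and_catch_exception, [])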
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 3f215c2b4e..2e0672161c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler):
         """Given a list of receipts, works out which remote servers should be
         poked and pokes them.
         """
-        # TODO: Some of this stuff should be coallesced.
-        for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
-            users = yield self.state.get_current_user_in_room(room_id)
-            remotedomains = set(get_domain_from_id(u) for u in users)
-            remotedomains = remotedomains.copy()
-            remotedomains.discard(self.server_name)
-
-            logger.debug("Sending receipt to: %r", remotedomains)
-
-            for domain in remotedomains:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.receipt",
-                    content={
-                        room_id: {
-                            receipt_type: {
-                                user_id: {
-                                    "event_ids": event_ids,
-                                    "data": data,
+        try:
+            # TODO: Some of this stuff should be coallesced.
+            for receipt in receipts:
+                room_id = receipt["room_id"]
+                receipt_type = receipt["receipt_type"]
+                user_id = receipt["user_id"]
+                event_ids = receipt["event_ids"]
+                data = receipt["data"]
+
+                users = yield self.state.get_current_user_in_room(room_id)
+                remotedomains = set(get_domain_from_id(u) for u in users)
+                remotedomains = remotedomains.copy()
+                remotedomains.discard(self.server_name)
+
+                logger.debug("Sending receipt to: %r", remotedomains)
+
+                for domain in remotedomains:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.receipt",
+                        content={
+                            room_id: {
+                                receipt_type: {
+                                    user_id: {
+                                        "event_ids": event_ids,
+                                        "data": data,
+                                    }
                                 }
-                            }
+                            },
                         },
-                    },
-                    key=(room_id, receipt_type, user_id),
-                )
+                        key=(room_id, receipt_type, user_id),
+                    )
+        except Exception:
+            logger.exception("Error pushing receipts to remote servers")
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5d81f59b44..add3f9b009 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
@@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs)
-        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(hs, "room_list")
+        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+                                                   timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
@@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler):
         )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
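Both here and in sync.py below, the manual response_cache get/set dance collapses into ResponseCache.wrap, which keys the in-flight request and hands late arrivals the pending result instead of recomputing it. A minimal illustration of the idea (a simplification, not synapse's actual ResponseCache, which also keeps results for a timeout and registers metrics under the name now passed to its constructor):

    from twisted.internet import defer
    from twisted.python import failure


    class SimpleResponseCache(object):
        """Deduplicate concurrent calls that share a key: illustrative only."""

        def __init__(self):
            self._pending = {}  # key -> list of waiting Deferreds

        def wrap(self, key, callback, *args, **kwargs):
            if key in self._pending:
                # the same request is already in flight: wait for its result
                d = defer.Deferred()
                self._pending[key].append(d)
                return d

            self._pending[key] = []

            def on_done(result):
                # fan the outcome out to everyone who piled up behind us
                for waiter in self._pending.pop(key):
                    if isinstance(result, failure.Failure):
                        waiter.errback(result)
                    else:
                        waiter.callback(result)
                return result

            d = defer.maybeDeferred(callback, *args, **kwargs)
            d.addBoth(on_done)
            return d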
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c45142d38d..714583f1d5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
     @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
-        prev_event_ids,
+        prev_events_and_hashes,
         txn_id=None,
         ratelimit=True,
         content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
             },
             token_id=requester.access_token_id,
             txn_id=txn_id,
-            prev_event_ids=prev_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
                     403, "Invites have been disabled on this server",
                 )
 
-        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+            room_id,
+        )
+        latest_event_ids = (
+            event_id for (event_id, _, _) in prev_events_and_hashes
+        )
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
             membership=effective_membership_state,
             txn_id=txn_id,
             ratelimit=ratelimit,
-            prev_event_ids=latest_event_ids,
+            prev_events_and_hashes=prev_events_and_hashes,
             content=content,
         )
         defer.returnValue(res)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce038..b52e4c2aff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
         to tell if room needs to be part of the sync result.
         """
         return bool(self.events)
+    __bool__ = __nonzero__  # python3
 
 
 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             # nb the notification count does not, er, count: if there's nothing
             # else in the result, we don't need to send it.
         )
+    __bool__ = __nonzero__  # python3
 
 
 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
             or self.state
             or self.account_data
         )
+    __bool__ = __nonzero__  # python3
 
 
 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
     def __nonzero__(self):
         """Invited rooms should always be reported to the client"""
         return True
+    __bool__ = __nonzero__  # python3
 
 
 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
 
     def __nonzero__(self):
         return bool(self.join or self.invite or self.leave)
+    __bool__ = __nonzero__  # python3
 
 
 class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
 
     def __nonzero__(self):
         return bool(self.changed or self.left)
+    __bool__ = __nonzero__  # python3
 
 
 class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
             self.device_lists or
             self.groups
         )
+    __bool__ = __nonzero__  # python3
 
 
 class SyncHandler(object):
@@ -169,7 +176,7 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs)
+        self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -180,15 +187,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
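The repeated `__bool__ = __nonzero__  # python3` additions in sync.py are Python 2/3 compatibility: truth testing calls __nonzero__ on Python 2 but __bool__ on Python 3. Without the alias, Python 3 would fall back to the default tuple behaviour (a namedtuple instance with fields is never empty, so always truthy), silently breaking the emptiness checks sync relies on. For example, with the class simplified to a single field:

    import collections


    class TimelineBatch(collections.namedtuple("TimelineBatch", ["events"])):
        def __nonzero__(self):
            # Python 2 consults __nonzero__ for truth testing
            return bool(self.events)
        __bool__ = __nonzero__  # Python 3 consults __bool__ instead


    assert not TimelineBatch(events=[])        # empty batch is falsey
    assert TimelineBatch(events=["an event"])  # non-empty batch is truthy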
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 77c0cf146f..5d9736e88f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID, get_domain_from_id
@@ -97,7 +97,8 @@ class TypingHandler(object):
             if self.hs.is_mine_id(member.user_id):
                 last_fed_poke = self._member_last_federation_poke.get(member, None)
                 if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
-                    preserve_fn(self._push_remote)(
+                    run_in_background(
+                        self._push_remote,
                         member=member,
                         typing=True
                     )
@@ -196,7 +197,7 @@ class TypingHandler(object):
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            preserve_fn(self._push_remote)(member, typing)
+            run_in_background(self._push_remote, member, typing)
 
         self._push_update_local(
             member=member,
@@ -205,28 +206,31 @@ class TypingHandler(object):
 
     @defer.inlineCallbacks
     def _push_remote(self, member, typing):
-        users = yield self.state.get_current_user_in_room(member.room_id)
-        self._member_last_federation_poke[member] = self.clock.time_msec()
+        try:
+            users = yield self.state.get_current_user_in_room(member.room_id)
+            self._member_last_federation_poke[member] = self.clock.time_msec()
 
-        now = self.clock.time_msec()
-        self.wheel_timer.insert(
-            now=now,
-            obj=member,
-            then=now + FEDERATION_PING_INTERVAL,
-        )
+            now = self.clock.time_msec()
+            self.wheel_timer.insert(
+                now=now,
+                obj=member,
+                then=now + FEDERATION_PING_INTERVAL,
+            )
 
-        for domain in set(get_domain_from_id(u) for u in users):
-            if domain != self.server_name:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.typing",
-                    content={
-                        "room_id": member.room_id,
-                        "user_id": member.user_id,
-                        "typing": typing,
-                    },
-                    key=member,
-                )
+            for domain in set(get_domain_from_id(u) for u in users):
+                if domain != self.server_name:
+                    self.federation.send_edu(
+                        destination=domain,
+                        edu_type="m.typing",
+                        content={
+                            "room_id": member.room_id,
+                            "user_id": member.user_id,
+                            "typing": typing,
+                        },
+                        key=member,
+                    )
+        except Exception:
+            logger.exception("Error pushing typing notif to remotes")
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):