diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 17 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 151 | ||||
-rw-r--r-- | synapse/handlers/message.py | 25 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 10 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 44 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 23 |
7 files changed, 181 insertions, 91 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f0f89af7dc..17eedf4dbf 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -28,6 +28,7 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import log_failure from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -36,17 +37,6 @@ logger = logging.getLogger(__name__) events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") -def log_failure(failure): - logger.error( - "Application Services Failure", - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject() - ) - ) - - class ApplicationServicesHandler(object): def __init__(self, hs): @@ -112,7 +102,10 @@ class ApplicationServicesHandler(object): if not self.started_scheduler: def start_scheduler(): - return self.scheduler.start().addErrback(log_failure) + return self.scheduler.start().addErrback( + log_failure, "Application Services Failure", + ) + run_as_background_process("as_scheduler", start_scheduler) self.started_scheduler = True diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 578e9250fb..9dc46aa15f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -341,7 +341,7 @@ class E2eKeysHandler(object): def _exception_to_failure(e): if isinstance(e, CodeMessageException): return { - "status": e.code, "message": e.message, + "status": e.code, "message": str(e), } if isinstance(e, NotRetryingDestination): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ccdc3bfa7..45d955e6f5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -18,7 +18,6 @@ import itertools import logging -import sys import six from six import iteritems, itervalues @@ -106,7 +105,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.store = hs.get_datastore() + self.store = hs.get_datastore() # type: synapse.storage.DataStore self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname @@ -323,14 +322,22 @@ class FederationHandler(BaseHandler): affected=pdu.event_id, ) - # Calculate the state of the previous events, and - # de-conflict them to find the current state. - state_groups = [] + # Calculate the state after each of the previous events, and + # resolve them to find the correct state at the current event. auth_chains = set() + event_map = { + event_id: pdu, + } try: # Get the state of the events we know about - ours = yield self.store.get_state_groups(room_id, list(seen)) - state_groups.append(ours) + ours = yield self.store.get_state_groups_ids(room_id, seen) + + # state_maps is a list of mappings from (type, state_key) to event_id + # type: list[dict[tuple[str, str], str]] + state_maps = list(ours.values()) + + # we don't need this any more, let's delete it. + del ours # Ask the remote server for the states we don't # know about @@ -339,27 +346,65 @@ class FederationHandler(BaseHandler): "[%s %s] Requesting state at missing prev_event %s", room_id, event_id, p, ) - state, got_auth_chain = ( - yield self.federation_client.get_state_for_room( - origin, room_id, p, + + with logcontext.nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state, got_auth_chain = ( + yield self.federation_client.get_state_for_room( + origin, room_id, p, + ) ) - ) - auth_chains.update(got_auth_chain) - state_group = {(x.type, x.state_key): x.event_id for x in state} - state_groups.append(state_group) + + # we want the state *after* p; get_state_for_room returns the + # state *before* p. + remote_event = yield self.federation_client.get_pdu( + [origin], p, outlier=True, + ) + + if remote_event is None: + raise Exception( + "Unable to get missing prev_event %s" % (p, ) + ) + + if remote_event.is_state(): + remote_state.append(remote_event) + + # XXX hrm I'm not convinced that duplicate events will compare + # for equality, so I'm not sure this does what the author + # hoped. + auth_chains.update(got_auth_chain) + + remote_state_map = { + (x.type, x.state_key): x.event_id for x in remote_state + } + state_maps.append(remote_state_map) + + for x in remote_state: + event_map[x.event_id] = x # Resolve any conflicting state + @defer.inlineCallbacks def fetch(ev_ids): - return self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False + fetched = yield self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False, ) + # add any events we fetch here to the `event_map` so that we + # can use them to build the state event list below. + event_map.update(fetched) + defer.returnValue(fetched) room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_groups, {event_id: pdu}, fetch + room_version, state_maps, event_map, fetch, ) - state = (yield self.store.get_events(state_map.values())).values() + # we need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. + state = [ + event_map[e] for e in six.itervalues(state_map) + ] auth_chain = list(auth_chains) except Exception: logger.warn( @@ -483,20 +528,21 @@ class FederationHandler(BaseHandler): "[%s %s] Handling received prev_event %s", room_id, event_id, ev.event_id, ) - try: - yield self.on_receive_pdu( - origin, - ev, - sent_to_us_directly=False, - ) - except FederationError as e: - if e.code == 403: - logger.warn( - "[%s %s] Received prev_event %s failed history check.", - room_id, event_id, ev.event_id, + with logcontext.nested_logging_context(ev.event_id): + try: + yield self.on_receive_pdu( + origin, + ev, + sent_to_us_directly=False, ) - else: - raise + except FederationError as e: + if e.code == 403: + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) + else: + raise @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): @@ -572,6 +618,10 @@ class FederationHandler(BaseHandler): }) seen_ids.add(e.event_id) + logger.info( + "[%s %s] persisting newly-received auth/state events %s", + room_id, event_id, [e["event"].event_id for e in event_infos] + ) yield self._handle_new_events(origin, event_infos) try: @@ -1135,7 +1185,8 @@ class FederationHandler(BaseHandler): try: logger.info("Processing queued PDU %s which was received " "while we were joining %s", p.event_id, p.room_id) - yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) + with logcontext.nested_logging_context(p.event_id): + yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( "Error handling queued PDU %s from %s: %s", @@ -1550,6 +1601,9 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) + # reraise does not allow inlineCallbacks to preserve the stacktrace, so we + # hack around with a try/finally instead. + success = False try: if not event.internal_metadata.is_outlier() and not backfilled: yield self.action_generator.handle_push_actions_for_event( @@ -1560,15 +1614,13 @@ class FederationHandler(BaseHandler): [(event, context)], backfilled=backfilled, ) - except: # noqa: E722, as we reraise the exception this is fine. - tp, value, tb = sys.exc_info() - - logcontext.run_in_background( - self.store.remove_push_actions_from_staging, - event.event_id, - ) - - six.reraise(tp, value, tb) + success = True + finally: + if not success: + logcontext.run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) defer.returnValue(context) @@ -1581,15 +1633,22 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background( - self._prep_event, + + @defer.inlineCallbacks + def prep(ev_info): + event = ev_info["event"] + with logcontext.nested_logging_context(suffix=event.event_id): + res = yield self._prep_event( origin, - ev_info["event"], + event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) + defer.returnValue(res) + + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(prep, ev_info) for ev_info in event_infos ], consumeErrors=True, )) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e484061cc0..4954b23a0d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -14,9 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import sys -import six from six import iteritems, itervalues, string_types from canonicaljson import encode_canonical_json, json @@ -624,6 +622,9 @@ class EventCreationHandler(object): event, context ) + # reraise does not allow inlineCallbacks to preserve the stacktrace, so we + # hack around with a try/finally instead. + success = False try: # If we're a worker we need to hit out to the master. if self.config.worker_app: @@ -636,6 +637,7 @@ class EventCreationHandler(object): ratelimit=ratelimit, extra_users=extra_users, ) + success = True return yield self.persist_and_notify_client_event( @@ -645,17 +647,16 @@ class EventCreationHandler(object): ratelimit=ratelimit, extra_users=extra_users, ) - except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area, if we calculated them. - tp, value, tb = sys.exc_info() - - run_in_background( - self.store.remove_push_actions_from_staging, - event.event_id, - ) - six.reraise(tp, value, tb) + success = True + finally: + if not success: + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) @defer.inlineCallbacks def persist_and_notify_client_event( diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 75b8b7ce6a..1dfbde84fd 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -142,10 +142,8 @@ class BaseProfileHandler(BaseHandler): if e.code != 404: logger.exception("Failed to get displayname") raise - except Exception: - logger.exception("Failed to get displayname") - else: - defer.returnValue(result["displayname"]) + + defer.returnValue(result["displayname"]) @defer.inlineCallbacks def set_displayname(self, target_user, requester, new_displayname, by_admin=False): @@ -199,8 +197,6 @@ class BaseProfileHandler(BaseHandler): if e.code != 404: logger.exception("Failed to get avatar_url") raise - except Exception: - logger.exception("Failed to get avatar_url") defer.returnValue(result["avatar_url"]) @@ -278,7 +274,7 @@ class BaseProfileHandler(BaseHandler): except Exception as e: logger.warn( "Failed to update join event for room %s - %s", - room_id, str(e.message) + room_id, str(e) ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c7d69d9d80..351892a94f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,8 @@ import logging from six import iteritems, itervalues +from prometheus_client import Counter + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -36,6 +38,19 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) + +# Counts the number of times we returned a non-empty sync. `type` is one of +# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is +# "true" or "false" depending on if the request asked for lazy loaded members or +# not. +non_empty_sync_counter = Counter( + "synapse_handlers_sync_nonempty_total", + "Count of non empty sync responses. type is initial_sync/full_state_sync" + "/incremental_sync. lazy_loaded indicates if lazy loaded members were " + "enabled for that request.", + ["type", "lazy_loaded"], +) + # Store the cache that tracks which lazy-loaded members have been sent to a given # client for no more than 30 minutes. LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 @@ -227,14 +242,16 @@ class SyncHandler(object): @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state): + if since_token is None: + sync_type = "initial_sync" + elif full_state: + sync_type = "full_state_sync" + else: + sync_type = "incremental_sync" + context = LoggingContext.current_context() if context: - if since_token is None: - context.tag = "initial_sync" - elif full_state: - context.tag = "full_state_sync" - else: - context.tag = "incremental_sync" + context.tag = sync_type if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling @@ -242,7 +259,6 @@ class SyncHandler(object): result = yield self.current_sync_for_user( sync_config, since_token, full_state=full_state, ) - defer.returnValue(result) else: def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) @@ -251,7 +267,15 @@ class SyncHandler(object): sync_config.user.to_string(), timeout, current_sync_callback, from_token=since_token, ) - defer.returnValue(result) + + if result: + if sync_config.filter_collection.lazy_load_members(): + lazy_loaded = "true" + else: + lazy_loaded = "false" + non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() + + defer.returnValue(result) def current_sync_for_user(self, sync_config, since_token=None, full_state=False): @@ -567,13 +591,13 @@ class SyncHandler(object): # be a valid name or canonical_alias - i.e. we're checking that they # haven't been "deleted" by blatting {} over the top. if name_id: - name = yield self.store.get_event(name_id, allow_none=False) + name = yield self.store.get_event(name_id, allow_none=True) if name and name.content: defer.returnValue(summary) if canonical_alias_id: canonical_alias = yield self.store.get_event( - canonical_alias_id, allow_none=False, + canonical_alias_id, allow_none=True, ) if canonical_alias and canonical_alias.content: defer.returnValue(summary) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 2d2d3d5a0d..c610933dd4 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -20,6 +20,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError from synapse.types import UserID, get_domain_from_id +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -68,6 +69,11 @@ class TypingHandler(object): # map room IDs to sets of users currently typing self._room_typing = {} + # caches which room_ids changed at which serials + self._typing_stream_change_cache = StreamChangeCache( + "TypingStreamChangeCache", self._latest_room_serial, + ) + self.clock.looping_call( self._handle_timeouts, 5000, @@ -218,6 +224,7 @@ class TypingHandler(object): for domain in set(get_domain_from_id(u) for u in users): if domain != self.server_name: + logger.debug("sending typing update to %s", domain) self.federation.send_edu( destination=domain, edu_type="m.typing", @@ -274,19 +281,29 @@ class TypingHandler(object): self._latest_room_serial += 1 self._room_serials[member.room_id] = self._latest_room_serial + self._typing_stream_change_cache.entity_has_changed( + member.room_id, self._latest_room_serial, + ) self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[member.room_id] ) def get_all_typing_updates(self, last_id, current_id): - # TODO: Work out a way to do this without scanning the entire state. if last_id == current_id: return [] + changed_rooms = self._typing_stream_change_cache.get_all_entities_changed( + last_id, + ) + + if changed_rooms is None: + changed_rooms = self._room_serials + rows = [] - for room_id, serial in self._room_serials.items(): - if last_id < serial and serial <= current_id: + for room_id in changed_rooms: + serial = self._room_serials[room_id] + if last_id < serial <= current_id: typing = self._room_typing[room_id] rows.append((serial, room_id, list(typing))) rows.sort() |