summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-04-12 13:35:56 +0100
committerErik Johnston <erik@matrix.org>2018-04-12 13:35:56 +0100
commitdd7f1b10cece222db5b57d92727afaf7780f8b79 (patch)
treee9499a2b7cd19f5efab7d7268461e89a5ad2d19a /synapse/handlers
parentMerge branch 'develop' of https://github.com/matrix-org/synapse into matrix-o... (diff)
parentMerge pull request #3092 from matrix-org/rav/response_cache_metrics (diff)
downloadsynapse-dd7f1b10cece222db5b57d92727afaf7780f8b79.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py37
-rw-r--r--synapse/handlers/message.py104
-rw-r--r--synapse/handlers/room_list.py5
-rw-r--r--synapse/handlers/sync.py3
4 files changed, 94 insertions, 55 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py

index 3dd3fa2a27..0245197c02 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import ( + make_deferred_yieldable, preserve_fn, run_in_background, +) import logging @@ -84,11 +86,16 @@ class ApplicationServicesHandler(object): if not events: break + events_by_room = {} for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + @defer.inlineCallbacks + def handle_event(event): # Gather interested services services = yield self._get_services_for_event(event) if len(services) == 0: - continue # no services need notifying + return # no services need notifying # Do we know this user exists? If not, poke the user # query API for all services which match that user regex. @@ -108,9 +115,33 @@ class ApplicationServicesHandler(object): service, event ) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + yield make_deferred_yieldable(defer.gatherResults([ + run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2134741719..726a3932da 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -454,40 +454,39 @@ class EventCreationHandler(object): """ builder = self.event_builder_factory.new(event_dict) - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if token_id is not None: - builder.internal_metadata.token_id = token_id + if token_id is not None: + builder.internal_metadata.token_id = token_id - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id - event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) + event, context = yield self.create_new_client_event( + builder=builder, + requester=requester, + prev_event_ids=prev_event_ids, + ) defer.returnValue((event, context)) @@ -557,27 +556,34 @@ class EventCreationHandler(object): See self.create_event and self.send_nonmember_event. """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN + # We limit the number of concurrent event sends in a room so that we + # don't fork the DAG too much. If we don't limit then we can end up in + # a situation where event persistence can't keep up, causing + # extremities to pile up, which in turn leads to state resolution + # taking longer. + with (yield self.limiter.queue(event_dict["room_id"])): + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id ) - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, basestring): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) defer.returnValue(event) @measure_func("create_new_client_event") diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index f5657a3521..ef3a3b352a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py
@@ -44,8 +44,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs, timeout_ms=10 * 60 * 1000) - self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) + self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000) + self.remote_response_cache = ResponseCache(hs, "remote_room_list", + timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 37fae43f71..da234c741f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -171,7 +171,8 @@ class SyncHandler(object): self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() self.response_cache = ResponseCache( - hs, timeout_ms=SYNC_RESPONSE_CACHE_MS, + hs, "sync", + timeout_ms=SYNC_RESPONSE_CACHE_MS, ) self.state = hs.get_state_handler()