diff options
author | Erik Johnston <erikj@jki.re> | 2018-03-01 11:20:07 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-01 11:20:07 +0000 |
commit | 182ff17c8322c770eb001b3a0a3194a89df464e1 (patch) | |
tree | 1cccf81ea8935305ef6cac5da0e9677ebb832f42 /synapse/handlers | |
parent | Merge pull request #2913 from matrix-org/erikj/store_push (diff) | |
parent | Check event auth on the worker (diff) | |
download | synapse-182ff17c8322c770eb001b3a0a3194a89df464e1.tar.xz |
Merge pull request #2875 from matrix-org/erikj/push_actions_worker
Calculate push actions on worker
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/message.py | 94 |
1 files changed, 66 insertions, 28 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d99d8049b3..7d28c2745c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -553,24 +553,21 @@ class EventCreationHandler(object): event, context, ratelimit=True, - extra_users=[] + extra_users=[], ): - # We now need to go and hit out to wherever we need to hit out to. - - # If we're a worker we need to hit out to the master. - if self.config.worker_app: - yield send_event_to_master( - self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - requester=requester, - event=event, - context=context, - ) - return + """Processes a new event. This includes checking auth, persisting it, + notifying users, sending to remote servers, etc. - if ratelimit: - yield self.base_handler.ratelimit(requester) + If called from a worker will hit out to the master process for final + processing. + + Args: + requester (Requester) + event (FrozenEvent) + context (EventContext) + ratelimit (bool) + extra_users (list(str)): Any extra users to notify about event + """ try: yield self.auth.check_from_context(event, context) @@ -586,6 +583,57 @@ class EventCreationHandler(object): logger.exception("Failed to encode content: %r", event.content) raise + yield self.action_generator.handle_push_actions_for_event( + event, context + ) + + try: + # If we're a worker we need to hit out to the master. + if self.config.worker_app: + yield send_event_to_master( + self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ) + return + + yield self.persist_and_notify_client_event( + requester, + event, + context, + ratelimit=ratelimit, + extra_users=extra_users, + ) + except: # noqa: E722, as we reraise the exception this is fine. + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) + raise + + @defer.inlineCallbacks + def persist_and_notify_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[], + ): + """Called when we have fully built the event, have already + calculated the push actions for the event, and checked auth. + + This should only be run on master. + """ + assert not self.config.worker_app + + if ratelimit: + yield self.base_handler.ratelimit(requester) + yield self.base_handler.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: @@ -679,20 +727,10 @@ class EventCreationHandler(object): "Changing the room create event is forbidden", ) - yield self.action_generator.handle_push_actions_for_event( - event, context + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) - try: - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area - preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) - raise - # this intentionally does not yield: we don't care about the result # and don't need to wait for it. preserve_fn(self.pusher_pool.on_new_notifications)( |