From 28e973ac119e0b4ec5b9e45772a572a94d0e6643 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Feb 2018 16:30:10 +0000 Subject: Calculate push actions on worker --- synapse/app/event_creator.py | 8 ++++ synapse/handlers/message.py | 86 ++++++++++++++++++++++++---------- synapse/replication/http/send_event.py | 2 +- 3 files changed, 70 insertions(+), 26 deletions(-) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index b2ce399258..fc0b9e8c04 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -27,10 +27,14 @@ from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler @@ -48,6 +52,10 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( + SlavedAccountDataStore, + SlavedPusherStore, + SlavedReceiptsStore, + SlavedPushRuleStore, SlavedDeviceStore, SlavedClientIpStore, SlavedApplicationServiceStore, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d99d8049b3..4c186965a7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -553,21 +553,67 @@ class EventCreationHandler(object): event, context, ratelimit=True, - extra_users=[] + extra_users=[], ): - # We now need to go and hit out to wherever we need to hit out to. - - # If we're a worker we need to hit out to the master. - if self.config.worker_app: - yield send_event_to_master( - self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - requester=requester, - event=event, - context=context, + """Processes a new event. This includes checking auth, persisting it, + notifying users, sending to remote servers, etc. + + If called from a worker will hit out to the master process for final + processing. + + Args: + requester (Requester) + event (FrozenEvent) + context (EventContext) + ratelimit (bool) + extra_users (list(str)): Any extra users to notify about event + """ + + yield self.action_generator.handle_push_actions_for_event( + event, context + ) + + try: + # We now need to go and hit out to wherever we need to hit out to. + + # If we're a worker we need to hit out to the master. + if self.config.worker_app: + yield send_event_to_master( + self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + event=event, + context=context, + ) + return + + yield self.persist_and_notify_client_event( + requester, + event, + context, + ratelimit=ratelimit, + extra_users=extra_users, ) - return + except: # noqa: E722, as we reraise the exception this is fine. + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) + raise + + @defer.inlineCallbacks + def persist_and_notify_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[], + ): + """Called when we have fully built and authed the event. This should + only be run on master. + """ + assert not self.config.worker_app if ratelimit: yield self.base_handler.ratelimit(requester) @@ -679,20 +725,10 @@ class EventCreationHandler(object): "Changing the room create event is forbidden", ) - yield self.action_generator.handle_push_actions_for_event( - event, context + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) - try: - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area - preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) - raise - # this intentionally does not yield: we don't care about the result # and don't need to wait for it. preserve_fn(self.pusher_pool.on_new_notifications)( diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 468f4b68f4..3a99a88bc5 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -106,7 +106,7 @@ class ReplicationSendEventRestServlet(RestServlet): event.event_id, event.room_id, ) - yield self.event_creation_handler.handle_new_client_event( + yield self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ) -- cgit 1.4.1