From 28e973ac119e0b4ec5b9e45772a572a94d0e6643 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Feb 2018 16:30:10 +0000 Subject: Calculate push actions on worker --- synapse/app/event_creator.py | 8 ++++ synapse/handlers/message.py | 86 ++++++++++++++++++++++++---------- synapse/replication/http/send_event.py | 2 +- 3 files changed, 70 insertions(+), 26 deletions(-) (limited to 'synapse') diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index b2ce399258..fc0b9e8c04 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -27,10 +27,14 @@ from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler @@ -48,6 +52,10 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( + SlavedAccountDataStore, + SlavedPusherStore, + SlavedReceiptsStore, + SlavedPushRuleStore, SlavedDeviceStore, SlavedClientIpStore, SlavedApplicationServiceStore, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d99d8049b3..4c186965a7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -553,21 +553,67 @@ class EventCreationHandler(object): event, context, ratelimit=True, - extra_users=[] + extra_users=[], ): - # We now need to go and hit out to wherever we need to hit out to. - - # If we're a worker we need to hit out to the master. - if self.config.worker_app: - yield send_event_to_master( - self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, - requester=requester, - event=event, - context=context, + """Processes a new event. This includes checking auth, persisting it, + notifying users, sending to remote servers, etc. + + If called from a worker will hit out to the master process for final + processing. + + Args: + requester (Requester) + event (FrozenEvent) + context (EventContext) + ratelimit (bool) + extra_users (list(str)): Any extra users to notify about event + """ + + yield self.action_generator.handle_push_actions_for_event( + event, context + ) + + try: + # We now need to go and hit out to wherever we need to hit out to. + + # If we're a worker we need to hit out to the master. + if self.config.worker_app: + yield send_event_to_master( + self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + event=event, + context=context, + ) + return + + yield self.persist_and_notify_client_event( + requester, + event, + context, + ratelimit=ratelimit, + extra_users=extra_users, ) - return + except: # noqa: E722, as we reraise the exception this is fine. + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) + raise + + @defer.inlineCallbacks + def persist_and_notify_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[], + ): + """Called when we have fully built and authed the event. This should + only be run on master. + """ + assert not self.config.worker_app if ratelimit: yield self.base_handler.ratelimit(requester) @@ -679,20 +725,10 @@ class EventCreationHandler(object): "Changing the room create event is forbidden", ) - yield self.action_generator.handle_push_actions_for_event( - event, context + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) - try: - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area - preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) - raise - # this intentionally does not yield: we don't care about the result # and don't need to wait for it. preserve_fn(self.pusher_pool.on_new_notifications)( diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 468f4b68f4..3a99a88bc5 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -106,7 +106,7 @@ class ReplicationSendEventRestServlet(RestServlet): event.event_id, event.room_id, ) - yield self.event_creation_handler.handle_new_client_event( + yield self.event_creation_handler.persist_and_notify_client_event( requester, event, context, ) -- cgit 1.4.1 From f756f961eab7ae6e53052ee419413c74d171d144 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 10:05:27 +0000 Subject: Fixup comments --- synapse/handlers/message.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4c186965a7..c4151d73eb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -574,8 +574,6 @@ class EventCreationHandler(object): ) try: - # We now need to go and hit out to wherever we need to hit out to. - # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( @@ -610,8 +608,10 @@ class EventCreationHandler(object): ratelimit=True, extra_users=[], ): - """Called when we have fully built and authed the event. This should - only be run on master. + """Called when we have fully built the event, and have already + calculated the push actions for the event. + + This should only be run on master. """ assert not self.config.worker_app -- cgit 1.4.1 From 6b8604239f4c9463023e59664e7810ba58b8f428 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 10:08:28 +0000 Subject: Correctly send ratelimit and extra_users params --- synapse/handlers/message.py | 2 ++ synapse/replication/http/send_event.py | 14 +++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c4151d73eb..5f88f84d38 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -583,6 +583,8 @@ class EventCreationHandler(object): requester=requester, event=event, context=context, + ratelimit=ratelimit, + extra_users=extra_users, ) return diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 3a99a88bc5..439bfbb4f6 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -29,7 +29,8 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(client, host, port, requester, event, context): +def send_event_to_master(client, host, port, requester, event, context, + ratelimit, extra_users): """Send event to be handled on the master Args: @@ -39,6 +40,8 @@ def send_event_to_master(client, host, port, requester, event, context): requester (Requester) event (FrozenEvent) context (EventContext) + ratelimit (bool) + extra_users (list(str)): Any extra users to notify about event """ uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,) @@ -48,6 +51,8 @@ def send_event_to_master(client, host, port, requester, event, context): "rejected_reason": event.rejected_reason, "context": context.serialize(event), "requester": requester.serialize(), + "ratelimit": ratelimit, + "extra_users": extra_users, } try: @@ -74,6 +79,8 @@ class ReplicationSendEventRestServlet(RestServlet): "rejected_reason": .., // The event.rejected_reason field "context": { .. serialized event context .. }, "requester": { .. serialized requester .. }, + "ratelimit": true, + "extra_users": [], } """ PATTERNS = [re.compile("^/_synapse/replication/send_event$")] @@ -98,6 +105,9 @@ class ReplicationSendEventRestServlet(RestServlet): requester = Requester.deserialize(self.store, content["requester"]) context = yield EventContext.deserialize(self.store, content["context"]) + ratelimit = content["ratelimit"] + extra_users = content["extra_users"] + if requester.user: request.authenticated_entity = requester.user.to_string() @@ -108,6 +118,8 @@ class ReplicationSendEventRestServlet(RestServlet): yield self.event_creation_handler.persist_and_notify_client_event( requester, event, context, + ratelimit=ratelimit, + extra_users=extra_users, ) defer.returnValue((200, {})) -- cgit 1.4.1 From f381d6381344eb442f46ae27f29e039175721ff5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Mar 2018 10:18:33 +0000 Subject: Check event auth on the worker --- synapse/handlers/message.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5f88f84d38..7d28c2745c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -569,6 +569,20 @@ class EventCreationHandler(object): extra_users (list(str)): Any extra users to notify about event """ + try: + yield self.auth.check_from_context(event, context) + except AuthError as err: + logger.warn("Denying new event %r because %s", event, err) + raise err + + # Ensure that we can round trip before trying to persist in db + try: + dump = ujson.dumps(unfreeze(event.content)) + ujson.loads(dump) + except Exception: + logger.exception("Failed to encode content: %r", event.content) + raise + yield self.action_generator.handle_push_actions_for_event( event, context ) @@ -610,8 +624,8 @@ class EventCreationHandler(object): ratelimit=True, extra_users=[], ): - """Called when we have fully built the event, and have already - calculated the push actions for the event. + """Called when we have fully built the event, have already + calculated the push actions for the event, and checked auth. This should only be run on master. """ @@ -620,20 +634,6 @@ class EventCreationHandler(object): if ratelimit: yield self.base_handler.ratelimit(requester) - try: - yield self.auth.check_from_context(event, context) - except AuthError as err: - logger.warn("Denying new event %r because %s", event, err) - raise err - - # Ensure that we can round trip before trying to persist in db - try: - dump = ujson.dumps(unfreeze(event.content)) - ujson.loads(dump) - except Exception: - logger.exception("Failed to encode content: %r", event.content) - raise - yield self.base_handler.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: -- cgit 1.4.1