diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 146 |
1 files changed, 145 insertions, 1 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index bb77f6210c..08c45db0ba 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -56,6 +56,7 @@ from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.replication.http.send_events import ReplicationSendEventsRestServlet from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter @@ -66,7 +67,7 @@ from synapse.types import ( StateMap, StreamToken, UserID, - create_requester, + create_requester, PersistedEventPosition, ) from synapse.util import json_decoder, json_encoder, log_failure, unwrapFirstError from synapse.util.async_helpers import Linearizer, gather_results @@ -493,6 +494,7 @@ class EventCreationHandler: self.membership_types_to_include_profile_data_in.add(Membership.INVITE) self.send_event = ReplicationSendEventRestServlet.make_client(hs) + self.send_events = ReplicationSendEventsRestServlet.make_client(hs) self.request_ratelimiter = hs.get_request_ratelimiter() @@ -1316,6 +1318,148 @@ class EventCreationHandler: 400, "Cannot start threads from an event with a relation" ) + + async def handle_create_room_events( + self, + requester: Requester, + events_and_ctx: List[Tuple[EventBase, EventContext]], + ratelimit: bool = True, + ) -> EventBase: + """ + Process a batch of room creation events. For each event in the list it checks + the authorization and that the event can be serialized. Returns the last event in the + list once it has been persisted. + Args: + requester: the room creator + events_and_ctx: a set of events and their associated contexts to persist + ratelimit: whether to ratelimit this request + """ + for event, context in events_and_ctx: + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context( + event, context + ) + except AuthError as err: + logger.warning("Denying new event %r because %s", event, err) + raise err + + # Ensure that we can round trip before trying to persist in db + try: + dump = json_encoder.encode(event.content) + json_decoder.decode(dump) + except Exception: + logger.exception("Failed to encode content: %r", event.content) + raise + + # We now persist the events + try: + result = await self._persist_events_batch( + requester, events_and_ctx, ratelimit + ) + except Exception as e: + logger.info(f"Encountered an error persisting events: {e}") + + return result + + async def _persist_events_batch( + self, + requester: Requester, + events_and_ctx: List[Tuple[EventBase, EventContext]], + ratelimit: bool = True, + ) -> EventBase: + """ + Processes the push actions and adds them to the push staging area before attempting to + persist the batch of events. + See handle_create_room_events for arguments + Returns the last event in the list if persisted successfully + """ + for event, context in events_and_ctx: + with opentracing.start_active_span("calculate_push_actions"): + await self._bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) + try: + # If we're a worker we need to hit out to the master. + writer_instance = self._events_shard_config.get_instance(event.room_id) + if writer_instance != self._instance_name: + try: + result = await self.send_events( + instance_name=writer_instance, + store=self.store, + requester=requester, + events_and_ctx=events_and_ctx, + ratelimit=ratelimit, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise + stream_id = result["stream_id"] + + # If we newly persisted the event then we need to update its + # stream_ordering entry manually (as it was persisted on + # another worker). + event.internal_metadata.stream_ordering = stream_id + return event + + last_event = await self.persist_and_notify_batched_events( + requester, events_and_ctx, ratelimit + ) + except Exception: + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + for event, _ in events_and_ctx: + await self.store.remove_push_actions_from_staging(event.event_id) + raise + + return last_event + + async def persist_and_notify_batched_events( + self, + requester: Requester, + events_and_ctx: List[Tuple[EventBase, EventContext]], + ratelimit: bool = True, + ) -> EventBase: + """ + Handles the actual persisting of a batch of events to the DB, and sends the appropriate + notifications when this is done. + Args: + requester: the room creator + events_and_ctx: list of events and their associated contexts to persist + ratelimit: whether to apply ratelimiting to this request + """ + if ratelimit: + await self.request_ratelimiter.ratelimit(requester) + + for event, context in events_and_ctx: + await self._actions_by_event_type(event, context) + + assert self._storage_controllers.persistence is not None + ( + persisted_events, + max_stream_token, + ) = await self._storage_controllers.persistence.persist_events(events_and_ctx) + + stream_ordering = persisted_events[-1].internal_metadata.stream_ordering + assert stream_ordering is not None + pos = PersistedEventPosition(self._instance_name, stream_ordering) + + async def _notify() -> None: + try: + await self.notifier.on_new_room_event( + persisted_events[-1], pos, max_stream_token + ) + except Exception: + logger.exception( + "Error notifying about new room event %s", + event.event_id, + ) + + run_in_background(_notify) + + return persisted_events[-1] + @measure_func("handle_new_client_event") async def handle_new_client_event( self, |