From b520a1bf5a272b04473f485def18a9e6f6e4c3b9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 2 Oct 2020 16:45:41 +0100 Subject: De-duplicate duplicate handling move the "duplicate state event" handling down into `handle_new_client_event` where it can be shared between multiple call paths. --- synapse/handlers/message.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) (limited to 'synapse/handlers/message.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 00513fbf37..ea8e3517d7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -674,22 +674,14 @@ class EventCreationHandler: assert self.hs.is_mine(user), "User must be our own: %s" % (user,) - if event.is_state(): - prev_event = await self.deduplicate_state_event(event, context) - if prev_event is not None: - logger.info( - "Not bothering to persist state event %s duplicated by %s", - event.event_id, - prev_event.event_id, - ) - # we know it was persisted, so must have a stream ordering - assert prev_event.internal_metadata.stream_ordering - return prev_event.internal_metadata.stream_ordering - - return await self.handle_new_client_event( + ev = await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit ) + # we know it was persisted, so must have a stream ordering + assert ev.internal_metadata.stream_ordering + return ev.internal_metadata.stream_ordering + async def deduplicate_state_event( self, event: EventBase, context: EventContext ) -> Optional[EventBase]: @@ -845,8 +837,10 @@ class EventCreationHandler: context: EventContext, ratelimit: bool = True, extra_users: List[UserID] = [], - ) -> int: - """Processes a new event. This includes checking auth, persisting it, + ) -> EventBase: + """Processes a new event. + + This includes deduplicating, checking auth, persisting, notifying users, sending to remote servers, etc. If called from a worker will hit out to the master process for final @@ -860,9 +854,20 @@ class EventCreationHandler: extra_users: Any extra users to notify about event Return: - The stream_id of the persisted event. + If the event was deduplicated, the previous, duplicate, event. Otherwise, + `event`. """ + if event.is_state(): + prev_event = await self.deduplicate_state_event(event, context) + if prev_event is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, + prev_event.event_id, + ) + return prev_event + if event.is_state() and (event.type, event.state_key) == ( EventTypes.Create, "", @@ -917,13 +922,13 @@ class EventCreationHandler: ) stream_id = result["stream_id"] event.internal_metadata.stream_ordering = stream_id - return stream_id + return event stream_id = await self.persist_and_notify_client_event( requester, event, context, ratelimit=ratelimit, extra_users=extra_users ) - return stream_id + return event except Exception: # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. -- cgit 1.4.1