From 25e55d25980ae048bd33f8c695a22a3ddaa971fe Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Aug 2020 07:53:23 -0400 Subject: Return the previous stream token if a non-member event is a duplicate. (#8093) --- synapse/handlers/message.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) (limited to 'synapse/handlers/message.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 48b0fc7279..f242d3c6ac 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -667,14 +667,14 @@ class EventCreationHandler(object): assert self.hs.is_mine(user), "User must be our own: %s" % (user,) if event.is_state(): - prev_state = await self.deduplicate_state_event(event, context) - if prev_state is not None: + prev_event = await self.deduplicate_state_event(event, context) + if prev_event is not None: logger.info( "Not bothering to persist state event %s duplicated by %s", event.event_id, - prev_state.event_id, + prev_event.event_id, ) - return prev_state + return await self.store.get_stream_token_for_event(prev_event.event_id) return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit @@ -682,27 +682,32 @@ class EventCreationHandler(object): async def deduplicate_state_event( self, event: EventBase, context: EventContext - ) -> None: + ) -> Optional[EventBase]: """ Checks whether event is in the latest resolved state in context. - If so, returns the version of the event in context. - Otherwise, returns None. + Args: + event: The event to check for duplication. + context: The event context. + + Returns: + The previous verion of the event is returned, if it is found in the + event context. Otherwise, None is returned. """ prev_state_ids = await context.get_prev_state_ids() prev_event_id = prev_state_ids.get((event.type, event.state_key)) if not prev_event_id: - return + return None prev_event = await self.store.get_event(prev_event_id, allow_none=True) if not prev_event: - return + return None if prev_event and event.user_id == prev_event.user_id: prev_content = encode_canonical_json(prev_event.content) next_content = encode_canonical_json(event.content) if prev_content == next_content: return prev_event - return + return None async def create_and_send_nonmember_event( self, -- cgit 1.5.1 From 3c01724b330ac99d6defb12634ea9046ae52fe63 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 18 Aug 2020 09:53:13 -0400 Subject: Fix the return type of send_nonmember_events. (#8112) --- changelog.d/8112.misc | 1 + synapse/handlers/message.py | 2 +- synapse/storage/databases/main/stream.py | 19 +++++++++++++++---- 3 files changed, 17 insertions(+), 5 deletions(-) create mode 100644 changelog.d/8112.misc (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/8112.misc b/changelog.d/8112.misc new file mode 100644 index 0000000000..80045dde1a --- /dev/null +++ b/changelog.d/8112.misc @@ -0,0 +1 @@ +Return the previous stream token if a non-member event is a duplicate. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f242d3c6ac..532fc30681 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -674,7 +674,7 @@ class EventCreationHandler(object): event.event_id, prev_event.event_id, ) - return await self.store.get_stream_token_for_event(prev_event.event_id) + return await self.store.get_stream_id_for_event(prev_event.event_id) return await self.handle_new_client_event( requester=requester, event=event, context=context, ratelimit=ratelimit diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8ccfb8fc46..4377bddb8c 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -582,6 +582,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) return "t%d-%d" % (topo, token) + async def get_stream_id_for_event(self, event_id: str) -> int: + """The stream ID for an event + Args: + event_id: The id of the event to look up a stream token for. + Raises: + StoreError if the event wasn't in the database. + Returns: + A stream ID. + """ + return await self.db_pool.simple_select_one_onecol( + table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering" + ) + async def get_stream_token_for_event(self, event_id: str) -> str: """The stream token for an event Args: @@ -591,10 +604,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: A "s%d" stream token. """ - row = await self.db_pool.simple_select_one_onecol( - table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering" - ) - return "s%d" % (row,) + stream_id = await self.get_stream_id_for_event(event_id) + return "s%d" % (stream_id,) async def get_topological_token_for_event(self, event_id: str) -> str: """The stream token for an event -- cgit 1.5.1