From a6ea1a957e8e38ca3f98d4da32ee49a40fcb4807 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Oct 2020 12:11:45 +0000 Subject: Don't pull event from DB when handling replication traffic. (#8669) I was trying to make it so that we didn't have to start a background task when handling RDATA, but that is a bigger job (due to all the code in `generic_worker`). However I still think not pulling the event from the DB may help reduce some DB usage due to replication, even if most workers will simply go and pull that event from the DB later anyway. Co-authored-by: Patrick Cloke --- synapse/replication/tcp/client.py | 20 ++++++++++++-------- synapse/replication/tcp/streams/events.py | 21 +++++++++++++-------- 2 files changed, 25 insertions(+), 16 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e27ee216f0..2618eb1e53 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -141,21 +141,25 @@ class ReplicationDataHandler: if row.type != EventsStreamEventRow.TypeId: continue assert isinstance(row, EventsStreamRow) + assert isinstance(row.data, EventsStreamEventRow) - event = await self.store.get_event( - row.data.event_id, allow_rejected=True - ) - if event.rejected_reason: + if row.data.rejected: continue extra_users = () # type: Tuple[UserID, ...] - if event.type == EventTypes.Member: - extra_users = (UserID.from_string(event.state_key),) + if row.data.type == EventTypes.Member and row.data.state_key: + extra_users = (UserID.from_string(row.data.state_key),) max_token = self.store.get_room_max_token() event_pos = PersistedEventPosition(instance_name, token) - self.notifier.on_new_room_event( - event, event_pos, max_token, extra_users + self.notifier.on_new_room_event_args( + event_pos=event_pos, + max_room_stream_token=max_token, + extra_users=extra_users, + room_id=row.data.room_id, + event_type=row.data.type, + state_key=row.data.state_key, + membership=row.data.membership, ) # Notify any waiting deferreds. The list is ordered by position so we diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 82e9e0d64e..86a62b71eb 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,12 +15,15 @@ # limitations under the License. import heapq from collections.abc import Iterable -from typing import List, Tuple, Type +from typing import TYPE_CHECKING, List, Optional, Tuple, Type import attr from ._base import Stream, StreamUpdateResult, Token +if TYPE_CHECKING: + from synapse.server import HomeServer + """Handling of the 'events' replication stream This stream contains rows of various types. Each row therefore contains a 'type' @@ -81,12 +84,14 @@ class BaseEventsStreamRow: class EventsStreamEventRow(BaseEventsStreamRow): TypeId = "ev" - event_id = attr.ib() # str - room_id = attr.ib() # str - type = attr.ib() # str - state_key = attr.ib() # str, optional - redacts = attr.ib() # str, optional - relates_to = attr.ib() # str, optional + event_id = attr.ib(type=str) + room_id = attr.ib(type=str) + type = attr.ib(type=str) + state_key = attr.ib(type=Optional[str]) + redacts = attr.ib(type=Optional[str]) + relates_to = attr.ib(type=Optional[str]) + membership = attr.ib(type=Optional[str]) + rejected = attr.ib(type=bool) @attr.s(slots=True, frozen=True) @@ -113,7 +118,7 @@ class EventsStream(Stream): NAME = "events" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), -- cgit 1.4.1