summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-28 12:11:45 +0000
committerGitHub <noreply@github.com>2020-10-28 12:11:45 +0000
commita6ea1a957e8e38ca3f98d4da32ee49a40fcb4807 (patch)
tree43068698506ed93e735b48b785e58fc1d1b464b9 /synapse/notifier.py
parentTell Black to format code for Python 3.5 (#8664) (diff)
downloadsynapse-a6ea1a957e8e38ca3f98d4da32ee49a40fcb4807.tar.xz
Don't pull event from DB when handling replication traffic. (#8669)
I was trying to make it so that we didn't have to start a background task when handling RDATA, but that is a bigger job (due to all the code in `generic_worker`). However I still think not pulling the event from the DB may help reduce some DB usage due to replication, even if most workers will simply go and pull that event from the DB later anyway.

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py68
1 files changed, 55 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index eb56b26f21..a17352ef46 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
     Union,
 )
 
+import attr
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -173,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
         return bool(self.events)
 
 
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+    event_pos = attr.ib(type=PersistedEventPosition)
+    extra_users = attr.ib(type=Collection[UserID])
+
+    room_id = attr.ib(type=str)
+    type = attr.ib(type=str)
+    state_key = attr.ib(type=Optional[str])
+    membership = attr.ib(type=Optional[str])
+
+
 class Notifier:
     """ This class is responsible for notifying any listeners when there are
     new events available for it.
@@ -190,9 +202,7 @@ class Notifier:
         self.storage = hs.get_storage()
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
-        self.pending_new_room_events = (
-            []
-        )  # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+        self.pending_new_room_events = []  # type: List[_PendingRoomEventEntry]
 
         # Called when there are new things to stream over replication
         self.replication_callbacks = []  # type: List[Callable[[], None]]
@@ -255,7 +265,29 @@ class Notifier:
         max_room_stream_token: RoomStreamToken,
         extra_users: Collection[UserID] = [],
     ):
-        """ Used by handlers to inform the notifier something has happened
+        """Unwraps event and calls `on_new_room_event_args`.
+        """
+        self.on_new_room_event_args(
+            event_pos=event_pos,
+            room_id=event.room_id,
+            event_type=event.type,
+            state_key=event.get("state_key"),
+            membership=event.content.get("membership"),
+            max_room_stream_token=max_room_stream_token,
+            extra_users=extra_users,
+        )
+
+    def on_new_room_event_args(
+        self,
+        room_id: str,
+        event_type: str,
+        state_key: Optional[str],
+        membership: Optional[str],
+        event_pos: PersistedEventPosition,
+        max_room_stream_token: RoomStreamToken,
+        extra_users: Collection[UserID] = [],
+    ):
+        """Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
         This triggers the notifier to wake up any listeners that are
@@ -266,7 +298,16 @@ class Notifier:
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append((event_pos, event, extra_users))
+        self.pending_new_room_events.append(
+            _PendingRoomEventEntry(
+                event_pos=event_pos,
+                extra_users=extra_users,
+                room_id=room_id,
+                type=event_type,
+                state_key=state_key,
+                membership=membership,
+            )
+        )
         self._notify_pending_new_room_events(max_room_stream_token)
 
         self.notify_replication()
@@ -284,18 +325,19 @@ class Notifier:
         users = set()  # type: Set[UserID]
         rooms = set()  # type: Set[str]
 
-        for event_pos, event, extra_users in pending:
-            if event_pos.persisted_after(max_room_stream_token):
-                self.pending_new_room_events.append((event_pos, event, extra_users))
+        for entry in pending:
+            if entry.event_pos.persisted_after(max_room_stream_token):
+                self.pending_new_room_events.append(entry)
             else:
                 if (
-                    event.type == EventTypes.Member
-                    and event.membership == Membership.JOIN
+                    entry.type == EventTypes.Member
+                    and entry.membership == Membership.JOIN
+                    and entry.state_key
                 ):
-                    self._user_joined_room(event.state_key, event.room_id)
+                    self._user_joined_room(entry.state_key, entry.room_id)
 
-                users.update(extra_users)
-                rooms.add(event.room_id)
+                users.update(entry.extra_users)
+                rooms.add(entry.room_id)
 
         if users or rooms:
             self.on_new_event(