diff --git a/synapse/notifier.py b/synapse/notifier.py
index eb56b26f21..a17352ef46 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
Union,
)
+import attr
from prometheus_client import Counter
from twisted.internet import defer
@@ -173,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
return bool(self.events)
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+ event_pos = attr.ib(type=PersistedEventPosition)
+ extra_users = attr.ib(type=Collection[UserID])
+
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+
+
class Notifier:
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -190,9 +202,7 @@ class Notifier:
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
- self.pending_new_room_events = (
- []
- ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+ self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -255,7 +265,29 @@ class Notifier:
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
- """ Used by handlers to inform the notifier something has happened
+ """Unwraps event and calls `on_new_room_event_args`.
+ """
+ self.on_new_room_event_args(
+ event_pos=event_pos,
+ room_id=event.room_id,
+ event_type=event.type,
+ state_key=event.get("state_key"),
+ membership=event.content.get("membership"),
+ max_room_stream_token=max_room_stream_token,
+ extra_users=extra_users,
+ )
+
+ def on_new_room_event_args(
+ self,
+ room_id: str,
+ event_type: str,
+ state_key: Optional[str],
+ membership: Optional[str],
+ event_pos: PersistedEventPosition,
+ max_room_stream_token: RoomStreamToken,
+ extra_users: Collection[UserID] = [],
+ ):
+ """Used by handlers to inform the notifier something has happened
in the room, room event wise.
This triggers the notifier to wake up any listeners that are
@@ -266,7 +298,16 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ self.pending_new_room_events.append(
+ _PendingRoomEventEntry(
+ event_pos=event_pos,
+ extra_users=extra_users,
+ room_id=room_id,
+ type=event_type,
+ state_key=state_key,
+ membership=membership,
+ )
+ )
self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
@@ -284,18 +325,19 @@ class Notifier:
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]
- for event_pos, event, extra_users in pending:
- if event_pos.persisted_after(max_room_stream_token):
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ for entry in pending:
+ if entry.event_pos.persisted_after(max_room_stream_token):
+ self.pending_new_room_events.append(entry)
else:
if (
- event.type == EventTypes.Member
- and event.membership == Membership.JOIN
+ entry.type == EventTypes.Member
+ and entry.membership == Membership.JOIN
+ and entry.state_key
):
- self._user_joined_room(event.state_key, event.room_id)
+ self._user_joined_room(entry.state_key, entry.room_id)
- users.update(extra_users)
- rooms.add(event.room_id)
+ users.update(entry.extra_users)
+ rooms.add(entry.room_id)
if users or rooms:
self.on_new_event(
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e27ee216f0..2618eb1e53 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -141,21 +141,25 @@ class ReplicationDataHandler:
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
+ assert isinstance(row.data, EventsStreamEventRow)
- event = await self.store.get_event(
- row.data.event_id, allow_rejected=True
- )
- if event.rejected_reason:
+ if row.data.rejected:
continue
extra_users = () # type: Tuple[UserID, ...]
- if event.type == EventTypes.Member:
- extra_users = (UserID.from_string(event.state_key),)
+ if row.data.type == EventTypes.Member and row.data.state_key:
+ extra_users = (UserID.from_string(row.data.state_key),)
max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
- self.notifier.on_new_room_event(
- event, event_pos, max_token, extra_users
+ self.notifier.on_new_room_event_args(
+ event_pos=event_pos,
+ max_room_stream_token=max_token,
+ extra_users=extra_users,
+ room_id=row.data.room_id,
+ event_type=row.data.type,
+ state_key=row.data.state_key,
+ membership=row.data.membership,
)
# Notify any waiting deferreds. The list is ordered by position so we
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 82e9e0d64e..86a62b71eb 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,12 +15,15 @@
# limitations under the License.
import heapq
from collections.abc import Iterable
-from typing import List, Tuple, Type
+from typing import TYPE_CHECKING, List, Optional, Tuple, Type
import attr
from ._base import Stream, StreamUpdateResult, Token
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
"""Handling of the 'events' replication stream
This stream contains rows of various types. Each row therefore contains a 'type'
@@ -81,12 +84,14 @@ class BaseEventsStreamRow:
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"
- event_id = attr.ib() # str
- room_id = attr.ib() # str
- type = attr.ib() # str
- state_key = attr.ib() # str, optional
- redacts = attr.ib() # str, optional
- relates_to = attr.ib() # str, optional
+ event_id = attr.ib(type=str)
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ redacts = attr.ib(type=Optional[str])
+ relates_to = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+ rejected = attr.ib(type=bool)
@attr.s(slots=True, frozen=True)
@@ -113,7 +118,7 @@ class EventsStream(Stream):
NAME = "events"
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index cd1f31aa62..5ae263827d 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1117,11 +1117,13 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_forward_event_rows(txn):
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
+ " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " LEFT JOIN rejections USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
@@ -1152,12 +1154,14 @@ class EventsWorkerStore(SQLBaseStore):
def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
+ " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
|