summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/6645.bugfix1
-rw-r--r--synapse/app/synchrotron.py18
-rw-r--r--synapse/storage/data_stores/main/events_worker.py44
3 files changed, 48 insertions, 15 deletions
diff --git a/changelog.d/6645.bugfix b/changelog.d/6645.bugfix
new file mode 100644
index 0000000000..f648df3fc0
--- /dev/null
+++ b/changelog.d/6645.bugfix
@@ -0,0 +1 @@
+Fix exceptions in the synchrotron worker log when events are rejected.
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index dd2132e608..03031ee34d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams.events import EventsStreamEventRow
+from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
     def get_currently_syncing_users(self):
         return self.presence_handler.get_currently_syncing_users()
 
-    @defer.inlineCallbacks
-    def process_and_notify(self, stream_name, token, rows):
+    async def process_and_notify(self, stream_name, token, rows):
         try:
             if stream_name == "events":
                 # We shouldn't get multiple rows per token for events stream, so
@@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
                 for row in rows:
                     if row.type != EventsStreamEventRow.TypeId:
                         continue
-                    event = yield self.store.get_event(row.data.event_id)
+                    assert isinstance(row, EventsStreamRow)
+
+                    event = await self.store.get_event(
+                        row.data.event_id, allow_rejected=True
+                    )
+                    if event.rejected_reason:
+                        continue
+
                     extra_users = ()
                     if event.type == EventTypes.Member:
                         extra_users = (event.state_key,)
@@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
-                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    room_ids = await self.store.get_rooms_for_user(row.user_id)
                     all_room_ids.update(room_ids)
                 self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
-                yield self.presence_handler.process_replication_rows(token, rows)
+                await self.presence_handler.process_replication_rows(token, rows)
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
                     "groups_key", token, users=[row.user_id for row in rows]
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 2c9142814c..0cce5232f5 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_event(
         self,
-        event_id: List[str],
+        event_id: str,
         redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
         get_prev_content: bool = False,
         allow_rejected: bool = False,
@@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
 
         Args:
             event_id: The event_id of the event to fetch
+
             redact_behaviour: Determine what to do with a redacted event. Possible values:
                 * AS_IS - Return the full event body with no redacted content
                 * REDACT - Return the event but with a redacted body
-                * DISALLOW - Do not return redacted events
+                * DISALLOW - Do not return redacted events (behave as per allow_none
+                    if the event is redacted)
+
             get_prev_content: If True and event is a state event,
                 include the previous states content in the unsigned field.
-            allow_rejected: If True return rejected events.
+
+            allow_rejected: If True, return rejected events. Otherwise,
+                behave as per allow_none.
+
             allow_none: If True, return None if no event found, if
                 False throw a NotFoundError
+
             check_room_id: if not None, check the room of the found event.
                 If there is a mismatch, behave as per allow_none.
 
@@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
 
         Args:
             event_ids: The event_ids of the events to fetch
+
             redact_behaviour: Determine what to do with a redacted event. Possible
                 values:
                 * AS_IS - Return the full event body with no redacted content
                 * REDACT - Return the event but with a redacted body
-                * DISALLOW - Do not return redacted events
+                * DISALLOW - Do not return redacted events (omit them from the response)
+
             get_prev_content: If True and event is a state event,
                 include the previous states content in the unsigned field.
-            allow_rejected: If True return rejected events.
+
+            allow_rejected: If True, return rejected events. Otherwise,
+                omits rejeted events from the response.
 
         Returns:
             Deferred : Dict from event_id to event.
@@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
         """Get events from the database and return in a list in the same order
         as given by `event_ids` arg.
 
+        Unknown events will be omitted from the response.
+
         Args:
             event_ids: The event_ids of the events to fetch
+
             redact_behaviour: Determine what to do with a redacted event. Possible values:
                 * AS_IS - Return the full event body with no redacted content
                 * REDACT - Return the event but with a redacted body
-                * DISALLOW - Do not return redacted events
+                * DISALLOW - Do not return redacted events (omit them from the response)
+
             get_prev_content: If True and event is a state event,
                 include the previous states content in the unsigned field.
-            allow_rejected: If True, return rejected events.
+
+            allow_rejected: If True, return rejected events. Otherwise,
+                omits rejected events from the response.
 
         Returns:
             Deferred[list[EventBase]]: List of events fetched from the database. The
@@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
 
         If events are pulled from the database, they will be cached for future lookups.
 
+        Unknown events are omitted from the response.
+
         Args:
+
             event_ids (Iterable[str]): The event_ids of the events to fetch
-            allow_rejected (bool): Whether to include rejected events
+
+            allow_rejected (bool): Whether to include rejected events. If False,
+                rejected events are omitted from the response.
 
         Returns:
             Deferred[Dict[str, _EventCacheEntry]]:
@@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
 
         Returned events will be added to the cache for future lookups.
 
+        Unknown events are omitted from the response.
+
         Args:
             event_ids (Iterable[str]): The event_ids of the events to fetch
-            allow_rejected (bool): Whether to include rejected events
+
+            allow_rejected (bool): Whether to include rejected events. If False,
+                rejected events are omitted from the response.
 
         Returns:
             Deferred[Dict[str, _EventCacheEntry]]: