summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation_event.py2
-rw-r--r--synapse/replication/tcp/streams/__init__.py7
-rw-r--r--synapse/replication/tcp/streams/partial_state.py28
-rw-r--r--synapse/storage/databases/main/events_worker.py88
-rw-r--r--synapse/storage/databases/main/state.py34
-rw-r--r--synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql34
-rw-r--r--synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres20
7 files changed, 203 insertions, 10 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 66aca2f864..31df7f55cc 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -610,6 +610,8 @@ class FederationEventHandler:
             self._state_storage_controller.notify_event_un_partial_stated(
                 event.event_id
             )
+            # Notify that there's a new row in the un_partial_stated_events stream.
+            self._notifier.notify_replication()
 
     @trace
     async def backfill(
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 8575666d9c..110f10aab9 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,7 +42,10 @@ from synapse.replication.tcp.streams._base import (
 )
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
+from synapse.replication.tcp.streams.partial_state import (
+    UnPartialStatedEventStream,
+    UnPartialStatedRoomStream,
+)
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -63,6 +66,7 @@ STREAMS_MAP = {
         AccountDataStream,
         UserSignatureStream,
         UnPartialStatedRoomStream,
+        UnPartialStatedEventStream,
     )
 }
 
@@ -83,4 +87,5 @@ __all__ = [
     "AccountDataStream",
     "UserSignatureStream",
     "UnPartialStatedRoomStream",
+    "UnPartialStatedEventStream",
 ]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
index 18f087ffa2..b5a2ae74b6 100644
--- a/synapse/replication/tcp/streams/partial_state.py
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -46,3 +46,31 @@ class UnPartialStatedRoomStream(Stream):
             current_token_without_instance(store.get_un_partial_stated_rooms_token),
             store.get_un_partial_stated_rooms_from_stream,
         )
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedEventStreamRow:
+    # ID of the event that has been un-partial-stated.
+    event_id: str
+
+    # True iff the rejection status of the event changed as a result of being
+    # un-partial-stated.
+    rejection_status_changed: bool
+
+
+class UnPartialStatedEventStream(Stream):
+    """
+    Stream to notify about events becoming un-partial-stated.
+    """
+
+    NAME = "un_partial_stated_event"
+    ROW_TYPE = UnPartialStatedEventStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        store = hs.get_datastores().main
+        super().__init__(
+            hs.get_instance_name(),
+            # TODO(faster_joins, multiple writers): we need to account for instance names
+            current_token_without_instance(store.get_un_partial_stated_events_token),
+            store.get_un_partial_stated_events_from_stream,
+        )
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 318fd7dc71..e19b16064b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -70,6 +70,7 @@ from synapse.storage.database import (
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import (
+    AbstractStreamIdGenerator,
     AbstractStreamIdTracker,
     MultiWriterIdGenerator,
     StreamIdGenerator,
@@ -292,6 +293,93 @@ class EventsWorkerStore(SQLBaseStore):
             id_column="chain_id",
         )
 
+        self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
+
+        if isinstance(database.engine, PostgresEngine):
+            self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="un_partial_stated_event_stream",
+                instance_name=hs.get_instance_name(),
+                tables=[
+                    ("un_partial_stated_event_stream", "instance_name", "stream_id")
+                ],
+                sequence_name="un_partial_stated_event_stream_sequence",
+                # TODO(faster_joins, multiple writers) Support multiple writers.
+                writers=["master"],
+            )
+        else:
+            self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
+                db_conn, "un_partial_stated_event_stream", "stream_id"
+            )
+
+    def get_un_partial_stated_events_token(self) -> int:
+        # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
+        #     writers because workers that don't write often will hold all
+        #     readers up.
+        return self._un_partial_stated_events_stream_id_gen.get_current_token()
+
+    async def get_un_partial_stated_events_from_stream(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+        """Get updates for the un-partial-stated events replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_un_partial_stated_events_from_stream_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+            sql = """
+                SELECT stream_id, event_id, rejection_status_changed
+                FROM un_partial_stated_event_stream
+                WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
+            updates = [
+                (
+                    row[0],
+                    (
+                        row[1],
+                        bool(row[2]),
+                    ),
+                )
+                for row in txn
+            ]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_events_from_stream",
+            get_un_partial_stated_events_from_stream_txn,
+        )
+
     def process_replication_rows(
         self,
         stream_name: str,
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f855903c39 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         hs: "HomeServer",
     ):
         super().__init__(database, db_conn, hs)
+        self._instance_name: str = hs.get_instance_name()
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
@@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         context: EventContext,
     ) -> None:
         """Update the state group for a partial state event"""
-        await self.db_pool.runInteraction(
-            "update_state_for_partial_state_event",
-            self._update_state_for_partial_state_event_txn,
-            event,
-            context,
-        )
+        async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+            await self.db_pool.runInteraction(
+                "update_state_for_partial_state_event",
+                self._update_state_for_partial_state_event_txn,
+                event,
+                context,
+                un_partial_state_event_stream_id,
+            )
 
     def _update_state_for_partial_state_event_txn(
         self,
         txn: LoggingTransaction,
         event: EventBase,
         context: EventContext,
+        un_partial_state_event_stream_id: int,
     ) -> None:
         # we shouldn't have any outliers here
         assert not event.internal_metadata.is_outlier()
@@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         # the event may now be rejected where it was not before, or vice versa,
         # in which case we need to update the rejected flags.
-        if bool(context.rejected) != (event.rejected_reason is not None):
+        rejection_status_changed = bool(context.rejected) != (
+            event.rejected_reason is not None
+        )
+        if rejection_status_changed:
             self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
 
         self.db_pool.simple_delete_one_txn(
@@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             keyvalues={"event_id": event.event_id},
         )
 
-        # TODO(faster_joins): need to do something about workers here
-        #   https://github.com/matrix-org/synapse/issues/12994
         txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
         txn.call_after(
             self._get_state_group_for_event.prefill,
@@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             state_group,
         )
 
+        self.db_pool.simple_insert_txn(
+            txn,
+            "un_partial_stated_event_stream",
+            {
+                "stream_id": un_partial_state_event_stream_id,
+                "instance_name": self._instance_name,
+                "event_id": event.event_id,
+                "rejection_status_changed": rejection_status_changed,
+            },
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 
diff --git a/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql
new file mode 100644
index 0000000000..0e571f78c3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql
@@ -0,0 +1,34 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that an event has become un-partial-stated.
+CREATE TABLE un_partial_stated_event_stream(
+    -- Position in the stream
+    stream_id BIGINT PRIMARY KEY NOT NULL,
+
+    -- Which instance wrote this entry.
+    instance_name TEXT NOT NULL,
+
+    -- Which event has been un-partial-stated.
+    event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE,
+
+    -- true iff the `rejected` status of the event changed when it became
+    -- un-partial-stated.
+    rejection_status_changed BOOLEAN NOT NULL
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting an event, the database needs to be able to check here.
+CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id);
diff --git a/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..1ec24702f3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence;
+
+SELECT setval('un_partial_stated_event_stream_sequence', (
+    SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream
+));