diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 318fd7dc71..e19b16064b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -70,6 +70,7 @@ from synapse.storage.database import (
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
@@ -292,6 +293,93 @@ class EventsWorkerStore(SQLBaseStore):
id_column="chain_id",
)
+ self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
+
+ if isinstance(database.engine, PostgresEngine):
+ self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="un_partial_stated_event_stream",
+ instance_name=hs.get_instance_name(),
+ tables=[
+ ("un_partial_stated_event_stream", "instance_name", "stream_id")
+ ],
+ sequence_name="un_partial_stated_event_stream_sequence",
+ # TODO(faster_joins, multiple writers) Support multiple writers.
+ writers=["master"],
+ )
+ else:
+ self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
+ db_conn, "un_partial_stated_event_stream", "stream_id"
+ )
+
+ def get_un_partial_stated_events_token(self) -> int:
+ # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
+ # writers because workers that don't write often will hold all
+ # readers up.
+ return self._un_partial_stated_events_stream_id_gen.get_current_token()
+
+ async def get_un_partial_stated_events_from_stream(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+ """Get updates for the un-partial-stated events replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_un_partial_stated_events_from_stream_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+ sql = """
+ SELECT stream_id, event_id, rejection_status_changed
+ FROM un_partial_stated_event_stream
+ WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
+ updates = [
+ (
+ row[0],
+ (
+ row[1],
+ bool(row[2]),
+ ),
+ )
+ for row in txn
+ ]
+ limited = False
+ upto_token = current_id
+ if len(updates) >= limit:
+ upto_token = updates[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_events_from_stream",
+ get_un_partial_stated_events_from_stream_txn,
+ )
+
def process_replication_rows(
self,
stream_name: str,
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f855903c39 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self._instance_name: str = hs.get_instance_name()
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
@@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
context: EventContext,
) -> None:
"""Update the state group for a partial state event"""
- await self.db_pool.runInteraction(
- "update_state_for_partial_state_event",
- self._update_state_for_partial_state_event_txn,
- event,
- context,
- )
+ async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+ await self.db_pool.runInteraction(
+ "update_state_for_partial_state_event",
+ self._update_state_for_partial_state_event_txn,
+ event,
+ context,
+ un_partial_state_event_stream_id,
+ )
def _update_state_for_partial_state_event_txn(
self,
txn: LoggingTransaction,
event: EventBase,
context: EventContext,
+ un_partial_state_event_stream_id: int,
) -> None:
# we shouldn't have any outliers here
assert not event.internal_metadata.is_outlier()
@@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# the event may now be rejected where it was not before, or vice versa,
# in which case we need to update the rejected flags.
- if bool(context.rejected) != (event.rejected_reason is not None):
+ rejection_status_changed = bool(context.rejected) != (
+ event.rejected_reason is not None
+ )
+ if rejection_status_changed:
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
self.db_pool.simple_delete_one_txn(
@@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
keyvalues={"event_id": event.event_id},
)
- # TODO(faster_joins): need to do something about workers here
- # https://github.com/matrix-org/synapse/issues/12994
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
txn.call_after(
self._get_state_group_for_event.prefill,
@@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
state_group,
)
+ self.db_pool.simple_insert_txn(
+ txn,
+ "un_partial_stated_event_stream",
+ {
+ "stream_id": un_partial_state_event_stream_id,
+ "instance_name": self._instance_name,
+ "event_id": event.event_id,
+ "rejection_status_changed": rejection_status_changed,
+ },
+ )
+
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
diff --git a/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql
new file mode 100644
index 0000000000..0e571f78c3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql
@@ -0,0 +1,34 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that an event has become un-partial-stated.
+CREATE TABLE un_partial_stated_event_stream(
+ -- Position in the stream
+ stream_id BIGINT PRIMARY KEY NOT NULL,
+
+ -- Which instance wrote this entry.
+ instance_name TEXT NOT NULL,
+
+ -- Which event has been un-partial-stated.
+ event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE,
+
+ -- true iff the `rejected` status of the event changed when it became
+ -- un-partial-stated.
+ rejection_status_changed BOOLEAN NOT NULL
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting an event, the database needs to be able to check here.
+CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id);
diff --git a/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..1ec24702f3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence;
+
+SELECT setval('un_partial_stated_event_stream_sequence', (
+ SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream
+));
|