diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 318fd7dc71..e19b16064b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -70,6 +70,7 @@ from synapse.storage.database import (
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
+ AbstractStreamIdGenerator,
AbstractStreamIdTracker,
MultiWriterIdGenerator,
StreamIdGenerator,
@@ -292,6 +293,93 @@ class EventsWorkerStore(SQLBaseStore):
id_column="chain_id",
)
+ self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator
+
+ if isinstance(database.engine, PostgresEngine):
+ self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="un_partial_stated_event_stream",
+ instance_name=hs.get_instance_name(),
+ tables=[
+ ("un_partial_stated_event_stream", "instance_name", "stream_id")
+ ],
+ sequence_name="un_partial_stated_event_stream_sequence",
+ # TODO(faster_joins, multiple writers) Support multiple writers.
+ writers=["master"],
+ )
+ else:
+ self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
+ db_conn, "un_partial_stated_event_stream", "stream_id"
+ )
+
+ def get_un_partial_stated_events_token(self) -> int:
+ # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
+ # writers because workers that don't write often will hold all
+ # readers up.
+ return self._un_partial_stated_events_stream_id_gen.get_current_token()
+
+ async def get_un_partial_stated_events_from_stream(
+ self, instance_name: str, last_id: int, current_id: int, limit: int
+ ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+ """Get updates for the un-partial-stated events replication stream.
+
+ Args:
+ instance_name: The writer we want to fetch updates from. Unused
+ here since there is only ever one writer.
+ last_id: The token to fetch updates from. Exclusive.
+ current_id: The token to fetch updates up to. Inclusive.
+ limit: The requested limit for the number of rows to return. The
+ function may return more or fewer rows.
+
+ Returns:
+ A tuple consisting of: the updates, a token to use to fetch
+ subsequent updates, and whether we returned fewer rows than exists
+ between the requested tokens due to the limit.
+
+ The token returned can be used in a subsequent call to this
+ function to get further updatees.
+
+ The updates are a list of 2-tuples of stream ID and the row data
+ """
+
+ if last_id == current_id:
+ return [], current_id, False
+
+ def get_un_partial_stated_events_from_stream_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]:
+ sql = """
+ SELECT stream_id, event_id, rejection_status_changed
+ FROM un_partial_stated_event_stream
+ WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
+ updates = [
+ (
+ row[0],
+ (
+ row[1],
+ bool(row[2]),
+ ),
+ )
+ for row in txn
+ ]
+ limited = False
+ upto_token = current_id
+ if len(updates) >= limit:
+ upto_token = updates[-1][0]
+ limited = True
+
+ return updates, upto_token, limited
+
+ return await self.db_pool.runInteraction(
+ "get_un_partial_stated_events_from_stream",
+ get_un_partial_stated_events_from_stream_txn,
+ )
+
def process_replication_rows(
self,
stream_name: str,
|