1 files changed, 17 insertions, 1 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..1aa7ba3da6 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,8 @@ from typing import List, Tuple, Type
import attr
-from ._base import Stream, StreamUpdateResult, Token
+from synapse.replication.tcp.streams._base import Stream, StreamUpdateResult, Token
+from synapse.replication.tcp.commands import Command, PersistedToCommand
"""Handling of the 'events' replication stream
@@ -222,3 +223,18 @@ class EventsStream(Stream):
(typ, data) = row
data = TypeToRow[typ].from_data(data)
return EventsStreamRow(typ, data)
+
+ def has_updates(self) -> bool:
+ return True
+
+ def extra_commands(self, sent_updates: bool) -> List[Command]:
+ if sent_updates:
+ return []
+
+ return [
+ PersistedToCommand(
+ self.NAME,
+ self.local_instance_name,
+ self._store._stream_id_gen.get_max_persisted_position_for_self(),
+ )
+ ]
|