diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 54dccd15a6..f3ea34f886 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -31,6 +31,7 @@ from typing import (
import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
+from synapse.replication.tcp.commands import Command
if TYPE_CHECKING:
import synapse.server
@@ -187,6 +188,12 @@ class Stream:
)
return updates, upto_token, limited
+ def has_updates(self) -> bool:
+ return self.current_token(self.local_instance_name) != self.last_token
+
+ def extra_commands(self, sent_updates: bool) -> List[Command]:
+ return []
+
def current_token_without_instance(
current_token: Callable[[], int]
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..1aa7ba3da6 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,8 @@ from typing import List, Tuple, Type
import attr
-from ._base import Stream, StreamUpdateResult, Token
+from synapse.replication.tcp.streams._base import Stream, StreamUpdateResult, Token
+from synapse.replication.tcp.commands import Command, PersistedToCommand
"""Handling of the 'events' replication stream
@@ -222,3 +223,18 @@ class EventsStream(Stream):
(typ, data) = row
data = TypeToRow[typ].from_data(data)
return EventsStreamRow(typ, data)
+
+ def has_updates(self) -> bool:
+ return True
+
+ def extra_commands(self, sent_updates: bool) -> List[Command]:
+ if sent_updates:
+ return []
+
+ return [
+ PersistedToCommand(
+ self.NAME,
+ self.local_instance_name,
+ self._store._stream_id_gen.get_max_persisted_position_for_self(),
+ )
+ ]
|