summary refs log tree commit diff
path: root/synapse/replication/tcp/streams
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r--synapse/replication/tcp/streams/_base.py7
-rw-r--r--synapse/replication/tcp/streams/events.py18
2 files changed, 24 insertions, 1 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py

index 54dccd15a6..f3ea34f886 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -31,6 +31,7 @@ from typing import ( import attr from synapse.replication.http.streams import ReplicationGetStreamUpdates +from synapse.replication.tcp.commands import Command if TYPE_CHECKING: import synapse.server @@ -187,6 +188,12 @@ class Stream: ) return updates, upto_token, limited + def has_updates(self) -> bool: + return self.current_token(self.local_instance_name) != self.last_token + + def extra_commands(self, sent_updates: bool) -> List[Command]: + return [] + def current_token_without_instance( current_token: Callable[[], int] diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..1aa7ba3da6 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,8 @@ from typing import List, Tuple, Type import attr -from ._base import Stream, StreamUpdateResult, Token +from synapse.replication.tcp.streams._base import Stream, StreamUpdateResult, Token +from synapse.replication.tcp.commands import Command, PersistedToCommand """Handling of the 'events' replication stream @@ -222,3 +223,18 @@ class EventsStream(Stream): (typ, data) = row data = TypeToRow[typ].from_data(data) return EventsStreamRow(typ, data) + + def has_updates(self) -> bool: + return True + + def extra_commands(self, sent_updates: bool) -> List[Command]: + if sent_updates: + return [] + + return [ + PersistedToCommand( + self.NAME, + self.local_instance_name, + self._store._stream_id_gen.get_max_persisted_position_for_self(), + ) + ]