diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 7a42de3f7d..682d47f402 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -79,7 +79,7 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool]
UpdateFunction = Callable[[str, Token, Token, int], Awaitable[StreamUpdateResult]]
-class Stream(object):
+class Stream:
"""Base class for the streams.
Provides a `get_updates()` function that returns new updates since the last
@@ -352,7 +352,7 @@ class PushRulesStream(Stream):
)
def _current_token(self, instance_name: str) -> int:
- push_rules_token, _ = self.store.get_push_rules_stream_token()
+ push_rules_token = self.store.get_max_push_rules_stream_id()
return push_rules_token
@@ -405,7 +405,7 @@ class CachesStream(Stream):
store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- store.get_cache_stream_token,
+ store.get_cache_stream_token_for_writer,
store.get_all_updated_caches,
)
|