diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 682d47f402..54dccd15a6 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -345,7 +345,7 @@ class PushRulesStream(Stream):
def __init__(self, hs):
self.store = hs.get_datastore()
- super(PushRulesStream, self).__init__(
+ super().__init__(
hs.get_instance_name(),
self._current_token,
self.store.get_all_push_rule_updates,
@@ -383,7 +383,7 @@ class CachesStream(Stream):
the cache on the workers
"""
- @attr.s
+ @attr.s(slots=True)
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
@@ -441,7 +441,7 @@ class DeviceListsStream(Stream):
told about a device update.
"""
- @attr.s
+ @attr.s(slots=True)
class DeviceListsStreamRow:
entity = attr.ib(type=str)
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f929fc3954..ccc7ca30d8 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import List, Tuple, Type
import attr
-from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
+from ._base import Stream, StreamUpdateResult, Token
"""Handling of the 'events' replication stream
@@ -117,7 +117,7 @@ class EventsStream(Stream):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- current_token_without_instance(self._store.get_current_events_token),
+ self._store._stream_id_gen.get_current_token_for_writer,
self._update_function,
)
|