diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 54dccd15a6..61b282ab2d 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -240,13 +240,18 @@ class BackfillStream(Stream):
ROW_TYPE = BackfillStreamRow
def __init__(self, hs):
- store = hs.get_datastore()
+ self.store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- current_token_without_instance(store.get_current_backfill_token),
- store.get_all_new_backfill_event_rows,
+ self._current_token,
+ self.store.get_all_new_backfill_event_rows,
)
+ def _current_token(self, instance_name: str) -> int:
+ # The backfill stream over replication operates on *positive* numbers,
+ # which means we need to negate it.
+ return -self.store._backfill_id_gen.get_current_token_for_writer(instance_name)
+
class PresenceStream(Stream):
PresenceStreamRow = namedtuple(
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..82e9e0d64e 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -155,7 +155,7 @@ class EventsStream(Stream):
# now we fetch up to that many rows from the events table
event_rows = await self._store.get_all_new_forward_event_rows(
- from_token, current_token, target_row_count
+ instance_name, from_token, current_token, target_row_count
) # type: List[Tuple]
# we rely on get_all_new_forward_event_rows strictly honouring the limit, so
@@ -180,7 +180,7 @@ class EventsStream(Stream):
upper_limit,
state_rows_limited,
) = await self._store.get_all_updated_current_state_deltas(
- from_token, upper_limit, target_row_count
+ instance_name, from_token, upper_limit, target_row_count
)
limited = limited or state_rows_limited
@@ -189,7 +189,7 @@ class EventsStream(Stream):
# not to bother with the limit.
ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
- from_token, upper_limit
+ instance_name, from_token, upper_limit
) # type: List[Tuple]
# we now need to turn the raw database rows returned into tuples suitable
|