 2 files changed, 11 insertions(+), 15 deletions(-)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0db5a3a24d..3a8c7c7e2d 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -87,7 +87,9 @@ class ReplicationCommandHandler:
             stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
         }  # type: Dict[str, Stream]
 
-        self._position_linearizer = Linearizer("replication_position")
+        self._position_linearizer = Linearizer(
+            "replication_position", clock=self._clock
+        )
 
         # Map of stream to batched updates. See RdataCommand for info on how
         # batching works.
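
The handler.py hunk above threads the homeserver's clock into the Linearizer
rather than letting the Linearizer construct its own. A minimal sketch of the
surrounding usage, assuming the Linearizer API of this era (queue() returns an
awaitable yielding a context manager); the on_POSITION body is illustrative,
not the real handler:

from synapse.util.async_helpers import Linearizer

class ReplicationCommandHandler:
    def __init__(self, hs):
        self._clock = hs.get_clock()

        # Serialise handling of POSITION commands per stream, reusing the
        # homeserver clock instead of having Linearizer create a new one.
        self._position_linearizer = Linearizer(
            "replication_position", clock=self._clock
        )

    async def on_POSITION(self, conn, cmd):
        # Only one caller holds the lock for a given stream name at a time;
        # concurrent POSITIONs for the same stream queue up behind it.
        with await self._position_linearizer.queue(cmd.stream_name):
            ...  # catch up on the stream, then resume handling RDATA
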
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index aa50492569..52df81b1bd 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -170,22 +170,16 @@ class EventsStream(Stream):
             limited = False
             upper_limit = current_token
 
-        # next up is the state delta table
-
-        state_rows = await self._store.get_all_updated_current_state_deltas(
+        # next up is the state delta table.
+        (
+            state_rows,
+            upper_limit,
+            state_rows_limited,
+        ) = await self._store.get_all_updated_current_state_deltas(
             from_token, upper_limit, target_row_count
-        )  # type: List[Tuple]
-
-        # again, if we've hit the limit there, we'll need to limit the other sources
-        assert len(state_rows) <= target_row_count
-        if len(state_rows) == target_row_count:
-            assert state_rows[-1][0] <= upper_limit
-            upper_limit = state_rows[-1][0]
-            limited = True
+        )
 
-        # FIXME: is it a given that there is only one row per stream_id in the
-        # state_deltas table (so that we can be sure that we have got all of the
-        # rows for upper_limit)?
+        limited = limited or state_rows_limited
 
         # finally, fetch the ex-outliers rows. We assume there are few enough of these
         # not to bother with the limit.
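
The events.py hunk moves the limit bookkeeping into the storage layer:
get_all_updated_current_state_deltas now returns a (rows, upper limit,
limited flag) triple, which also answers the old FIXME, because the store can
guarantee it never cuts a batch in the middle of a stream_id that spans
several rows. A rough sketch of the clamping the store is now responsible for
(a hypothetical helper, not the real implementation):

from typing import List, Tuple

def _clamp_state_rows(
    rows: List[Tuple], upper_limit: int, target_row_count: int
) -> Tuple[List[Tuple], int, bool]:
    """Clamp rows fetched with LIMIT target_row_count to a safe upper bound.

    `rows` are (stream_id, ...) tuples in ascending stream_id order. One
    stream_id may produce several rows, so if the query hit its limit we
    cannot be sure the rows for the final stream_id are all present.
    """
    if len(rows) < target_row_count:
        # The query was not limited: everything up to upper_limit is here.
        return rows, upper_limit, False

    # We hit the limit: drop the (possibly incomplete) rows for the last
    # stream_id and report completeness only up to the one before it.
    last_id = rows[-1][0]
    complete = [r for r in rows if r[0] < last_id]
    if not complete:
        # Every fetched row shares one stream_id; the real store would have
        # to fetch further. A sketch can only flag the case.
        raise RuntimeError("too many rows for a single stream_id")
    return complete, complete[-1][0], True

On the stream side, the caller then only folds the returned flag into its
own, as the hunk shows: limited = limited or state_rows_limited.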