author     Richard van der Hoff <1389908+richvdh@users.noreply.github.com>  2020-04-29 12:30:36 +0100
committer  GitHub <noreply@github.com>  2020-04-29 12:30:36 +0100
commit     c2e1a2110fbe9ead26b4ecbb1afd504ed035a04d
tree       fabef4097b767da1d68d2b955ba186e33d21fa91 /synapse/replication
parent     Fix fallback value for account_threepid_delegates.email (#7316)
Fix limit logic for EventsStream (#7358)
* Factor out functions for injecting events into database

I want to add some more flexibility to the tools for injecting events into the
database, and I don't want to clutter up HomeserverTestCase with them, so let's
factor them out to a new file.
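As a concrete illustration, here is a hedged sketch of what such a factored-out
helper might look like as a free function in a test-utils module. The module
name, the simplified signature, and the homeserver accessors are assumptions
about Synapse's internals at the time, not a copy of the code this commit adds:

```python
# tests/test_utils/event_injection.py (hypothetical module name)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS


async def inject_event(hs, room_version: str, **kwargs):
    """Build and persist an event directly, bypassing the client-server API.

    A free function taking the homeserver as an argument, rather than a
    method on HomeserverTestCase, so that any test module can reuse it.
    """
    builder = hs.get_event_builder_factory().for_room_version(
        KNOWN_ROOM_VERSIONS[room_version], kwargs  # assumed builder API
    )
    event, context = await hs.get_event_creation_handler().create_new_client_event(
        builder
    )
    await hs.get_storage().persistence.persist_event(event, context)
    return event
```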

* Rework TestReplicationDataHandler

This wasn't very easy to work with: the mock wrapping was largely superfluous,
and it's useful to be able to inspect the received rows and to clear out the
received list.
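A minimal sketch of that shape, assuming the ReplicationDataHandler callback
interface of the era (an async on_rdata(stream_name, token, rows) method):
received rows accumulate on a plain list that tests can inspect directly and
clear between phases, with no Mock wrapper in the way.

```python
from typing import Any, List, Tuple


class TestReplicationDataHandler:
    """Records replication rows for later inspection (illustrative only)."""

    def __init__(self) -> None:
        # (stream_name, token, row) triples received so far; tests read this
        # directly and call .clear() between test phases
        self.received_rdata_rows: List[Tuple[str, int, Any]] = []

    async def on_rdata(
        self, stream_name: str, token: int, rows: List[Any]
    ) -> None:
        for row in rows:
            self.received_rdata_rows.append((stream_name, token, row))
```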

* Fix AssertionErrors being thrown by EventsStream

Part of the problem was an off-by-one error in the assertion, but the limit
logic was also too simple. Fix it all up and add some tests.
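To make the off-by-one concrete, here is a simplified reconstruction of the
broken pattern and its fixed shape (store.fetch_updates is a stand-in, not
the real store method):

```python
async def get_batch(store, from_token: int, upper_limit: int,
                    target_row_count: int):
    """Illustrative only; store.fetch_updates stands in for the real query."""
    rows = await store.fetch_updates(from_token, upper_limit, target_row_count)
    limited = False

    # Old, broken check: a query capped at target_row_count returns at most
    # that many rows, so this fired on every full batch, and the `==` branch
    # below was dead code:
    #     assert len(rows) < target_row_count

    # Fixed shape: a full batch is legal, and simply means we may have been
    # cut off, so clamp the bound to the last row seen and flag truncation.
    assert len(rows) <= target_row_count
    if len(rows) == target_row_count:
        upper_limit = rows[-1][0]
        limited = True

    return rows, upper_limit, limited
```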
Diffstat (limited to 'synapse/replication')
 synapse/replication/tcp/handler.py        |  4 +++-
 synapse/replication/tcp/streams/events.py | 22 ++++++--------------
 2 files changed, 11 insertions(+), 15 deletions(-)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0db5a3a24d..3a8c7c7e2d 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -87,7 +87,9 @@ class ReplicationCommandHandler:
             stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
         }  # type: Dict[str, Stream]
 
-        self._position_linearizer = Linearizer("replication_position")
+        self._position_linearizer = Linearizer(
+            "replication_position", clock=self._clock
+        )
 
         # Map of stream to batched updates. See RdataCommand for info on how
         # batching works.
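The handler.py change above threads the homeserver's clock into the Linearizer
rather than letting it construct its own, which is presumably what allows a
mocked clock to drive it under test. An assumed usage sketch, based on the
Linearizer API of this era with its deferred-returning queue():

```python
# Assumed usage pattern, not the PR's code: POSITION commands for a given
# stream are processed one at a time by queueing on the stream name.
async def on_POSITION(self, cmd):
    with await self._position_linearizer.queue(cmd.stream_name):
        # only one position update per stream runs this block at a time
        ...
```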
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index aa50492569..52df81b1bd 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -170,22 +170,16 @@ class EventsStream(Stream):
             limited = False
             upper_limit = current_token
 
-        # next up is the state delta table
-
-        state_rows = await self._store.get_all_updated_current_state_deltas(
+        # next up is the state delta table.
+        (
+            state_rows,
+            upper_limit,
+            state_rows_limited,
+        ) = await self._store.get_all_updated_current_state_deltas(
             from_token, upper_limit, target_row_count
-        )  # type: List[Tuple]
-
-        # again, if we've hit the limit there, we'll need to limit the other sources
-        assert len(state_rows) < target_row_count
-        if len(state_rows) == target_row_count:
-            assert state_rows[-1][0] <= upper_limit
-            upper_limit = state_rows[-1][0]
-            limited = True
+        )
 
-            # FIXME: is it a given that there is only one row per stream_id in the
-            # state_deltas table (so that we can be sure that we have got all of the
-            # rows for upper_limit)?
+        limited = limited or state_rows_limited
 
         # finally, fetch the ex-outliers rows. We assume there are few enough of these
         # not to bother with the limit.
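The net effect of the events.py change: the store method now reports for
itself whether it hit the limit and what upper bound its rows actually cover,
and the stream just ORs that flag into its own limited state instead of
second-guessing the row count. A self-contained, simplified stand-in (not the
real get_all_updated_current_state_deltas) showing that contract:

```python
from typing import List, Tuple

# stand-in data: (stream_id, delta) rows
_ROWS = [(i, "delta-%d" % i) for i in range(1, 101)]


async def get_updates(
    from_token: int, to_token: int, limit: int
) -> Tuple[List[Tuple[int, str]], int, bool]:
    """Return (rows, upper_bound, limited) in the style of the new contract."""
    rows = [r for r in _ROWS if from_token < r[0] <= to_token][:limit]
    if len(rows) == limit:
        # possibly truncated: clamp the bound to the last stream id actually
        # returned, and tell the caller the result was limited
        return rows, rows[-1][0], True
    return rows, to_token, False
```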