summary refs log tree commit diff
path: root/tests/replication/tcp/streams/test_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/replication/tcp/streams/test_events.py')
-rw-r--r--tests/replication/tcp/streams/test_events.py74
1 files changed, 59 insertions, 15 deletions
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
index 51bf0ef4e9..097e1653b4 100644
--- a/tests/replication/tcp/streams/test_events.py
+++ b/tests/replication/tcp/streams/test_events.py
@@ -17,6 +17,7 @@ from typing import List, Optional
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
+from synapse.replication.tcp.commands import RdataCommand
 from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
 from synapse.replication.tcp.streams.events import (
     EventsStreamCurrentStateRow,
@@ -66,11 +67,6 @@ class EventsStreamTestCase(BaseStreamTestCase):
         # also one state event
         state_event = self._inject_state_event()
 
-        # tell the notifier to catch up to avoid duplicate rows.
-        # workaround for https://github.com/matrix-org/synapse/issues/7360
-        # FIXME remove this when the above is fixed
-        self.replicate()
-
         # check we're testing what we think we are: no rows should yet have been
         # received
         self.assertEqual([], self.test_handler.received_rdata_rows)
@@ -174,11 +170,6 @@ class EventsStreamTestCase(BaseStreamTestCase):
         # one more bit of state that doesn't get rolled back
         state2 = self._inject_state_event()
 
-        # tell the notifier to catch up to avoid duplicate rows.
-        # workaround for https://github.com/matrix-org/synapse/issues/7360
-        # FIXME remove this when the above is fixed
-        self.replicate()
-
         # check we're testing what we think we are: no rows should yet have been
         # received
         self.assertEqual([], self.test_handler.received_rdata_rows)
@@ -327,11 +318,6 @@ class EventsStreamTestCase(BaseStreamTestCase):
             prev_events = [e.event_id]
             pl_events.append(e)
 
-        # tell the notifier to catch up to avoid duplicate rows.
-        # workaround for https://github.com/matrix-org/synapse/issues/7360
-        # FIXME remove this when the above is fixed
-        self.replicate()
-
         # check we're testing what we think we are: no rows should yet have been
         # received
         self.assertEqual([], self.test_handler.received_rdata_rows)
@@ -378,6 +364,64 @@ class EventsStreamTestCase(BaseStreamTestCase):
 
         self.assertEqual([], received_rows)
 
+    def test_backwards_stream_id(self):
+        """
+        Test that RDATA that comes after the current position should be discarded.
+        """
+        # disconnect, so that we can stack up some changes
+        self.disconnect()
+
+        # Generate an events. We inject them using inject_event so that they are
+        # not send out over replication until we call self.replicate().
+        event = self._inject_test_event()
+
+        # check we're testing what we think we are: no rows should yet have been
+        # received
+        self.assertEqual([], self.test_handler.received_rdata_rows)
+
+        # now reconnect to pull the updates
+        self.reconnect()
+        self.replicate()
+
+        # We should have received the expected single row (as well as various
+        # cache invalidation updates which we ignore).
+        received_rows = [
+            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+        ]
+
+        # There should be a single received row.
+        self.assertEqual(len(received_rows), 1)
+
+        stream_name, token, row = received_rows[0]
+        self.assertEqual("events", stream_name)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "ev")
+        self.assertIsInstance(row.data, EventsStreamEventRow)
+        self.assertEqual(row.data.event_id, event.event_id)
+
+        # Reset the data.
+        self.test_handler.received_rdata_rows = []
+
+        # Save the current token for later.
+        worker_events_stream = self.worker_hs.get_replication_streams()["events"]
+        prev_token = worker_events_stream.current_token("master")
+
+        # Manually send an old RDATA command, which should get dropped. This
+        # re-uses the row from above, but with an earlier stream token.
+        self.hs.get_tcp_replication().send_command(
+            RdataCommand("events", "master", 1, row)
+        )
+
+        # No updates have been received (because it was discard as old).
+        received_rows = [
+            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+        ]
+        self.assertEqual(len(received_rows), 0)
+
+        # Ensure the stream has not gone backwards.
+        current_token = worker_events_stream.current_token("master")
+        self.assertGreaterEqual(current_token, prev_token)
+
     event_count = 0
 
     def _inject_test_event(