diff options
Diffstat (limited to 'tests/replication/tcp/streams/test_events.py')
-rw-r--r-- | tests/replication/tcp/streams/test_events.py | 74 |
1 files changed, 59 insertions, 15 deletions
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py index 51bf0ef4e9..097e1653b4 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py @@ -17,6 +17,7 @@ from typing import List, Optional from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase +from synapse.replication.tcp.commands import RdataCommand from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT from synapse.replication.tcp.streams.events import ( EventsStreamCurrentStateRow, @@ -66,11 +67,6 @@ class EventsStreamTestCase(BaseStreamTestCase): # also one state event state_event = self._inject_state_event() - # tell the notifier to catch up to avoid duplicate rows. - # workaround for https://github.com/matrix-org/synapse/issues/7360 - # FIXME remove this when the above is fixed - self.replicate() - # check we're testing what we think we are: no rows should yet have been # received self.assertEqual([], self.test_handler.received_rdata_rows) @@ -174,11 +170,6 @@ class EventsStreamTestCase(BaseStreamTestCase): # one more bit of state that doesn't get rolled back state2 = self._inject_state_event() - # tell the notifier to catch up to avoid duplicate rows. - # workaround for https://github.com/matrix-org/synapse/issues/7360 - # FIXME remove this when the above is fixed - self.replicate() - # check we're testing what we think we are: no rows should yet have been # received self.assertEqual([], self.test_handler.received_rdata_rows) @@ -327,11 +318,6 @@ class EventsStreamTestCase(BaseStreamTestCase): prev_events = [e.event_id] pl_events.append(e) - # tell the notifier to catch up to avoid duplicate rows. - # workaround for https://github.com/matrix-org/synapse/issues/7360 - # FIXME remove this when the above is fixed - self.replicate() - # check we're testing what we think we are: no rows should yet have been # received self.assertEqual([], self.test_handler.received_rdata_rows) @@ -378,6 +364,64 @@ class EventsStreamTestCase(BaseStreamTestCase): self.assertEqual([], received_rows) + def test_backwards_stream_id(self): + """ + Test that RDATA that comes after the current position should be discarded. + """ + # disconnect, so that we can stack up some changes + self.disconnect() + + # Generate an events. We inject them using inject_event so that they are + # not send out over replication until we call self.replicate(). + event = self._inject_test_event() + + # check we're testing what we think we are: no rows should yet have been + # received + self.assertEqual([], self.test_handler.received_rdata_rows) + + # now reconnect to pull the updates + self.reconnect() + self.replicate() + + # We should have received the expected single row (as well as various + # cache invalidation updates which we ignore). + received_rows = [ + row for row in self.test_handler.received_rdata_rows if row[0] == "events" + ] + + # There should be a single received row. + self.assertEqual(len(received_rows), 1) + + stream_name, token, row = received_rows[0] + self.assertEqual("events", stream_name) + self.assertIsInstance(row, EventsStreamRow) + self.assertEqual(row.type, "ev") + self.assertIsInstance(row.data, EventsStreamEventRow) + self.assertEqual(row.data.event_id, event.event_id) + + # Reset the data. + self.test_handler.received_rdata_rows = [] + + # Save the current token for later. + worker_events_stream = self.worker_hs.get_replication_streams()["events"] + prev_token = worker_events_stream.current_token("master") + + # Manually send an old RDATA command, which should get dropped. This + # re-uses the row from above, but with an earlier stream token. + self.hs.get_tcp_replication().send_command( + RdataCommand("events", "master", 1, row) + ) + + # No updates have been received (because it was discard as old). + received_rows = [ + row for row in self.test_handler.received_rdata_rows if row[0] == "events" + ] + self.assertEqual(len(received_rows), 0) + + # Ensure the stream has not gone backwards. + current_token = worker_events_stream.current_token("master") + self.assertGreaterEqual(current_token, prev_token) + event_count = 0 def _inject_test_event( |