summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2020-04-27 22:03:56 +0100
committerRichard van der Hoff <richard@matrix.org>2020-04-28 17:54:46 +0100
commit3778424b0e01a75d0ae8aaaa12fcd2d5159731da (patch)
tree1bbc80ec1deda498cf07c82ddb5258e00f94807d
parentRework TestReplicationDataHandler (diff)
downloadsynapse-3778424b0e01a75d0ae8aaaa12fcd2d5159731da.tar.xz
Fix AssertionErrors being thrown by EventsStream
Part of the problem was that there was an off-by-one error in the assertion,
but also the limit logic was too simple. Fix it all up and add some tests.
-rw-r--r--synapse/replication/tcp/streams/events.py28
-rw-r--r--synapse/storage/data_stores/main/events_worker.py14
-rw-r--r--tests/replication/tcp/streams/test_events.py390
-rw-r--r--tests/rest/client/v1/utils.py2
4 files changed, 423 insertions, 11 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index aa50492569..f4ebbeea89 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -176,16 +176,30 @@ class EventsStream(Stream):
             from_token, upper_limit, target_row_count
         )  # type: List[Tuple]
 
-        # again, if we've hit the limit there, we'll need to limit the other sources
-        assert len(state_rows) < target_row_count
+        assert len(state_rows) <= target_row_count
+
+        # there can be more than one row per stream_id in that table, so if we hit
+        # the limit there, we'll need to truncate the results so that we have a complete
+        # set of changes for all the stream IDs we include.
         if len(state_rows) == target_row_count:
             assert state_rows[-1][0] <= upper_limit
-            upper_limit = state_rows[-1][0]
-            limited = True
+            upper_limit = state_rows[-1][0] - 1
+
+            # search for the point to truncate the list
+            for idx in range(len(state_rows) - 1, 0, -1):
+                if state_rows[idx - 1][0] <= upper_limit:
+                    state_rows = state_rows[:idx]
+                    break
+            else:
+                # bother. We didn't get a full set of changes for even a single
+                # stream id. let's run the query again, without a row limit, but for
+                # just one stream id.
+                upper_limit += 1
+                state_rows = await self._store.get_all_updated_current_state_deltas(
+                    from_token, upper_limit, limit=None
+                )
 
-            # FIXME: is it a given that there is only one row per stream_id in the
-            # state_deltas table (so that we can be sure that we have got all of the
-            # rows for upper_limit)?
+            limited = True
 
         # finally, fetch the ex-outliers rows. We assume there are few enough of these
         # not to bother with the limit.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index ce8be72bfe..bce9aa7fb8 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1084,15 +1084,23 @@ class EventsWorkerStore(SQLBaseStore):
             "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
         )
 
-    def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
+    def get_all_updated_current_state_deltas(
+        self, from_token: int, to_token: int, limit: Optional[int]
+    ):
         def get_all_updated_current_state_deltas_txn(txn):
             sql = """
                 SELECT stream_id, room_id, type, state_key, event_id
                 FROM current_state_delta_stream
                 WHERE ? < stream_id AND stream_id <= ?
-                ORDER BY stream_id ASC LIMIT ?
+                ORDER BY stream_id ASC
             """
-            txn.execute(sql, (from_token, to_token, limit))
+            params = [from_token, to_token]
+
+            if limit is not None:
+                sql += "LIMIT ?"
+                params.append(limit)
+
+            txn.execute(sql, params)
             return txn.fetchall()
 
         return self.db.runInteraction(
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
new file mode 100644
index 0000000000..9894aca2bd
--- /dev/null
+++ b/tests/replication/tcp/streams/test_events.py
@@ -0,0 +1,390 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.events import EventBase
+from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
+from synapse.replication.tcp.streams.events import (
+    EventsStreamCurrentStateRow,
+    EventsStreamEventRow,
+    EventsStreamRow,
+)
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.test_utils.event_injection import inject_event, inject_member_event
+
+
+class EventsStreamTestCase(BaseStreamTestCase):
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        super().prepare(reactor, clock, hs)
+        self.user_id = self.register_user("u1", "pass")
+        self.user_tok = self.login("u1", "pass")
+
+        self.reconnect()
+        self.test_handler.stream_positions["events"] = 0
+
+        self.room_id = self.helper.create_room_as(tok=self.user_tok)
+        self.test_handler.received_rdata_rows.clear()
+
+    def test_update_function_event_row_limit(self):
+        """Test replication with many non-state events
+
+        Checks that all events are correctly replicated when there are lots of
+        event rows to be replicated.
+        """
+
+        # generate lots of non-state events. We inject them using inject_event
+        # so that they are not send out over replication until we call self.replicate().
+        events = [
+            self._inject_test_event()
+            for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
+        ]
+
+        # also one state event
+        state_event = self._inject_state_event()
+
+        # check we're testing what we think we are: no rows should yet have been
+        # receieved
+        self.assertEqual([], self.test_handler.received_rdata_rows)
+
+        # now fire up the replicator
+        self.replicate()
+
+        # we should have received all the expected rows in the right order
+        received_rows = self.test_handler.received_rdata_rows
+        for event in events:
+            stream_name, token, row = received_rows.pop(0)
+            self.assertEqual("events", stream_name)
+            self.assertIsInstance(row, EventsStreamRow)
+            self.assertEqual(row.type, "ev")
+            self.assertIsInstance(row.data, EventsStreamEventRow)
+            self.assertEqual(row.data.event_id, event.event_id)
+
+        stream_name, token, row = received_rows.pop(0)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertIsInstance(row.data, EventsStreamEventRow)
+        self.assertEqual(row.data.event_id, state_event.event_id)
+
+        stream_name, token, row = received_rows.pop(0)
+        self.assertEqual("events", stream_name)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "state")
+        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+        self.assertEqual(row.data.event_id, state_event.event_id)
+
+        self.assertEqual([], received_rows)
+
+    def test_update_function_huge_state_change(self):
+        """Test replication with many state events
+
+        Ensures that all events are correctly replicated when there are lots of
+        state change rows to be replicated.
+        """
+
+        # we want to generate lots of state changes at a single stream ID.
+        #
+        # We do this by having two branches in the DAG. On one, we have a moderator
+        # which that generates lots of state; on the other, we de-op the moderator,
+        # thus invalidating all the state.
+
+        OTHER_USER = "@other_user:localhost"
+
+        # have the user join
+        inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
+
+        # Update existing power levels with mod at PL50
+        pls = self.helper.get_state(
+            self.room_id, EventTypes.PowerLevels, tok=self.user_tok
+        )
+        pls["users"][OTHER_USER] = 50
+        self.helper.send_state(
+            self.room_id, EventTypes.PowerLevels, pls, tok=self.user_tok,
+        )
+
+        # this is the point in the DAG where we make a fork
+        fork_point = self.get_success(
+            self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
+        )  # type: List[str]
+
+        events = [
+            self._inject_state_event(sender=OTHER_USER)
+            for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
+        ]
+
+        self.replicate()
+        # all those events and state changes should have landed
+        self.assertGreaterEqual(
+            len(self.test_handler.received_rdata_rows), 2 * len(events)
+        )
+        self.test_handler.received_rdata_rows.clear()
+
+        # a state event which doesn't get rolled back, to check that the state
+        # before the huge update comes through ok
+        state1 = self._inject_state_event()
+
+        # roll back all the state by de-modding the user
+        prev_events = fork_point
+        pls["users"][OTHER_USER] = 0
+        pl_event = inject_event(
+            self.hs,
+            prev_event_ids=prev_events,
+            type=EventTypes.PowerLevels,
+            state_key="",
+            sender=self.user_id,
+            room_id=self.room_id,
+            content=pls,
+        )
+
+        # one more bit of state that doesn't get rolled back
+        state2 = self._inject_state_event()
+
+        # check we're testing what we think we are: no rows should yet have been
+        # receieved
+        self.assertEqual([], self.test_handler.received_rdata_rows)
+
+        # now fire up the replicator
+        self.replicate()
+
+        # now we should have received all the expected rows in the right order.
+        #
+        # we expect:
+        #
+        # - two rows for state1
+        # - the PL event row, plus state rows for the PL event and each
+        #       of the states that got reverted.
+        # - two rows for state2
+
+        received_rows = self.test_handler.received_rdata_rows
+
+        # first check the first two rows, which should be state1
+
+        stream_name, token, row = received_rows.pop(0)
+        self.assertEqual("events", stream_name)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "ev")
+        self.assertIsInstance(row.data, EventsStreamEventRow)
+        self.assertEqual(row.data.event_id, state1.event_id)
+
+        stream_name, token, row = received_rows.pop(0)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "state")
+        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+        self.assertEqual(row.data.event_id, state1.event_id)
+
+        # now the last two rows, which should be state2
+        stream_name, token, row = received_rows.pop(-2)
+        self.assertEqual("events", stream_name)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "ev")
+        self.assertIsInstance(row.data, EventsStreamEventRow)
+        self.assertEqual(row.data.event_id, state2.event_id)
+
+        stream_name, token, row = received_rows.pop(-1)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "state")
+        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+        self.assertEqual(row.data.event_id, state2.event_id)
+
+        # that should leave us with the rows for the PL event
+        self.assertEqual(len(received_rows), len(events) + 2)
+
+        stream_name, token, row = received_rows.pop(0)
+        self.assertEqual("events", stream_name)
+        self.assertIsInstance(row, EventsStreamRow)
+        self.assertEqual(row.type, "ev")
+        self.assertIsInstance(row.data, EventsStreamEventRow)
+        self.assertEqual(row.data.event_id, pl_event.event_id)
+
+        # the state rows are unsorted
+        state_rows = []  # type: List[EventsStreamCurrentStateRow]
+        for stream_name, token, row in received_rows:
+            self.assertEqual("events", stream_name)
+            self.assertIsInstance(row, EventsStreamRow)
+            self.assertEqual(row.type, "state")
+            self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+            state_rows.append(row.data)
+
+        state_rows.sort(key=lambda r: r.state_key)
+
+        sr = state_rows.pop(0)
+        self.assertEqual(sr.type, EventTypes.PowerLevels)
+        self.assertEqual(sr.event_id, pl_event.event_id)
+        for sr in state_rows:
+            self.assertEqual(sr.type, "test_state_event")
+            # "None" indicates the state has been deleted
+            self.assertIsNone(sr.event_id)
+
+    def test_update_function_state_row_limit(self):
+        """Test replication with many state events over several stream ids.
+        """
+
+        # we want to generate lots of state changes, but for this test, we want to
+        # spread out the state changes over a few stream IDs.
+        #
+        # We do this by having two branches in the DAG. On one, we have four moderators,
+        # each of which that generates lots of state; on the other, we de-op the users,
+        # thus invalidating all the state.
+
+        NUM_USERS = 4
+        STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // 4 + 1
+
+        user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]
+
+        # have the users join
+        for u in user_ids:
+            inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
+
+        # Update existing power levels with mod at PL50
+        pls = self.helper.get_state(
+            self.room_id, EventTypes.PowerLevels, tok=self.user_tok
+        )
+        pls["users"].update({u: 50 for u in user_ids})
+        self.helper.send_state(
+            self.room_id, EventTypes.PowerLevels, pls, tok=self.user_tok,
+        )
+
+        # this is the point in the DAG where we make a fork
+        fork_point = self.get_success(
+            self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
+        )  # type: List[str]
+
+        events = []  # type: List[EventBase]
+        for user in user_ids:
+            events.extend(
+                self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
+            )
+
+        self.replicate()
+        # all those events and state changes should have landed
+        self.assertGreaterEqual(
+            len(self.test_handler.received_rdata_rows), 2 * len(events)
+        )
+        self.test_handler.received_rdata_rows.clear()
+
+        # now roll back all that state by de-modding the users
+        prev_events = fork_point
+        pl_events = []
+        for u in user_ids:
+            pls["users"][u] = 0
+            e = inject_event(
+                self.hs,
+                prev_event_ids=prev_events,
+                type=EventTypes.PowerLevels,
+                state_key="",
+                sender=self.user_id,
+                room_id=self.room_id,
+                content=pls,
+            )
+            prev_events = [e.event_id]
+            pl_events.append(e)
+
+        # check we're testing what we think we are: no rows should yet have been
+        # receieved
+        self.assertEqual([], self.test_handler.received_rdata_rows)
+
+        # now fire up the replicator
+        self.replicate()
+
+        # we should have received all the expected rows in the right order
+
+        received_rows = self.test_handler.received_rdata_rows
+        self.assertGreaterEqual(len(received_rows), len(events))
+        for i in range(NUM_USERS):
+            # for each user, we expect the PL event row, followed by state rows for
+            # the PL event and each of the states that got reverted.
+            stream_name, token, row = received_rows.pop(0)
+            self.assertEqual("events", stream_name)
+            self.assertIsInstance(row, EventsStreamRow)
+            self.assertEqual(row.type, "ev")
+            self.assertIsInstance(row.data, EventsStreamEventRow)
+            self.assertEqual(row.data.event_id, pl_events[i].event_id)
+
+            # the state rows are unsorted
+            state_rows = []  # type: List[EventsStreamCurrentStateRow]
+            for j in range(STATES_PER_USER + 1):
+                stream_name, token, row = received_rows.pop(0)
+                self.assertEqual("events", stream_name)
+                self.assertIsInstance(row, EventsStreamRow)
+                self.assertEqual(row.type, "state")
+                self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+                state_rows.append(row.data)
+
+            state_rows.sort(key=lambda r: r.state_key)
+
+            sr = state_rows.pop(0)
+            self.assertEqual(sr.type, EventTypes.PowerLevels)
+            self.assertEqual(sr.event_id, pl_events[i].event_id)
+            for sr in state_rows:
+                self.assertEqual(sr.type, "test_state_event")
+                # "None" indicates the state has been deleted
+                self.assertIsNone(sr.event_id)
+
+        self.assertEqual([], received_rows)
+
+    event_count = 0
+
+    def _inject_test_event(
+        self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs
+    ) -> EventBase:
+        if sender is None:
+            sender = self.user_id
+
+        if body is None:
+            body = "event %i" % (self.event_count,)
+            self.event_count += 1
+
+        return inject_event(
+            self.hs,
+            room_id=self.room_id,
+            sender=sender,
+            type="test_event",
+            content={"body": body},
+            **kwargs
+        )
+
+    def _inject_state_event(
+        self,
+        body: Optional[str] = None,
+        state_key: Optional[str] = None,
+        sender: Optional[str] = None,
+    ) -> EventBase:
+        if sender is None:
+            sender = self.user_id
+
+        if state_key is None:
+            state_key = "state_%i" % (self.event_count,)
+            self.event_count += 1
+
+        if body is None:
+            body = "state event %s" % (state_key,)
+
+        return inject_event(
+            self.hs,
+            room_id=self.room_id,
+            sender=sender,
+            type="test_state_event",
+            state_key=state_key,
+            content={"body": body},
+        )
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 371637618d..22d734e763 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -39,7 +39,7 @@ class RestHelper(object):
     resource = attr.ib()
     auth_user_id = attr.ib()
 
-    def create_room_as(self, room_creator, is_public=True, tok=None):
+    def create_room_as(self, room_creator=None, is_public=True, tok=None):
         temp_id = self.auth_user_id
         self.auth_user_id = room_creator
         path = "/_matrix/client/r0/createRoom"