From a5166e4d5febc0e03ba9da9db99127a797a0bc4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 14:08:35 +0100 Subject: Land improved room list based on room stats (#6019) Use room_stats and room_state for room directory search --- tests/handlers/test_roomlist.py | 39 --------------------------------------- 1 file changed, 39 deletions(-) delete mode 100644 tests/handlers/test_roomlist.py (limited to 'tests/handlers') diff --git a/tests/handlers/test_roomlist.py b/tests/handlers/test_roomlist.py deleted file mode 100644 index 61eebb6985..0000000000 --- a/tests/handlers/test_roomlist.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.handlers.room_list import RoomListNextBatch - -import tests.unittest -import tests.utils - - -class RoomListTestCase(tests.unittest.TestCase): - """ Tests RoomList's RoomListNextBatch. """ - - def setUp(self): - pass - - def test_check_read_batch_tokens(self): - batch_token = RoomListNextBatch( - stream_ordering="abcdef", - public_room_stream_id="123", - current_limit=20, - direction_is_forward=True, - ).to_token() - next_batch = RoomListNextBatch.from_token(batch_token) - self.assertEquals(next_batch.stream_ordering, "abcdef") - self.assertEquals(next_batch.public_room_stream_id, "123") - self.assertEquals(next_batch.current_limit, 20) - self.assertEquals(next_batch.direction_is_forward, True) -- cgit 1.5.1 From 6527fa18c1e6f9bcb22318916b5e9534c91c84c1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 2 Oct 2019 14:44:58 +0100 Subject: Add test case --- synapse/handlers/federation.py | 2 +- tests/handlers/test_federation.py | 83 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 tests/handlers/test_federation.py (limited to 'tests/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 75d79bb8e4..91f3a69298 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2570,7 +2570,7 @@ class FederationHandler(BaseHandler): ) try: - self.auth.check_from_context(room_version, event, context) + yield self.auth.check_from_context(room_version, event, context) except AuthError as e: logger.warn("Denying third party invite %r because %s", event, e) raise e diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py new file mode 100644 index 0000000000..20416a0142 --- /dev/null +++ b/tests/handlers/test_federation.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import EventTypes +from synapse.api.errors import AuthError, Codes +from synapse.rest import admin +from synapse.rest.client.v1 import login, room + +from tests import unittest + + +class FederationTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver(http_client=None) + self.handler = hs.get_handlers().federation_handler + self.store = hs.get_datastore() + return hs + + def test_exchange_revoked_invite(self): + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + + room_id = self.helper.create_room_as( + room_creator=user_id, tok=tok + ) + + # Send a 3PID invite event with an empty body so it's considered as a revoked one. + invite_token = "sometoken" + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.ThirdPartyInvite, + state_key=invite_token, + body={}, + tok=tok, + ) + + d = self.handler.on_exchange_third_party_invite_request( + room_id=room_id, + event_dict={ + "type": EventTypes.Member, + "room_id": room_id, + "sender": user_id, + "state_key": "@someone:example.org", + "content": { + "membership": "invite", + "third_party_invite": { + "display_name": "alice", + "signed": { + "mxid": "@alice:localhost", + "token": invite_token, + "signatures": { + "magic.forest": { + "ed25519:3": "fQpGIW1Snz+pwLZu6sTy2aHy/DYWWTspTJRPyNp0PKkymfIsNffysMl6ObMMFdIJhk6g6pwlIqZ54rxo8SLmAg" + } + } + } + } + } + } + ) + + failure = self.get_failure(d, AuthError).value + + self.assertEqual(failure.code, 403, failure) + self.assertEqual(failure.errcode, Codes.FORBIDDEN, failure) + self.assertEqual(failure.msg, "You are not invited to this room.") -- cgit 1.5.1 From ebcb6a30d7b1bdb859a1fd22d567b163a1488763 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 11:29:07 +0100 Subject: Lint --- tests/handlers/test_federation.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'tests/handlers') diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 20416a0142..a18dfc0e96 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -69,11 +69,11 @@ class FederationTestCase(unittest.HomeserverTestCase): "magic.forest": { "ed25519:3": "fQpGIW1Snz+pwLZu6sTy2aHy/DYWWTspTJRPyNp0PKkymfIsNffysMl6ObMMFdIJhk6g6pwlIqZ54rxo8SLmAg" } - } - } - } - } - } + }, + }, + }, + }, + }, ) failure = self.get_failure(d, AuthError).value -- cgit 1.5.1 From 8a5e8e829b98687ea274fae47db3aa801b6f97d3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 11:30:43 +0100 Subject: Lint (again) --- tests/handlers/test_federation.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'tests/handlers') diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index a18dfc0e96..d56220f403 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -37,9 +37,7 @@ class FederationTestCase(unittest.HomeserverTestCase): user_id = self.register_user("kermit", "test") tok = self.login("kermit", "test") - room_id = self.helper.create_room_as( - room_creator=user_id, tok=tok - ) + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) # Send a 3PID invite event with an empty body so it's considered as a revoked one. invite_token = "sometoken" -- cgit 1.5.1 From a139420a3cfda6a4a4ee4750611b31dd71fc33f3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 10 Oct 2019 11:29:01 +0100 Subject: Fix races in room stats (and other) updates. (#6187) Hopefully this will fix the occasional failures we were seeing in the room directory. The problem was that events are not necessarily persisted (and `current_state_delta_stream` updated) in the same order as their stream_id. So for instance current_state_delta 9 might be persisted *before* current_state_delta 8. Then, when the room stats saw stream_id 9, it assumed it had done everything up to 9, and never came back to do stream_id 8. We can solve this easily by only processing up to the stream_id where we know all events have been persisted. --- changelog.d/6187.bugfix | 1 + synapse/handlers/presence.py | 16 ++++++++++++---- synapse/handlers/stats.py | 12 +++++++----- synapse/handlers/user_directory.py | 17 ++++++++++++----- synapse/storage/state_deltas.py | 38 +++++++++++++++++++++++++++++--------- tests/handlers/test_typing.py | 2 +- tests/rest/admin/test_admin.py | 2 +- 7 files changed, 63 insertions(+), 25 deletions(-) create mode 100644 changelog.d/6187.bugfix (limited to 'tests/handlers') diff --git a/changelog.d/6187.bugfix b/changelog.d/6187.bugfix new file mode 100644 index 0000000000..6142c5b98d --- /dev/null +++ b/changelog.d/6187.bugfix @@ -0,0 +1 @@ +Fix occasional missed updates in the room and user directories. \ No newline at end of file diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 053cf66b28..2a5f1a007d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -803,17 +803,25 @@ class PresenceHandler(object): # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "presence_delta"): - deltas = yield self.store.get_current_state_deltas(self._event_pos) - if not deltas: + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self._event_pos == room_max_stream_ordering: return + logger.debug( + "Processing presence stats %s->%s", + self._event_pos, + room_max_stream_ordering, + ) + max_pos, deltas = yield self.store.get_current_state_deltas( + self._event_pos, room_max_stream_ordering + ) yield self._handle_state_delta(deltas) - self._event_pos = deltas[-1]["stream_id"] + self._event_pos = max_pos # Expose current event processing position to prometheus synapse.metrics.event_processing_positions.labels("presence").set( - self._event_pos + max_pos ) @defer.inlineCallbacks diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index c62b113115..466daf9202 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -87,21 +87,23 @@ class StatsHandler(StateDeltasHandler): # Be sure to read the max stream_ordering *before* checking if there are any outstanding # deltas, since there is otherwise a chance that we could miss updates which arrive # after we check the deltas. - room_max_stream_ordering = yield self.store.get_room_max_stream_ordering() + room_max_stream_ordering = self.store.get_room_max_stream_ordering() if self.pos == room_max_stream_ordering: break - deltas = yield self.store.get_current_state_deltas(self.pos) + logger.debug( + "Processing room stats %s->%s", self.pos, room_max_stream_ordering + ) + max_pos, deltas = yield self.store.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) if deltas: logger.debug("Handling %d state deltas", len(deltas)) room_deltas, user_deltas = yield self._handle_deltas(deltas) - - max_pos = deltas[-1]["stream_id"] else: room_deltas = {} user_deltas = {} - max_pos = room_max_stream_ordering # Then count deltas for total_events and total_event_bytes. room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes( diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e53669e40d..624f05ab5b 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler): # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) - if not deltas: + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self.pos == room_max_stream_ordering: return + logger.debug( + "Processing user stats %s->%s", self.pos, room_max_stream_ordering + ) + max_pos, deltas = yield self.store.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) + logger.info("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) - self.pos = deltas[-1]["stream_id"] + self.pos = max_pos # Expose current event processing position to prometheus synapse.metrics.event_processing_positions.labels("user_dir").set( - self.pos + max_pos ) - yield self.store.update_user_directory_stream_pos(self.pos) + yield self.store.update_user_directory_stream_pos(max_pos) @defer.inlineCallbacks def _handle_deltas(self, deltas): diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py index 5fdb442104..28f33ec18f 100644 --- a/synapse/storage/state_deltas.py +++ b/synapse/storage/state_deltas.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) class StateDeltasStore(SQLBaseStore): - def get_current_state_deltas(self, prev_stream_id): + def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int): """Fetch a list of room state changes since the given stream id Each entry in the result contains the following fields: @@ -36,15 +36,27 @@ class StateDeltasStore(SQLBaseStore): Args: prev_stream_id (int): point to get changes since (exclusive) + max_stream_id (int): the point that we know has been correctly persisted + - ie, an upper limit to return changes from. Returns: - Deferred[list[dict]]: results + Deferred[tuple[int, list[dict]]: A tuple consisting of: + - the stream id which these results go up to + - list of current_state_delta_stream rows. If it is empty, we are + up to date. """ prev_stream_id = int(prev_stream_id) + + # check we're not going backwards + assert prev_stream_id <= max_stream_id + if not self._curr_state_delta_stream_cache.has_any_entity_changed( prev_stream_id ): - return [] + # if the CSDs haven't changed between prev_stream_id and now, we + # know for certain that they haven't changed between prev_stream_id and + # max_stream_id. + return max_stream_id, [] def get_current_state_deltas_txn(txn): # First we calculate the max stream id that will give us less than @@ -54,21 +66,29 @@ class StateDeltasStore(SQLBaseStore): sql = """ SELECT stream_id, count(*) FROM current_state_delta_stream - WHERE stream_id > ? + WHERE stream_id > ? AND stream_id <= ? GROUP BY stream_id ORDER BY stream_id ASC LIMIT 100 """ - txn.execute(sql, (prev_stream_id,)) + txn.execute(sql, (prev_stream_id, max_stream_id)) total = 0 - max_stream_id = prev_stream_id - for max_stream_id, count in txn: + + for stream_id, count in txn: total += count if total > 100: # We arbitarily limit to 100 entries to ensure we don't # select toooo many. + logger.debug( + "Clipping current_state_delta_stream rows to stream_id %i", + stream_id, + ) + clipped_stream_id = stream_id break + else: + # if there's no problem, we may as well go right up to the max_stream_id + clipped_stream_id = max_stream_id # Now actually get the deltas sql = """ @@ -77,8 +97,8 @@ class StateDeltasStore(SQLBaseStore): WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC """ - txn.execute(sql, (prev_stream_id, max_stream_id)) - return self.cursor_to_dict(txn) + txn.execute(sql, (prev_stream_id, clipped_stream_id)) + return clipped_stream_id, self.cursor_to_dict(txn) return self.runInteraction( "get_current_state_deltas", get_current_state_deltas_txn diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 1f2ef5d01f..67f1013051 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -139,7 +139,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): defer.succeed(1) ) - self.datastore.get_current_state_deltas.return_value = None + self.datastore.get_current_state_deltas.return_value = (0, None) self.datastore.get_to_device_stream_token = lambda: 0 self.datastore.get_new_device_msgs_for_remote = lambda *args, **kargs: ([], 0) diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index 5877bb2133..d3a4f717f7 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -62,7 +62,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.device_handler.check_device_registered = Mock(return_value="FAKE") self.datastore = Mock(return_value=Mock()) - self.datastore.get_current_state_deltas = Mock(return_value=[]) + self.datastore.get_current_state_deltas = Mock(return_value=(0, [])) self.secrets = Mock() -- cgit 1.5.1