From f14428b25c37e44675edac4a80d7bd1e47112586 Mon Sep 17 00:00:00 2001 From: David Teller Date: Fri, 11 Dec 2020 20:05:15 +0100 Subject: Allow spam-checker modules to be provide async methods. (#8890) Spam checker modules can now provide async methods. This is implemented in a backwards-compatible manner. --- synapse/handlers/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/message.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 96843338ae..2b8aa9443d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -744,7 +744,7 @@ class EventCreationHandler: event.sender, ) - spam_error = self.spam_checker.check_event_for_spam(event) + spam_error = await self.spam_checker.check_event_for_spam(event) if spam_error: if not isinstance(spam_error, str): spam_error = "Spam is not permitted here" -- cgit 1.5.1 From 70586aa63eaf129505324976fb092cb3ad327590 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Dec 2020 09:49:18 +0000 Subject: Try and drop stale extremities. (#8929) If we see stale extremities while persisting events, and notice that they don't change the result of state resolution, we drop them. --- changelog.d/8929.misc | 1 + synapse/api/constants.py | 2 + synapse/handlers/message.py | 2 +- synapse/storage/persist_events.py | 200 +++++++++++++++++++++-- synapse/visibility.py | 2 +- tests/storage/test_events.py | 334 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 523 insertions(+), 18 deletions(-) create mode 100644 changelog.d/8929.misc create mode 100644 tests/storage/test_events.py (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/8929.misc b/changelog.d/8929.misc new file mode 100644 index 0000000000..157018b6a6 --- /dev/null +++ b/changelog.d/8929.misc @@ -0,0 +1 @@ +Automatically drop stale forward-extremities under some specific conditions. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 1932df83b4..565a8cd76a 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -95,6 +95,8 @@ class EventTypes: Presence = "m.presence" + Dummy = "org.matrix.dummy_event" + class RejectedReason: AUTH_ERROR = "auth_error" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2b8aa9443d..9dfeab09cd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1261,7 +1261,7 @@ class EventCreationHandler: event, context = await self.create_event( requester, { - "type": "org.matrix.dummy_event", + "type": EventTypes.Dummy, "content": {}, "room_id": room_id, "sender": user_id, diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 70e636b0ba..61fc49c69c 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -31,7 +31,14 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases import Databases from synapse.storage.databases.main.events import DeltaState -from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap +from synapse.storage.databases.main.events_worker import EventRedactBehaviour +from synapse.types import ( + Collection, + PersistedEventPosition, + RoomStreamToken, + StateMap, + get_domain_from_id, +) from synapse.util.async_helpers import ObservableDeferred from synapse.util.metrics import Measure @@ -68,6 +75,21 @@ stale_forward_extremities_counter = Histogram( buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), ) +state_resolutions_during_persistence = Counter( + "synapse_storage_events_state_resolutions_during_persistence", + "Number of times we had to do state res to calculate new current state", +) + +potential_times_prune_extremities = Counter( + "synapse_storage_events_potential_times_prune_extremities", + "Number of times we might be able to prune extremities", +) + +times_pruned_extremities = Counter( + "synapse_storage_events_times_pruned_extremities", + "Number of times we were actually be able to prune extremities", +) + class _EventPeristenceQueue: """Queues up events so that they can be persisted in bulk with only one @@ -454,7 +476,15 @@ class EventsPersistenceStorage: latest_event_ids, new_latest_event_ids, ) - current_state, delta_ids = res + current_state, delta_ids, new_latest_event_ids = res + + # there should always be at least one forward extremity. + # (except during the initial persistence of the send_join + # results, in which case there will be no existing + # extremities, so we'll `continue` above and skip this bit.) + assert new_latest_event_ids, "No forward extremities left!" + + new_forward_extremeties[room_id] = new_latest_event_ids # If either are not None then there has been a change, # and we need to work out the delta (or use that @@ -573,29 +603,35 @@ class EventsPersistenceStorage: self, room_id: str, events_context: List[Tuple[EventBase, EventContext]], - old_latest_event_ids: Iterable[str], - new_latest_event_ids: Iterable[str], - ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]: + old_latest_event_ids: Set[str], + new_latest_event_ids: Set[str], + ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]: """Calculate the current state dict after adding some new events to a room Args: - room_id (str): + room_id: room to which the events are being added. Used for logging etc - events_context (list[(EventBase, EventContext)]): + events_context: events and contexts which are being added to the room - old_latest_event_ids (iterable[str]): + old_latest_event_ids: the old forward extremities for the room. - new_latest_event_ids (iterable[str]): + new_latest_event_ids : the new forward extremities for the room. Returns: - Returns a tuple of two state maps, the first being the full new current - state and the second being the delta to the existing current state. - If both are None then there has been no change. + Returns a tuple of two state maps and a set of new forward + extremities. + + The first state map is the full new current state and the second + is the delta to the existing current state. If both are None then + there has been no change. + + The function may prune some old entries from the set of new + forward extremities if it's safe to do so. If there has been a change then we only return the delta if its already been calculated. Conversely if we do know the delta then @@ -672,7 +708,7 @@ class EventsPersistenceStorage: # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return None, None + return None, None, new_latest_event_ids if len(new_state_groups) == 1 and len(old_state_groups) == 1: # If we're going from one state group to another, lets check if @@ -689,7 +725,7 @@ class EventsPersistenceStorage: # the current state in memory then lets also return that, # but it doesn't matter if we don't. new_state = state_groups_map.get(new_state_group) - return new_state, delta_ids + return new_state, delta_ids, new_latest_event_ids # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -701,7 +737,7 @@ class EventsPersistenceStorage: if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - return state_groups_map[new_state_groups.pop()], None + return state_groups_map[new_state_groups.pop()], None, new_latest_event_ids # Ok, we need to defer to the state handler to resolve our state sets. @@ -734,7 +770,139 @@ class EventsPersistenceStorage: state_res_store=StateResolutionStore(self.main_store), ) - return res.state, None + state_resolutions_during_persistence.inc() + + # If the returned state matches the state group of one of the new + # forward extremities then we check if we are able to prune some state + # extremities. + if res.state_group and res.state_group in new_state_groups: + new_latest_event_ids = await self._prune_extremities( + room_id, + new_latest_event_ids, + res.state_group, + event_id_to_state_group, + events_context, + ) + + return res.state, None, new_latest_event_ids + + async def _prune_extremities( + self, + room_id: str, + new_latest_event_ids: Set[str], + resolved_state_group: int, + event_id_to_state_group: Dict[str, int], + events_context: List[Tuple[EventBase, EventContext]], + ) -> Set[str]: + """See if we can prune any of the extremities after calculating the + resolved state. + """ + potential_times_prune_extremities.inc() + + # We keep all the extremities that have the same state group, and + # see if we can drop the others. + new_new_extrems = { + e + for e in new_latest_event_ids + if event_id_to_state_group[e] == resolved_state_group + } + + dropped_extrems = set(new_latest_event_ids) - new_new_extrems + + logger.debug("Might drop extremities: %s", dropped_extrems) + + # We only drop events from the extremities list if: + # 1. we're not currently persisting them; + # 2. they're not our own events (or are dummy events); and + # 3. they're either: + # 1. over N hours old and more than N events ago (we use depth to + # calculate); or + # 2. we are persisting an event from the same domain and more than + # M events ago. + # + # The idea is that we don't want to drop events that are "legitimate" + # extremities (that we would want to include as prev events), only + # "stuck" extremities that are e.g. due to a gap in the graph. + # + # Note that we either drop all of them or none of them. If we only drop + # some of the events we don't know if state res would come to the same + # conclusion. + + for ev, _ in events_context: + if ev.event_id in dropped_extrems: + logger.debug( + "Not dropping extremities: %s is being persisted", ev.event_id + ) + return new_latest_event_ids + + dropped_events = await self.main_store.get_events( + dropped_extrems, + allow_rejected=True, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + + new_senders = {get_domain_from_id(e.sender) for e, _ in events_context} + + one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000 + current_depth = max(e.depth for e, _ in events_context) + for event in dropped_events.values(): + # If the event is a local dummy event then we should check it + # doesn't reference any local events, as we want to reference those + # if we send any new events. + # + # Note we do this recursively to handle the case where a dummy event + # references a dummy event that only references remote events. + # + # Ideally we'd figure out a way of still being able to drop old + # dummy events that reference local events, but this is good enough + # as a first cut. + events_to_check = [event] + while events_to_check: + new_events = set() + for event_to_check in events_to_check: + if self.is_mine_id(event_to_check.sender): + if event_to_check.type != EventTypes.Dummy: + logger.debug("Not dropping own event") + return new_latest_event_ids + new_events.update(event_to_check.prev_event_ids()) + + prev_events = await self.main_store.get_events( + new_events, + allow_rejected=True, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + events_to_check = prev_events.values() + + if ( + event.origin_server_ts < one_day_ago + and event.depth < current_depth - 100 + ): + continue + + # We can be less conservative about dropping extremities from the + # same domain, though we do want to wait a little bit (otherwise + # we'll immediately remove all extremities from a given server). + if ( + get_domain_from_id(event.sender) in new_senders + and event.depth < current_depth - 20 + ): + continue + + logger.debug( + "Not dropping as too new and not in new_senders: %s", new_senders, + ) + + return new_latest_event_ids + + times_pruned_extremities.inc() + + logger.info( + "Pruning forward extremities in room %s: from %s -> %s", + room_id, + new_latest_event_ids, + new_new_extrems, + ) + return new_new_extrems async def _calculate_state_delta( self, room_id: str, current_state: StateMap[str] diff --git a/synapse/visibility.py b/synapse/visibility.py index f2836ba9f0..ec50e7e977 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -125,7 +125,7 @@ async def filter_events_for_client( # see events in the room at that point in the DAG, and that shouldn't be decided # on those checks. if filter_send_to_client: - if event.type == "org.matrix.dummy_event": + if event.type == EventTypes.Dummy: return None if not event.is_state() and event.sender in ignore_list: diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py new file mode 100644 index 0000000000..71210ce606 --- /dev/null +++ b/tests/storage/test_events.py @@ -0,0 +1,334 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from synapse.api.constants import EventTypes, Membership +from synapse.api.room_versions import RoomVersions +from synapse.federation.federation_base import event_from_pdu_json +from synapse.rest import admin +from synapse.rest.client.v1 import login, room + +from tests.unittest import HomeserverTestCase + + +class ExtremPruneTestCase(HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, homeserver): + self.state = self.hs.get_state_handler() + self.persistence = self.hs.get_storage().persistence + self.store = self.hs.get_datastore() + + self.register_user("user", "pass") + self.token = self.login("user", "pass") + + self.room_id = self.helper.create_room_as( + "user", room_version=RoomVersions.V6.identifier, tok=self.token + ) + + body = self.helper.send(self.room_id, body="Test", tok=self.token) + local_message_event_id = body["event_id"] + + # Fudge a remote event and persist it. This will be the extremity before + # the gap. + self.remote_event_1 = event_from_pdu_json( + { + "type": EventTypes.Message, + "state_key": "@user:other", + "content": {}, + "room_id": self.room_id, + "sender": "@user:other", + "depth": 5, + "prev_events": [local_message_event_id], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + self.persist_event(self.remote_event_1) + + # Check that the current extremities is the remote event. + self.assert_extremities([self.remote_event_1.event_id]) + + def persist_event(self, event, state=None): + """Persist the event, with optional state + """ + context = self.get_success( + self.state.compute_event_context(event, old_state=state) + ) + self.get_success(self.persistence.persist_event(event, context)) + + def assert_extremities(self, expected_extremities): + """Assert the current extremities for the room + """ + extremities = self.get_success( + self.store.get_prev_events_for_room(self.room_id) + ) + self.assertCountEqual(extremities, expected_extremities) + + def test_prune_gap(self): + """Test that we drop extremities after a gap when we see an event from + the same domain. + """ + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other", + "depth": 50, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id]) + + def test_do_not_prune_gap_if_state_different(self): + """Test that we don't prune extremities after a gap if the resolved + state is different. + """ + + # Fudge a second event which points to an event we don't have. + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Message, + "state_key": "@user:other", + "content": {}, + "room_id": self.room_id, + "sender": "@user:other", + "depth": 10, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + # Now we persist it with state with a dropped history visibility + # setting. The state resolution across the old and new event will then + # include it, and so the resolved state won't match the new state. + state_before_gap = dict( + self.get_success(self.state.get_current_state(self.room_id)) + ) + state_before_gap.pop(("m.room.history_visibility", "")) + + context = self.get_success( + self.state.compute_event_context( + remote_event_2, old_state=state_before_gap.values() + ) + ) + + self.get_success(self.persistence.persist_event(remote_event_2, context)) + + # Check that we haven't dropped the old extremity. + self.assert_extremities([self.remote_event_1.event_id, remote_event_2.event_id]) + + def test_prune_gap_if_old(self): + """Test that we drop extremities after a gap when the previous extremity + is "old" + """ + + # Advance the clock for many days to make the old extremity "old". We + # also set the depth to "lots". + self.reactor.advance(7 * 24 * 60 * 60) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id]) + + def test_do_not_prune_gap_if_other_server(self): + """Test that we do not drop extremities after a gap when we see an event + from a different domain. + """ + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([self.remote_event_1.event_id, remote_event_2.event_id]) + + def test_prune_gap_if_dummy_remote(self): + """Test that we drop extremities after a gap when the previous extremity + is a local dummy event and only points to remote events. + """ + + body = self.helper.send_event( + self.room_id, type=EventTypes.Dummy, content={}, tok=self.token + ) + local_message_event_id = body["event_id"] + self.assert_extremities([local_message_event_id]) + + # Advance the clock for many days to make the old extremity "old". We + # also set the depth to "lots". + self.reactor.advance(7 * 24 * 60 * 60) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id]) + + def test_prune_gap_if_dummy_local(self): + """Test that we don't drop extremities after a gap when the previous + extremity is a local dummy event and points to local events. + """ + + body = self.helper.send(self.room_id, body="Test", tok=self.token) + + body = self.helper.send_event( + self.room_id, type=EventTypes.Dummy, content={}, tok=self.token + ) + local_message_event_id = body["event_id"] + self.assert_extremities([local_message_event_id]) + + # Advance the clock for many days to make the old extremity "old". We + # also set the depth to "lots". + self.reactor.advance(7 * 24 * 60 * 60) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([remote_event_2.event_id, local_message_event_id]) + + def test_do_not_prune_gap_if_not_dummy(self): + """Test that we do not drop extremities after a gap when the previous extremity + is not a dummy event. + """ + + body = self.helper.send(self.room_id, body="test", tok=self.token) + local_message_event_id = body["event_id"] + self.assert_extremities([local_message_event_id]) + + # Fudge a second event which points to an event we don't have. This is a + # state event so that the state changes (otherwise we won't prune the + # extremity as they'll have the same state group). + remote_event_2 = event_from_pdu_json( + { + "type": EventTypes.Member, + "state_key": "@user:other2", + "content": {"membership": Membership.JOIN}, + "room_id": self.room_id, + "sender": "@user:other2", + "depth": 10000, + "prev_events": ["$some_unknown_message"], + "auth_events": [], + "origin_server_ts": self.clock.time_msec(), + }, + RoomVersions.V6, + ) + + state_before_gap = self.get_success(self.state.get_current_state(self.room_id)) + + self.persist_event(remote_event_2, state=state_before_gap.values()) + + # Check the new extremity is just the new remote event. + self.assert_extremities([local_message_event_id, remote_event_2.event_id]) -- cgit 1.5.1 From dd8da8c5f6ac525a7456437913a03f68d4504605 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Jan 2021 13:57:31 +0000 Subject: Precompute joined hosts and store in Redis (#9198) --- changelog.d/9198.misc | 1 + stubs/txredisapi.pyi | 12 +++- synapse/config/_base.pyi | 2 + synapse/federation/sender/__init__.py | 50 +++++++++----- synapse/handlers/federation.py | 5 ++ synapse/handlers/message.py | 42 ++++++++++++ synapse/replication/tcp/external_cache.py | 105 ++++++++++++++++++++++++++++++ synapse/replication/tcp/handler.py | 15 +---- synapse/server.py | 30 +++++++++ synapse/state/__init__.py | 11 +++- tests/replication/_base.py | 41 +++++++----- 11 files changed, 265 insertions(+), 49 deletions(-) create mode 100644 changelog.d/9198.misc create mode 100644 synapse/replication/tcp/external_cache.py (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/9198.misc b/changelog.d/9198.misc new file mode 100644 index 0000000000..a6cb77fbb2 --- /dev/null +++ b/changelog.d/9198.misc @@ -0,0 +1 @@ +Precompute joined hosts and store in Redis. diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index bdc892ec82..618548a305 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -15,11 +15,21 @@ """Contains *incomplete* type hints for txredisapi. """ -from typing import List, Optional, Type, Union +from typing import Any, List, Optional, Type, Union class RedisProtocol: def publish(self, channel: str, message: bytes): ... async def ping(self) -> None: ... + async def set( + self, + key: str, + value: Any, + expire: Optional[int] = None, + pexpire: Optional[int] = None, + only_if_not_exists: bool = False, + only_if_exists: bool = False, + ) -> None: ... + async def get(self, key: str) -> Any: ... class SubscriberProtocol(RedisProtocol): def __init__(self, *args, **kwargs): ... diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index 29aa064e57..8ba669059a 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -18,6 +18,7 @@ from synapse.config import ( password_auth_providers, push, ratelimiting, + redis, registration, repository, room_directory, @@ -79,6 +80,7 @@ class RootConfig: roomdirectory: room_directory.RoomDirectoryConfig thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig tracer: tracer.TracerConfig + redis: redis.RedisConfig config_classes: List = ... def __init__(self) -> None: ... diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 604cfd1935..643b26ae6d 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -142,6 +142,8 @@ class FederationSender: self._wake_destinations_needing_catchup, ) + self._external_cache = hs.get_external_cache() + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination @@ -197,22 +199,40 @@ class FederationSender: if not event.internal_metadata.should_proactively_send(): return - try: - # Get the state from before the event. - # We need to make sure that this is the state from before - # the event and not from after it. - # Otherwise if the last member on a server in a room is - # banned then it won't receive the event because it won't - # be in the room after the ban. - destinations = await self.state.get_hosts_in_room_at_events( - event.room_id, event_ids=event.prev_event_ids() - ) - except Exception: - logger.exception( - "Failed to calculate hosts in room for event: %s", - event.event_id, + destinations = None # type: Optional[Set[str]] + if not event.prev_event_ids(): + # If there are no prev event IDs then the state is empty + # and so no remote servers in the room + destinations = set() + else: + # We check the external cache for the destinations, which is + # stored per state group. + + sg = await self._external_cache.get( + "event_to_prev_state_group", event.event_id ) - return + if sg: + destinations = await self._external_cache.get( + "get_joined_hosts", str(sg) + ) + + if destinations is None: + try: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. + destinations = await self.state.get_hosts_in_room_at_events( + event.room_id, event_ids=event.prev_event_ids() + ) + except Exception: + logger.exception( + "Failed to calculate hosts in room for event: %s", + event.event_id, + ) + return destinations = { d diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fd8de8696d..b6dc7f99b6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2093,6 +2093,11 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.GuestAccess and not context.rejected: await self.maybe_kick_guest_users(event) + # If we are going to send this event over federation we precaclculate + # the joined hosts. + if event.internal_metadata.get_send_on_behalf_of(): + await self.event_creation_handler.cache_joined_hosts_for_event(event) + return context async def _check_for_soft_fail( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9dfeab09cd..e2a7d567fa 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -432,6 +432,8 @@ class EventCreationHandler: self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + self._external_cache = hs.get_external_cache() + async def create_event( self, requester: Requester, @@ -939,6 +941,8 @@ class EventCreationHandler: await self.action_generator.handle_push_actions_for_event(event, context) + await self.cache_joined_hosts_for_event(event) + try: # If we're a worker we need to hit out to the master. writer_instance = self._events_shard_config.get_instance(event.room_id) @@ -978,6 +982,44 @@ class EventCreationHandler: await self.store.remove_push_actions_from_staging(event.event_id) raise + async def cache_joined_hosts_for_event(self, event: EventBase) -> None: + """Precalculate the joined hosts at the event, when using Redis, so that + external federation senders don't have to recalculate it themselves. + """ + + if not self._external_cache.is_enabled(): + return + + # We actually store two mappings, event ID -> prev state group, + # state group -> joined hosts, which is much more space efficient + # than event ID -> joined hosts. + # + # Note: We have to cache event ID -> prev state group, as we don't + # store that in the DB. + # + # Note: We always set the state group -> joined hosts cache, even if + # we already set it, so that the expiry time is reset. + + state_entry = await self.state.resolve_state_groups_for_events( + event.room_id, event_ids=event.prev_event_ids() + ) + + if state_entry.state_group: + joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry) + + await self._external_cache.set( + "event_to_prev_state_group", + event.event_id, + state_entry.state_group, + expiry_ms=60 * 60 * 1000, + ) + await self._external_cache.set( + "get_joined_hosts", + str(state_entry.state_group), + list(joined_hosts), + expiry_ms=60 * 60 * 1000, + ) + async def _validate_canonical_alias( self, directory_handler, room_alias_str: str, expected_room_id: str ) -> None: diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py new file mode 100644 index 0000000000..34fa3ff5b3 --- /dev/null +++ b/synapse/replication/tcp/external_cache.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, Any, Optional + +from prometheus_client import Counter + +from synapse.logging.context import make_deferred_yieldable +from synapse.util import json_decoder, json_encoder + +if TYPE_CHECKING: + from synapse.server import HomeServer + +set_counter = Counter( + "synapse_external_cache_set", + "Number of times we set a cache", + labelnames=["cache_name"], +) + +get_counter = Counter( + "synapse_external_cache_get", + "Number of times we get a cache", + labelnames=["cache_name", "hit"], +) + + +logger = logging.getLogger(__name__) + + +class ExternalCache: + """A cache backed by an external Redis. Does nothing if no Redis is + configured. + """ + + def __init__(self, hs: "HomeServer"): + self._redis_connection = hs.get_outbound_redis_connection() + + def _get_redis_key(self, cache_name: str, key: str) -> str: + return "cache_v1:%s:%s" % (cache_name, key) + + def is_enabled(self) -> bool: + """Whether the external cache is used or not. + + It's safe to use the cache when this returns false, the methods will + just no-op, but the function is useful to avoid doing unnecessary work. + """ + return self._redis_connection is not None + + async def set(self, cache_name: str, key: str, value: Any, expiry_ms: int) -> None: + """Add the key/value to the named cache, with the expiry time given. + """ + + if self._redis_connection is None: + return + + set_counter.labels(cache_name).inc() + + # txredisapi requires the value to be string, bytes or numbers, so we + # encode stuff in JSON. + encoded_value = json_encoder.encode(value) + + logger.debug("Caching %s %s: %r", cache_name, key, encoded_value) + + return await make_deferred_yieldable( + self._redis_connection.set( + self._get_redis_key(cache_name, key), encoded_value, pexpire=expiry_ms, + ) + ) + + async def get(self, cache_name: str, key: str) -> Optional[Any]: + """Look up a key/value in the named cache. + """ + + if self._redis_connection is None: + return None + + result = await make_deferred_yieldable( + self._redis_connection.get(self._get_redis_key(cache_name, key)) + ) + + logger.debug("Got cache result %s %s: %r", cache_name, key, result) + + get_counter.labels(cache_name, result is not None).inc() + + if not result: + return None + + # For some reason the integers get magically converted back to integers + if isinstance(result, int): + return result + + return json_decoder.decode(result) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 58d46a5951..8ea8dcd587 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -286,13 +286,6 @@ class ReplicationCommandHandler: if hs.config.redis.redis_enabled: from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, - lazyConnection, - ) - - logger.info( - "Connecting to redis (host=%r port=%r)", - hs.config.redis_host, - hs.config.redis_port, ) # First let's ensure that we have a ReplicationStreamer started. @@ -303,13 +296,7 @@ class ReplicationCommandHandler: # connection after SUBSCRIBE is called). # First create the connection for sending commands. - outbound_redis_connection = lazyConnection( - hs=hs, - host=hs.config.redis_host, - port=hs.config.redis_port, - password=hs.config.redis.redis_password, - reconnect=True, - ) + outbound_redis_connection = hs.get_outbound_redis_connection() # Now create the factory/connection for the subscription stream. self._factory = RedisDirectTcpReplicationClientFactory( diff --git a/synapse/server.py b/synapse/server.py index 9cdda83aa1..9bdd3177d7 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -103,6 +103,7 @@ from synapse.notifier import Notifier from synapse.push.action_generator import ActionGenerator from synapse.push.pusherpool import PusherPool from synapse.replication.tcp.client import ReplicationDataHandler +from synapse.replication.tcp.external_cache import ExternalCache from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.resource import ReplicationStreamer from synapse.replication.tcp.streams import STREAMS_MAP, Stream @@ -128,6 +129,8 @@ from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) if TYPE_CHECKING: + from txredisapi import RedisProtocol + from synapse.handlers.oidc_handler import OidcHandler from synapse.handlers.saml_handler import SamlHandler @@ -716,6 +719,33 @@ class HomeServer(metaclass=abc.ABCMeta): def get_account_data_handler(self) -> AccountDataHandler: return AccountDataHandler(self) + @cache_in_self + def get_external_cache(self) -> ExternalCache: + return ExternalCache(self) + + @cache_in_self + def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]: + if not self.config.redis.redis_enabled: + return None + + # We only want to import redis module if we're using it, as we have + # `txredisapi` as an optional dependency. + from synapse.replication.tcp.redis import lazyConnection + + logger.info( + "Connecting to redis (host=%r port=%r) for external cache", + self.config.redis_host, + self.config.redis_port, + ) + + return lazyConnection( + hs=self, + host=self.config.redis_host, + port=self.config.redis_port, + password=self.config.redis.redis_password, + reconnect=True, + ) + async def remove_pusher(self, app_id: str, push_key: str, user_id: str): return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 84f59c7d85..3bd9ff8ca0 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -310,6 +310,7 @@ class StateHandler: state_group_before_event = None state_group_before_event_prev_group = None deltas_to_state_group_before_event = None + entry = None else: # otherwise, we'll need to resolve the state across the prev_events. @@ -340,9 +341,13 @@ class StateHandler: current_state_ids=state_ids_before_event, ) - # XXX: can we update the state cache entry for the new state group? or - # could we set a flag on resolve_state_groups_for_events to tell it to - # always make a state group? + # Assign the new state group to the cached state entry. + # + # Note that this can race in that we could generate multiple state + # groups for the same state entry, but that is just inefficient + # rather than dangerous. + if entry and entry.state_group is None: + entry.state_group = state_group_before_event # # now if it's not a state event, we're done diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 3379189785..d5dce1f83f 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -212,6 +212,9 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): # Fake in memory Redis server that servers can connect to. self._redis_server = FakeRedisPubSubServer() + # We may have an attempt to connect to redis for the external cache already. + self.connect_any_redis_attempts() + store = self.hs.get_datastore() self.database_pool = store.db_pool @@ -401,25 +404,23 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): fake one. """ clients = self.reactor.tcpClients - self.assertEqual(len(clients), 1) - (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) - self.assertEqual(host, "localhost") - self.assertEqual(port, 6379) + while clients: + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "localhost") + self.assertEqual(port, 6379) - client_protocol = client_factory.buildProtocol(None) - server_protocol = self._redis_server.buildProtocol(None) + client_protocol = client_factory.buildProtocol(None) + server_protocol = self._redis_server.buildProtocol(None) - client_to_server_transport = FakeTransport( - server_protocol, self.reactor, client_protocol - ) - client_protocol.makeConnection(client_to_server_transport) - - server_to_client_transport = FakeTransport( - client_protocol, self.reactor, server_protocol - ) - server_protocol.makeConnection(server_to_client_transport) + client_to_server_transport = FakeTransport( + server_protocol, self.reactor, client_protocol + ) + client_protocol.makeConnection(client_to_server_transport) - return client_to_server_transport, server_to_client_transport + server_to_client_transport = FakeTransport( + client_protocol, self.reactor, server_protocol + ) + server_protocol.makeConnection(server_to_client_transport) class TestReplicationDataHandler(GenericWorkerReplicationHandler): @@ -624,6 +625,12 @@ class FakeRedisPubSubProtocol(Protocol): (channel,) = args self._server.add_subscriber(self) self.send(["subscribe", channel, 1]) + + # Since we use SET/GET to cache things we can safely no-op them. + elif command == b"SET": + self.send("OK") + elif command == b"GET": + self.send(None) else: raise Exception("Unknown command") @@ -645,6 +652,8 @@ class FakeRedisPubSubProtocol(Protocol): # We assume bytes are just unicode strings. obj = obj.decode("utf-8") + if obj is None: + return "$-1\r\n" if isinstance(obj, str): return "${len}\r\n{str}\r\n".format(len=len(obj), str=obj) if isinstance(obj, int): -- cgit 1.5.1 From b60bb28bbc3d916586a913970298baba483efc1f Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Tue, 2 Feb 2021 04:16:29 -0700 Subject: Add an admin API to get the current room state (#9168) This could arguably replace the existing admin API for `/members`, however that is out of scope of this change. This sort of endpoint is ideal for moderation use cases as well as other applications, such as needing to retrieve various bits of information about a room to perform a task (like syncing power levels between two places). This endpoint exposes nothing more than an admin would be able to access with a `select *` query on their database. --- changelog.d/9168.feature | 1 + docs/admin_api/rooms.md | 30 ++++++++++++++++++++++++++++++ synapse/handlers/message.py | 2 +- synapse/rest/admin/__init__.py | 2 ++ synapse/rest/admin/rooms.py | 39 +++++++++++++++++++++++++++++++++++++++ tests/rest/admin/test_room.py | 15 +++++++++++++++ 6 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 changelog.d/9168.feature (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/9168.feature b/changelog.d/9168.feature new file mode 100644 index 0000000000..8be1950eee --- /dev/null +++ b/changelog.d/9168.feature @@ -0,0 +1 @@ +Add an admin API for retrieving the current room state of a room. \ No newline at end of file diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index f34cec1ff7..3832b36407 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -368,6 +368,36 @@ Response: } ``` +# Room State API + +The Room State admin API allows server admins to get a list of all state events in a room. + +The response includes the following fields: + +* `state` - The current state of the room at the time of request. + +## Usage + +A standard request: + +``` +GET /_synapse/admin/v1/rooms//state + +{} +``` + +Response: + +```json +{ + "state": [ + {"type": "m.room.create", "state_key": "", "etc": true}, + {"type": "m.room.power_levels", "state_key": "", "etc": true}, + {"type": "m.room.name", "state_key": "", "etc": true} + ] +} +``` + # Delete Room API The Delete Room admin API allows server admins to remove rooms from server diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e2a7d567fa..a15336bf00 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -174,7 +174,7 @@ class MessageHandler: raise NotFoundError("Can't find event for token %s" % (at_token,)) visible_events = await filter_events_for_client( - self.storage, user_id, last_events, filter_send_to_client=False + self.storage, user_id, last_events, filter_send_to_client=False, ) event = last_events[0] diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 57e0a10837..f5c5d164f9 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -44,6 +44,7 @@ from synapse.rest.admin.rooms import ( MakeRoomAdminRestServlet, RoomMembersRestServlet, RoomRestServlet, + RoomStateRestServlet, ShutdownRoomRestServlet, ) from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet @@ -213,6 +214,7 @@ def register_servlets(hs, http_server): """ register_servlets_for_client_rest_resource(hs, http_server) ListRoomRestServlet(hs).register(http_server) + RoomStateRestServlet(hs).register(http_server) RoomRestServlet(hs).register(http_server) RoomMembersRestServlet(hs).register(http_server) DeleteRoomRestServlet(hs).register(http_server) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index f14915d47e..3e57e6a4d0 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -292,6 +292,45 @@ class RoomMembersRestServlet(RestServlet): return 200, ret +class RoomStateRestServlet(RestServlet): + """ + Get full state within a room. + """ + + PATTERNS = admin_patterns("/rooms/(?P[^/]+)/state") + + def __init__(self, hs: "HomeServer"): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self._event_serializer = hs.get_event_client_serializer() + + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + ret = await self.store.get_room(room_id) + if not ret: + raise NotFoundError("Room not found") + + event_ids = await self.store.get_current_state_ids(room_id) + events = await self.store.get_events(event_ids.values()) + now = self.clock.time_msec() + room_state = await self._event_serializer.serialize_events( + events.values(), + now, + # We don't bother bundling aggregations in when asked for state + # events, as clients won't use them. + bundle_aggregations=False, + ) + ret = {"state": room_state} + + return 200, ret + + class JoinRoomAliasServlet(RestServlet): PATTERNS = admin_patterns("/join/(?P[^/]*)") diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index a0f32c5512..7c47aa7e0a 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1180,6 +1180,21 @@ class RoomTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.json_body["total"], 3) + def test_room_state(self): + """Test that room state can be requested correctly""" + # Create two test rooms + room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok) + + url = "/_synapse/admin/v1/rooms/%s/state" % (room_id,) + channel = self.make_request( + "GET", url.encode("ascii"), access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertIn("state", channel.json_body) + # testing that the state events match is painful and not done here. We assume that + # the create_room already does the right thing, so no need to verify that we got + # the state events it created. + class JoinAliasRoomTestCase(unittest.HomeserverTestCase): -- cgit 1.5.1