From 76addadd7c807a3412e6a104db0fdc9b79888688 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jul 2021 10:18:25 +0100 Subject: Add some metrics to staging area (#10284) --- synapse/storage/databases/main/event_federation.py | 39 ++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f2d27ee893..08d75b0d41 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,6 +16,8 @@ import logging from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +from prometheus_client import Gauge + from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion @@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter +oldest_pdu_in_federation_staging = Gauge( + "synapse_federation_server_oldest_inbound_pdu_in_staging", + "The age in seconds since we received the oldest pdu in the federation staging area", +) + +number_pdus_in_federation_queue = Gauge( + "synapse_federation_server_number_inbound_pdu_in_staging", + "The total number of events in the inbound federation staging", +) + logger = logging.getLogger(__name__) @@ -54,6 +66,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas 500000, "_event_auth_cache", size_callback=len ) # type: LruCache[str, List[Tuple[str, int]]] + self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) + async def get_auth_chain( self, room_id: str, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: @@ -1193,6 +1207,31 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return origin, event + @wrap_as_background_process("_get_stats_for_federation_staging") + async def _get_stats_for_federation_staging(self): + """Update the prometheus metrics for the inbound federation staging area.""" + + def _get_stats_for_federation_staging_txn(txn): + txn.execute( + "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging" + ) + (count,) = txn.fetchone() + + txn.execute( + "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging" + ) + + (age,) = txn.fetchone() + + return count, age + + count, age = await self.db_pool.runInteraction( + "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn + ) + + number_pdus_in_federation_queue.set(count) + oldest_pdu_in_federation_staging.set(age) + class EventFederationStore(EventFederationWorkerStore): """Responsible for storing and serving up the various graphs associated -- cgit 1.5.1 From c65067d67307de7688fa39246426370421e56452 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jul 2021 13:02:37 +0100 Subject: Handle old staged inbound events (#10303) We might have events in the staging area if the service was restarted while there were unhandled events in the staging area. Fixes #10295 --- changelog.d/10303.bugfix | 1 + synapse/federation/federation_server.py | 67 ++++++++++++++++++---- synapse/storage/databases/main/event_federation.py | 9 +++ 3 files changed, 67 insertions(+), 10 deletions(-) create mode 100644 changelog.d/10303.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/10303.bugfix b/changelog.d/10303.bugfix new file mode 100644 index 0000000000..c0577c9f73 --- /dev/null +++ b/changelog.d/10303.bugfix @@ -0,0 +1 @@ +Ensure that inbound events from federation that were being processed when Synapse was restarted get promptly processed on start up. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b312d0b809..bf67d0f574 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -148,6 +148,41 @@ class FederationServer(FederationBase): self._room_prejoin_state_types = hs.config.api.room_prejoin_state + # Whether we have started handling old events in the staging area. + self._started_handling_of_staged_events = False + + @wrap_as_background_process("_handle_old_staged_events") + async def _handle_old_staged_events(self) -> None: + """Handle old staged events by fetching all rooms that have staged + events and start the processing of each of those rooms. + """ + + # Get all the rooms IDs with staged events. + room_ids = await self.store.get_all_rooms_with_staged_incoming_events() + + # We then shuffle them so that if there are multiple instances doing + # this work they're less likely to collide. + random.shuffle(room_ids) + + for room_id in room_ids: + room_version = await self.store.get_room_version(room_id) + + # Try and acquire the processing lock for the room, if we get it start a + # background process for handling the events in the room. + lock = await self.store.try_acquire_lock( + _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id + ) + if lock: + logger.info("Handling old staged inbound events in %s", room_id) + self._process_incoming_pdus_in_room_inner( + room_id, + room_version, + lock, + ) + + # We pause a bit so that we don't start handling all rooms at once. + await self._clock.sleep(random.uniform(0, 0.1)) + async def on_backfill_request( self, origin: str, room_id: str, versions: List[str], limit: int ) -> Tuple[int, Dict[str, Any]]: @@ -166,6 +201,12 @@ class FederationServer(FederationBase): async def on_incoming_transaction( self, origin: str, transaction_data: JsonDict ) -> Tuple[int, Dict[str, Any]]: + # If we receive a transaction we should make sure that kick off handling + # any old events in the staging area. + if not self._started_handling_of_staged_events: + self._started_handling_of_staged_events = True + self._handle_old_staged_events() + # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() @@ -882,25 +923,28 @@ class FederationServer(FederationBase): room_id: str, room_version: RoomVersion, lock: Lock, - latest_origin: str, - latest_event: EventBase, + latest_origin: Optional[str] = None, + latest_event: Optional[EventBase] = None, ) -> None: """Process events in the staging area for the given room. The latest_origin and latest_event args are the latest origin and event - received. + received (or None to simply pull the next event from the database). """ # The common path is for the event we just received be the only event in # the room, so instead of pulling the event out of the DB and parsing # the event we just pull out the next event ID and check if that matches. - next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room( - room_id - ) - if next_origin == latest_origin and next_event_id == latest_event.event_id: - origin = latest_origin - event = latest_event - else: + if latest_event is not None and latest_origin is not None: + ( + next_origin, + next_event_id, + ) = await self.store.get_next_staged_event_id_for_room(room_id) + if next_origin != latest_origin or next_event_id != latest_event.event_id: + latest_origin = None + latest_event = None + + if latest_origin is None or latest_event is None: next = await self.store.get_next_staged_event_for_room( room_id, room_version ) @@ -908,6 +952,9 @@ class FederationServer(FederationBase): return origin, event = next + else: + origin = latest_origin + event = latest_event # We loop round until there are no more events in the room in the # staging area, or we fail to get the lock (which means another process diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 08d75b0d41..c4474df975 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1207,6 +1207,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return origin, event + async def get_all_rooms_with_staged_incoming_events(self) -> List[str]: + """Get the room IDs of all events currently staged.""" + return await self.db_pool.simple_select_onecol( + table="federation_inbound_events_staging", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms_with_staged_incoming_events", + ) + @wrap_as_background_process("_get_stats_for_federation_staging") async def _get_stats_for_federation_staging(self): """Update the prometheus metrics for the inbound federation staging area.""" -- cgit 1.5.1 From bcb0962a7250d6c1430ad42f5ed234ffea8f2468 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 6 Jul 2021 14:08:53 +0200 Subject: Fix deactivate a user if he does not have a profile (#10252) --- changelog.d/10252.bugfix | 1 + synapse/storage/databases/main/profile.py | 8 +-- tests/rest/admin/test_user.py | 86 ++++++++++++++++++++++++------- 3 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 changelog.d/10252.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/10252.bugfix b/changelog.d/10252.bugfix new file mode 100644 index 0000000000..c8ddd14528 --- /dev/null +++ b/changelog.d/10252.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.26.0 where only users who have set profile information could be deactivated with erasure enabled. diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 9b4e95e134..ba7075caa5 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -73,20 +73,20 @@ class ProfileWorkerStore(SQLBaseStore): async def set_profile_displayname( self, user_localpart: str, new_displayname: Optional[str] ) -> None: - await self.db_pool.simple_update_one( + await self.db_pool.simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + values={"displayname": new_displayname}, desc="set_profile_displayname", ) async def set_profile_avatar_url( self, user_localpart: str, new_avatar_url: Optional[str] ) -> None: - await self.db_pool.simple_update_one( + await self.db_pool.simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, + values={"avatar_url": new_avatar_url}, desc="set_profile_avatar_url", ) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index a34d051734..4fccce34fd 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -939,7 +939,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): """ channel = self.make_request("POST", self.url, b"{}") - self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(401, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) def test_requester_is_not_admin(self): @@ -950,7 +950,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): channel = self.make_request("POST", url, access_token=self.other_user_token) - self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual("You are not a server admin", channel.json_body["error"]) channel = self.make_request( @@ -960,7 +960,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): content=b"{}", ) - self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual("You are not a server admin", channel.json_body["error"]) def test_user_does_not_exist(self): @@ -990,7 +990,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"]) def test_user_is_not_local(self): @@ -1006,7 +1006,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): def test_deactivate_user_erase_true(self): """ - Test deactivating an user and set `erase` to `true` + Test deactivating a user and set `erase` to `true` """ # Get user @@ -1016,24 +1016,22 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(False, channel.json_body["deactivated"]) self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"]) self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"]) self.assertEqual("User1", channel.json_body["displayname"]) - # Deactivate user - body = json.dumps({"erase": True}) - + # Deactivate and erase user channel = self.make_request( "POST", self.url, access_token=self.admin_user_tok, - content=body.encode(encoding="utf_8"), + content={"erase": True}, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) # Get user channel = self.make_request( @@ -1042,7 +1040,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(True, channel.json_body["deactivated"]) self.assertEqual(0, len(channel.json_body["threepids"])) @@ -1053,7 +1051,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): def test_deactivate_user_erase_false(self): """ - Test deactivating an user and set `erase` to `false` + Test deactivating a user and set `erase` to `false` """ # Get user @@ -1063,7 +1061,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(False, channel.json_body["deactivated"]) self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"]) @@ -1071,13 +1069,11 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual("User1", channel.json_body["displayname"]) # Deactivate user - body = json.dumps({"erase": False}) - channel = self.make_request( "POST", self.url, access_token=self.admin_user_tok, - content=body.encode(encoding="utf_8"), + content={"erase": False}, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1089,7 +1085,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(True, channel.json_body["deactivated"]) self.assertEqual(0, len(channel.json_body["threepids"])) @@ -1098,6 +1094,60 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self._is_erased("@user:test", False) + def test_deactivate_user_erase_true_no_profile(self): + """ + Test deactivating a user and set `erase` to `true` + if user has no profile information (stored in the database table `profiles`). + """ + + # Users normally have an entry in `profiles`, but occasionally they are created without one. + # To test deactivation for users without a profile, we delete the profile information for our user. + self.get_success( + self.store.db_pool.simple_delete_one( + table="profiles", keyvalues={"user_id": "user"} + ) + ) + + # Get user + channel = self.make_request( + "GET", + self.url_other_user, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(False, channel.json_body["deactivated"]) + self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"]) + self.assertIsNone(channel.json_body["avatar_url"]) + self.assertIsNone(channel.json_body["displayname"]) + + # Deactivate and erase user + channel = self.make_request( + "POST", + self.url, + access_token=self.admin_user_tok, + content={"erase": True}, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + + # Get user + channel = self.make_request( + "GET", + self.url_other_user, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(True, channel.json_body["deactivated"]) + self.assertEqual(0, len(channel.json_body["threepids"])) + self.assertIsNone(channel.json_body["avatar_url"]) + self.assertIsNone(channel.json_body["displayname"]) + + self._is_erased("@user:test", True) + def _is_erased(self, user_id: str, expect: bool) -> None: """Assert that the user is erased or not""" d = self.store.is_user_erased(user_id) -- cgit 1.5.1