diff options
-rw-r--r-- | changelog.d/7864.bugfix | 1 | ||||
-rw-r--r-- | changelog.d/8072.misc | 1 | ||||
-rw-r--r-- | changelog.d/8081.bugfix | 1 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 22 | ||||
-rw-r--r-- | synapse/handlers/message.py | 4 | ||||
-rw-r--r-- | tests/federation/test_complexity.py | 30 |
6 files changed, 46 insertions, 13 deletions
diff --git a/changelog.d/7864.bugfix b/changelog.d/7864.bugfix new file mode 100644 index 0000000000..8623355fe9 --- /dev/null +++ b/changelog.d/7864.bugfix @@ -0,0 +1 @@ +Fix a memory leak by limiting the length of time that messages will be queued for a remote server that has been unreachable. diff --git a/changelog.d/8072.misc b/changelog.d/8072.misc new file mode 100644 index 0000000000..e26764dea1 --- /dev/null +++ b/changelog.d/8072.misc @@ -0,0 +1 @@ + Convert various parts of the codebase to async/await. diff --git a/changelog.d/8081.bugfix b/changelog.d/8081.bugfix new file mode 100644 index 0000000000..9ebcbf5b84 --- /dev/null +++ b/changelog.d/8081.bugfix @@ -0,0 +1 @@ +Fix `Re-starting finished log context PUT-nnnn` warning when event persistence failed. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index dd150f89a6..8cbc23d901 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -337,6 +337,28 @@ class PerDestinationQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + + if e.retry_interval > 60 * 60 * 1000: + # we won't retry for another hour! + # (this suggests a significant outage) + # We drop pending PDUs and EDUs because otherwise they will + # rack up indefinitely. + # Note that: + # - the EDUs that are being dropped here are those that we can + # afford to drop (specifically, only typing notifications, + # read receipts and presence updates are being dropped here) + # - Other EDUs such as to_device messages are queued with a + # different mechanism + # - this is all volatile state that would be lost if the + # federation sender restarted anyway + + # dropping read receipts is a bit sad but should be solved + # through another mechanism, because this is all volatile! + self._pending_pdus = [] + self._pending_edus = [] + self._pending_edus_keyed = {} + self._pending_presence = {} + self._pending_rrs = {} except FederationDeniedError as e: logger.info(e) except HttpResponseException as e: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2643438e84..48b0fc7279 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -891,9 +891,7 @@ class EventCreationHandler(object): except Exception: # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. - run_in_background( - self.store.remove_push_actions_from_staging, event.event_id - ) + await self.store.remove_push_actions_from_staging(event.event_id) raise async def _validate_canonical_alias( diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py index b8ca118716..9bd515080c 100644 --- a/tests/federation/test_complexity.py +++ b/tests/federation/test_complexity.py @@ -79,9 +79,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase): fed_transport = self.hs.get_federation_transport_client() # Mock out some things, because we don't want to test the whole join - fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999})) + fed_transport.client.get_json = Mock( + side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999}) + ) handler.federation_handler.do_invite_join = Mock( - return_value=make_awaitable(("", 1)) + side_effect=lambda *args, **kwargs: make_awaitable(("", 1)) ) d = handler._remote_join( @@ -110,9 +112,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase): fed_transport = self.hs.get_federation_transport_client() # Mock out some things, because we don't want to test the whole join - fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999})) + fed_transport.client.get_json = Mock( + side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999}) + ) handler.federation_handler.do_invite_join = Mock( - return_value=make_awaitable(("", 1)) + side_effect=lambda *args, **kwargs: make_awaitable(("", 1)) ) d = handler._remote_join( @@ -148,9 +152,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase): fed_transport = self.hs.get_federation_transport_client() # Mock out some things, because we don't want to test the whole join - fed_transport.client.get_json = Mock(return_value=make_awaitable(None)) + fed_transport.client.get_json = Mock( + side_effect=lambda *args, **kwargs: make_awaitable(None) + ) handler.federation_handler.do_invite_join = Mock( - return_value=make_awaitable(("", 1)) + side_effect=lambda *args, **kwargs: make_awaitable(("", 1)) ) # Artificially raise the complexity @@ -204,9 +210,11 @@ class RoomComplexityAdminTests(unittest.FederatingHomeserverTestCase): fed_transport = self.hs.get_federation_transport_client() # Mock out some things, because we don't want to test the whole join - fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999})) + fed_transport.client.get_json = Mock( + side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999}) + ) handler.federation_handler.do_invite_join = Mock( - return_value=make_awaitable(("", 1)) + side_effect=lambda *args, **kwargs: make_awaitable(("", 1)) ) d = handler._remote_join( @@ -234,9 +242,11 @@ class RoomComplexityAdminTests(unittest.FederatingHomeserverTestCase): fed_transport = self.hs.get_federation_transport_client() # Mock out some things, because we don't want to test the whole join - fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999})) + fed_transport.client.get_json = Mock( + side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999}) + ) handler.federation_handler.do_invite_join = Mock( - return_value=make_awaitable(("", 1)) + side_effect=lambda *args, **kwargs: make_awaitable(("", 1)) ) d = handler._remote_join( |