summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7864.bugfix1
-rw-r--r--changelog.d/8072.misc1
-rw-r--r--changelog.d/8081.bugfix1
-rw-r--r--synapse/federation/sender/per_destination_queue.py22
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--tests/federation/test_complexity.py30
6 files changed, 46 insertions, 13 deletions
diff --git a/changelog.d/7864.bugfix b/changelog.d/7864.bugfix
new file mode 100644
index 0000000000..8623355fe9
--- /dev/null
+++ b/changelog.d/7864.bugfix
@@ -0,0 +1 @@
+Fix a memory leak by limiting the length of time that messages will be queued for a remote server that has been unreachable.
diff --git a/changelog.d/8072.misc b/changelog.d/8072.misc
new file mode 100644
index 0000000000..e26764dea1
--- /dev/null
+++ b/changelog.d/8072.misc
@@ -0,0 +1 @@
+ Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8081.bugfix b/changelog.d/8081.bugfix
new file mode 100644
index 0000000000..9ebcbf5b84
--- /dev/null
+++ b/changelog.d/8081.bugfix
@@ -0,0 +1 @@
+Fix `Re-starting finished log context PUT-nnnn` warning when event persistence failed.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..8cbc23d901 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -337,6 +337,28 @@ class PerDestinationQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+
+            if e.retry_interval > 60 * 60 * 1000:
+                # we won't retry for another hour!
+                # (this suggests a significant outage)
+                # We drop pending PDUs and EDUs because otherwise they will
+                # rack up indefinitely.
+                # Note that:
+                # - the EDUs that are being dropped here are those that we can
+                #   afford to drop (specifically, only typing notifications,
+                #   read receipts and presence updates are being dropped here)
+                # - Other EDUs such as to_device messages are queued with a
+                #   different mechanism
+                # - this is all volatile state that would be lost if the
+                #   federation sender restarted anyway
+
+                # dropping read receipts is a bit sad but should be solved
+                # through another mechanism, because this is all volatile!
+                self._pending_pdus = []
+                self._pending_edus = []
+                self._pending_edus_keyed = {}
+                self._pending_presence = {}
+                self._pending_rrs = {}
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2643438e84..48b0fc7279 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -891,9 +891,7 @@ class EventCreationHandler(object):
         except Exception:
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
-            run_in_background(
-                self.store.remove_push_actions_from_staging, event.event_id
-            )
+            await self.store.remove_push_actions_from_staging(event.event_id)
             raise
 
     async def _validate_canonical_alias(
diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py
index b8ca118716..9bd515080c 100644
--- a/tests/federation/test_complexity.py
+++ b/tests/federation/test_complexity.py
@@ -79,9 +79,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
         fed_transport = self.hs.get_federation_transport_client()
 
         # Mock out some things, because we don't want to test the whole join
-        fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+        fed_transport.client.get_json = Mock(
+            side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+        )
         handler.federation_handler.do_invite_join = Mock(
-            return_value=make_awaitable(("", 1))
+            side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
         )
 
         d = handler._remote_join(
@@ -110,9 +112,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
         fed_transport = self.hs.get_federation_transport_client()
 
         # Mock out some things, because we don't want to test the whole join
-        fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+        fed_transport.client.get_json = Mock(
+            side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+        )
         handler.federation_handler.do_invite_join = Mock(
-            return_value=make_awaitable(("", 1))
+            side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
         )
 
         d = handler._remote_join(
@@ -148,9 +152,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
         fed_transport = self.hs.get_federation_transport_client()
 
         # Mock out some things, because we don't want to test the whole join
-        fed_transport.client.get_json = Mock(return_value=make_awaitable(None))
+        fed_transport.client.get_json = Mock(
+            side_effect=lambda *args, **kwargs: make_awaitable(None)
+        )
         handler.federation_handler.do_invite_join = Mock(
-            return_value=make_awaitable(("", 1))
+            side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
         )
 
         # Artificially raise the complexity
@@ -204,9 +210,11 @@ class RoomComplexityAdminTests(unittest.FederatingHomeserverTestCase):
         fed_transport = self.hs.get_federation_transport_client()
 
         # Mock out some things, because we don't want to test the whole join
-        fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+        fed_transport.client.get_json = Mock(
+            side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+        )
         handler.federation_handler.do_invite_join = Mock(
-            return_value=make_awaitable(("", 1))
+            side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
         )
 
         d = handler._remote_join(
@@ -234,9 +242,11 @@ class RoomComplexityAdminTests(unittest.FederatingHomeserverTestCase):
         fed_transport = self.hs.get_federation_transport_client()
 
         # Mock out some things, because we don't want to test the whole join
-        fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+        fed_transport.client.get_json = Mock(
+            side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+        )
         handler.federation_handler.do_invite_join = Mock(
-            return_value=make_awaitable(("", 1))
+            side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
         )
 
         d = handler._remote_join(