diff --git a/changelog.d/7864.bugfix b/changelog.d/7864.bugfix
new file mode 100644
index 0000000000..8623355fe9
--- /dev/null
+++ b/changelog.d/7864.bugfix
@@ -0,0 +1 @@
+Fix a memory leak by limiting the length of time that messages will be queued for a remote server that has been unreachable.
diff --git a/changelog.d/8072.misc b/changelog.d/8072.misc
new file mode 100644
index 0000000000..e26764dea1
--- /dev/null
+++ b/changelog.d/8072.misc
@@ -0,0 +1 @@
+ Convert various parts of the codebase to async/await.
diff --git a/changelog.d/8081.bugfix b/changelog.d/8081.bugfix
new file mode 100644
index 0000000000..9ebcbf5b84
--- /dev/null
+++ b/changelog.d/8081.bugfix
@@ -0,0 +1 @@
+Fix `Re-starting finished log context PUT-nnnn` warning when event persistence failed.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index dd150f89a6..8cbc23d901 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -337,6 +337,28 @@ class PerDestinationQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+
+ if e.retry_interval > 60 * 60 * 1000:
+ # we won't retry for another hour!
+ # (this suggests a significant outage)
+ # We drop pending PDUs and EDUs because otherwise they will
+ # rack up indefinitely.
+ # Note that:
+ # - the EDUs that are being dropped here are those that we can
+ # afford to drop (specifically, only typing notifications,
+ # read receipts and presence updates are being dropped here)
+ # - Other EDUs such as to_device messages are queued with a
+ # different mechanism
+ # - this is all volatile state that would be lost if the
+ # federation sender restarted anyway
+
+ # dropping read receipts is a bit sad but should be solved
+ # through another mechanism, because this is all volatile!
+ self._pending_pdus = []
+ self._pending_edus = []
+ self._pending_edus_keyed = {}
+ self._pending_presence = {}
+ self._pending_rrs = {}
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2643438e84..48b0fc7279 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -891,9 +891,7 @@ class EventCreationHandler(object):
except Exception:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
- run_in_background(
- self.store.remove_push_actions_from_staging, event.event_id
- )
+ await self.store.remove_push_actions_from_staging(event.event_id)
raise
async def _validate_canonical_alias(
diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py
index b8ca118716..9bd515080c 100644
--- a/tests/federation/test_complexity.py
+++ b/tests/federation/test_complexity.py
@@ -79,9 +79,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
- fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+ fed_transport.client.get_json = Mock(
+ side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+ )
handler.federation_handler.do_invite_join = Mock(
- return_value=make_awaitable(("", 1))
+ side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
)
d = handler._remote_join(
@@ -110,9 +112,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
- fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+ fed_transport.client.get_json = Mock(
+ side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+ )
handler.federation_handler.do_invite_join = Mock(
- return_value=make_awaitable(("", 1))
+ side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
)
d = handler._remote_join(
@@ -148,9 +152,11 @@ class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
- fed_transport.client.get_json = Mock(return_value=make_awaitable(None))
+ fed_transport.client.get_json = Mock(
+ side_effect=lambda *args, **kwargs: make_awaitable(None)
+ )
handler.federation_handler.do_invite_join = Mock(
- return_value=make_awaitable(("", 1))
+ side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
)
# Artificially raise the complexity
@@ -204,9 +210,11 @@ class RoomComplexityAdminTests(unittest.FederatingHomeserverTestCase):
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
- fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+ fed_transport.client.get_json = Mock(
+ side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+ )
handler.federation_handler.do_invite_join = Mock(
- return_value=make_awaitable(("", 1))
+ side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
)
d = handler._remote_join(
@@ -234,9 +242,11 @@ class RoomComplexityAdminTests(unittest.FederatingHomeserverTestCase):
fed_transport = self.hs.get_federation_transport_client()
# Mock out some things, because we don't want to test the whole join
- fed_transport.client.get_json = Mock(return_value=make_awaitable({"v1": 9999}))
+ fed_transport.client.get_json = Mock(
+ side_effect=lambda *args, **kwargs: make_awaitable({"v1": 9999})
+ )
handler.federation_handler.do_invite_join = Mock(
- return_value=make_awaitable(("", 1))
+ side_effect=lambda *args, **kwargs: make_awaitable(("", 1))
)
d = handler._remote_join(
|