summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2024-03-22 13:24:11 +0000
committerGitHub <noreply@github.com>2024-03-22 13:24:11 +0000
commitb5322b4daf2e13310200e57eb427568cb6a92ddf (patch)
tree8a8aed36f136df07d0fea011581bc4cadba821af /synapse/util
parentAdd OIDC config to add extra parameters to the authorize URL (#16971) (diff)
downloadsynapse-b5322b4daf2e13310200e57eb427568cb6a92ddf.tar.xz
Ensure that pending to-device events are sent over federation at startup (#16925)
Fixes https://github.com/element-hq/synapse/issues/16680, as well as a
related bug, where servers which we had *never* successfully sent an
event to would not be retried.

In order to fix the case of pending to-device messages, we hook into the
existing `wake_destinations_needing_catchup` process, by extending it to
look for destinations that have pending to-device messages. The
federation transmission loop then attempts to send the pending to-device
messages as normal.
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py44
1 files changed, 41 insertions, 3 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 9e374354ec..e0d876e84b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -117,7 +117,11 @@ class Clock:
         return int(self.time() * 1000)
 
     def looping_call(
-        self, f: Callable[P, object], msec: float, *args: P.args, **kwargs: P.kwargs
+        self,
+        f: Callable[P, object],
+        msec: float,
+        *args: P.args,
+        **kwargs: P.kwargs,
     ) -> LoopingCall:
         """Call a function repeatedly.
 
@@ -134,12 +138,46 @@ class Clock:
         Args:
             f: The function to call repeatedly.
             msec: How long to wait between calls in milliseconds.
-            *args: Postional arguments to pass to function.
+            *args: Positional arguments to pass to function.
             **kwargs: Key arguments to pass to function.
         """
+        return self._looping_call_common(f, msec, False, *args, **kwargs)
+
+    def looping_call_now(
+        self,
+        f: Callable[P, object],
+        msec: float,
+        *args: P.args,
+        **kwargs: P.kwargs,
+    ) -> LoopingCall:
+        """Call a function immediately, and then repeatedly thereafter.
+
+        As with `looping_call`: subsequent calls are not scheduled until after the
+        the Awaitable returned by a previous call has finished.
+
+        Also as with `looping_call`: the function is called with no logcontext and
+        you probably want to wrap it in `run_as_background_process`.
+
+        Args:
+            f: The function to call repeatedly.
+            msec: How long to wait between calls in milliseconds.
+            *args: Positional arguments to pass to function.
+            **kwargs: Key arguments to pass to function.
+        """
+        return self._looping_call_common(f, msec, True, *args, **kwargs)
+
+    def _looping_call_common(
+        self,
+        f: Callable[P, object],
+        msec: float,
+        now: bool,
+        *args: P.args,
+        **kwargs: P.kwargs,
+    ) -> LoopingCall:
+        """Common functionality for `looping_call` and `looping_call_now`"""
         call = task.LoopingCall(f, *args, **kwargs)
         call.clock = self._reactor
-        d = call.start(msec / 1000.0, now=False)
+        d = call.start(msec / 1000.0, now=now)
         d.addErrback(log_failure, "Looping call died", consumeErrors=False)
         return call