diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2024-03-22 13:24:11 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-22 13:24:11 +0000 |
commit | b5322b4daf2e13310200e57eb427568cb6a92ddf (patch) | |
tree | 8a8aed36f136df07d0fea011581bc4cadba821af /synapse/util | |
parent | Add OIDC config to add extra parameters to the authorize URL (#16971) (diff) | |
download | synapse-b5322b4daf2e13310200e57eb427568cb6a92ddf.tar.xz |
Ensure that pending to-device events are sent over federation at startup (#16925)
Fixes https://github.com/element-hq/synapse/issues/16680, as well as a related bug, where servers which we had *never* successfully sent an event to would not be retried. In order to fix the case of pending to-device messages, we hook into the existing `wake_destinations_needing_catchup` process, by extending it to look for destinations that have pending to-device messages. The federation transmission loop then attempts to send the pending to-device messages as normal.
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 9e374354ec..e0d876e84b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -117,7 +117,11 @@ class Clock: return int(self.time() * 1000) def looping_call( - self, f: Callable[P, object], msec: float, *args: P.args, **kwargs: P.kwargs + self, + f: Callable[P, object], + msec: float, + *args: P.args, + **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. @@ -134,12 +138,46 @@ class Clock: Args: f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. - *args: Postional arguments to pass to function. + *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ + return self._looping_call_common(f, msec, False, *args, **kwargs) + + def looping_call_now( + self, + f: Callable[P, object], + msec: float, + *args: P.args, + **kwargs: P.kwargs, + ) -> LoopingCall: + """Call a function immediately, and then repeatedly thereafter. + + As with `looping_call`: subsequent calls are not scheduled until after the + the Awaitable returned by a previous call has finished. + + Also as with `looping_call`: the function is called with no logcontext and + you probably want to wrap it in `run_as_background_process`. + + Args: + f: The function to call repeatedly. + msec: How long to wait between calls in milliseconds. + *args: Positional arguments to pass to function. + **kwargs: Key arguments to pass to function. + """ + return self._looping_call_common(f, msec, True, *args, **kwargs) + + def _looping_call_common( + self, + f: Callable[P, object], + msec: float, + now: bool, + *args: P.args, + **kwargs: P.kwargs, + ) -> LoopingCall: + """Common functionality for `looping_call` and `looping_call_now`""" call = task.LoopingCall(f, *args, **kwargs) call.clock = self._reactor - d = call.start(msec / 1000.0, now=False) + d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) return call |