diff --git a/synapse/notifier.py b/synapse/notifier.py
index 5f5f765bea..6132727cbd 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -15,11 +15,13 @@
import logging
from collections import namedtuple
+from typing import Callable, List
from prometheus_client import Counter
from twisted.internet import defer
+import synapse.server
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
@@ -154,7 +156,7 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
- def __init__(self, hs):
+ def __init__(self, hs: "synapse.server.HomeServer"):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
@@ -164,7 +166,12 @@ class Notifier(object):
self.store = hs.get_datastore()
self.pending_new_room_events = []
- self.replication_callbacks = []
+ # Called when there are new things to stream over replication
+ self.replication_callbacks = [] # type: List[Callable[[], None]]
+
+ # Called when remote servers have come back online after having been
+ # down.
+ self.remote_server_up_callbacks = [] # type: List[Callable[[str], None]]
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
@@ -205,7 +212,7 @@ class Notifier(object):
"synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
)
- def add_replication_callback(self, cb):
+ def add_replication_callback(self, cb: Callable[[], None]):
"""Add a callback that will be called when some new data is available.
Callback is not given any arguments. It should *not* return a Deferred - if
it needs to do any asynchronous work, a background thread should be started and
@@ -213,6 +220,12 @@ class Notifier(object):
"""
self.replication_callbacks.append(cb)
+ def add_remote_server_up_callback(self, cb: Callable[[str], None]):
+ """Add a callback that will be called when synapse detects a server
+ has been
+ """
+ self.remote_server_up_callbacks.append(cb)
+
def on_new_room_event(
self, event, room_stream_id, max_room_stream_id, extra_users=[]
):
@@ -522,3 +535,15 @@ class Notifier(object):
"""Notify the any replication listeners that there's a new event"""
for cb in self.replication_callbacks:
cb()
+
+ def notify_remote_server_up(self, server: str):
+ """Notify any replication that a remote server has come back up
+ """
+ # We call federation_sender directly rather than registering as a
+ # callback as a) we already have a reference to it and b) it introduces
+ # circular dependencies.
+ if self.federation_sender:
+ self.federation_sender.wake_destination(server)
+
+ for cb in self.remote_server_up_callbacks:
+ cb(server)
|