diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 274d582d07..d1c2079233 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -147,7 +147,6 @@ from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.generic_worker")
@@ -282,6 +281,7 @@ class GenericWorkerPresence(BasePresenceHandler):
self.hs = hs
self.is_mine_id = hs.is_mine_id
+ self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
# The number of ongoing syncs on this process, by user id.
@@ -302,6 +302,8 @@ class GenericWorkerPresence(BasePresenceHandler):
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)
+ self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
+
hs.get_reactor().addSystemEventTrigger(
"before",
"shutdown",
@@ -394,7 +396,7 @@ class GenericWorkerPresence(BasePresenceHandler):
return _user_syncing()
async def notify_from_replication(self, states, stream_id):
- parties = await get_interested_parties(self.store, states)
+ parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
@@ -439,8 +441,12 @@ class GenericWorkerPresence(BasePresenceHandler):
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
+ PresenceState.BUSY,
)
- if presence not in valid_presence:
+
+ if presence not in valid_presence or (
+ presence == PresenceState.BUSY and not self._busy_presence_enabled
+ ):
raise SynapseError(400, "Invalid presence state")
user_id = target_user.to_string()
@@ -634,12 +640,8 @@ class GenericWorkerServer(HomeServer):
if listener.type == "http":
self._listen_http(listener)
elif listener.type == "manhole":
- _base.listen_tcp(
- listener.bind_addresses,
- listener.port,
- manhole(
- username="matrix", password="rabbithole", globals={"hs": self}
- ),
+ _base.listen_manhole(
+ listener.bind_addresses, listener.port, manhole_globals={"hs": self}
)
elif listener.type == "metrics":
if not self.get_config().enable_metrics:
@@ -786,13 +788,6 @@ class FederationSenderHandler:
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
- def on_start(self):
- # There may be some events that are persisted but haven't been sent,
- # so send them now.
- self.federation_sender.notify_new_events(
- self.store.get_room_max_stream_ordering()
- )
-
def wake_destination(self, server: str):
self.federation_sender.wake_destination(server)
|