diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d751c9772b..2906b93f6a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -22,11 +22,10 @@ from typing import Dict, Iterable
from typing_extensions import ContextManager
from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
import synapse
import synapse.events
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.api.urls import (
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
@@ -40,14 +39,22 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
-from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
-from synapse.http.server import JsonResource
+from synapse.handlers.presence import (
+ BasePresenceHandler,
+ PresenceState,
+ get_interested_parties,
+)
+from synapse.http.server import JsonResource, OptionsResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
+from synapse.replication.http.presence import (
+ ReplicationBumpPresenceActiveTime,
+ ReplicationPresenceSetState,
+)
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -202,9 +209,14 @@ class KeyUploadServlet(RestServlet):
# is there.
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
headers = {"Authorization": auth_headers}
- result = await self.http_client.post_json_get_json(
- self.main_uri + request.uri.decode("ascii"), body, headers=headers
- )
+ try:
+ result = await self.http_client.post_json_get_json(
+ self.main_uri + request.uri.decode("ascii"), body, headers=headers
+ )
+ except HttpResponseException as e:
+ raise e.to_synapse() from e
+ except RequestSendFailed as e:
+ raise SynapseError(502, "Failed to talk to master") from e
return 200, result
else:
@@ -243,6 +255,9 @@ class GenericWorkerPresence(BasePresenceHandler):
# but we haven't notified the master of that yet
self.users_going_offline = {}
+ self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
+ self._set_state_client = ReplicationPresenceSetState.make_client(hs)
+
self._send_stop_syncing_loop = self.clock.looping_call(
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)
@@ -300,10 +315,6 @@ class GenericWorkerPresence(BasePresenceHandler):
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
- def set_state(self, user, state, ignore_status_msg=False):
- # TODO Hows this supposed to work?
- return defer.succeed(None)
-
async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
@@ -382,6 +393,42 @@ class GenericWorkerPresence(BasePresenceHandler):
if count > 0
]
+ async def set_state(self, target_user, state, ignore_status_msg=False):
+ """Set the presence state of the user.
+ """
+ presence = state["presence"]
+
+ valid_presence = (
+ PresenceState.ONLINE,
+ PresenceState.UNAVAILABLE,
+ PresenceState.OFFLINE,
+ )
+ if presence not in valid_presence:
+ raise SynapseError(400, "Invalid presence state")
+
+ user_id = target_user.to_string()
+
+ # If presence is disabled, no-op
+ if not self.hs.config.use_presence:
+ return
+
+ # Proxy request to master
+ await self._set_state_client(
+ user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
+ )
+
+ async def bump_presence_active_time(self, user):
+ """We've seen the user do something that indicates they're interacting
+ with the app.
+ """
+ # If presence is disabled, no-op
+ if not self.hs.config.use_presence:
+ return
+
+ # Proxy request to master
+ user_id = user.to_string()
+ await self._bump_active_client(user_id=user_id)
+
class GenericWorkerTyping(object):
def __init__(self, hs):
@@ -561,7 +608,7 @@ class GenericWorkerServer(HomeServer):
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
- root_resource = create_resource_tree(resources, NoResource())
+ root_resource = create_resource_tree(resources, OptionsResource())
_base.listen_tcp(
bind_addresses,
|