summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py71
1 files changed, 59 insertions, 12 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py

index d751c9772b..2906b93f6a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py
@@ -22,11 +22,10 @@ from typing import Dict, Iterable from typing_extensions import ContextManager from twisted.internet import defer, reactor -from twisted.web.resource import NoResource import synapse import synapse.events -from synapse.api.errors import SynapseError +from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.api.urls import ( CLIENT_API_PREFIX, FEDERATION_PREFIX, @@ -40,14 +39,22 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer -from synapse.handlers.presence import BasePresenceHandler, get_interested_parties -from synapse.http.server import JsonResource +from synapse.handlers.presence import ( + BasePresenceHandler, + PresenceState, + get_interested_parties, +) +from synapse.http.server import JsonResource, OptionsResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource +from synapse.replication.http.presence import ( + ReplicationBumpPresenceActiveTime, + ReplicationPresenceSetState, +) from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -202,9 +209,14 @@ class KeyUploadServlet(RestServlet): # is there. auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) headers = {"Authorization": auth_headers} - result = await self.http_client.post_json_get_json( - self.main_uri + request.uri.decode("ascii"), body, headers=headers - ) + try: + result = await self.http_client.post_json_get_json( + self.main_uri + request.uri.decode("ascii"), body, headers=headers + ) + except HttpResponseException as e: + raise e.to_synapse() from e + except RequestSendFailed as e: + raise SynapseError(502, "Failed to talk to master") from e return 200, result else: @@ -243,6 +255,9 @@ class GenericWorkerPresence(BasePresenceHandler): # but we haven't notified the master of that yet self.users_going_offline = {} + self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) + self._set_state_client = ReplicationPresenceSetState.make_client(hs) + self._send_stop_syncing_loop = self.clock.looping_call( self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) @@ -300,10 +315,6 @@ class GenericWorkerPresence(BasePresenceHandler): self.users_going_offline.pop(user_id, None) self.send_user_sync(user_id, False, last_sync_ms) - def set_state(self, user, state, ignore_status_msg=False): - # TODO Hows this supposed to work? - return defer.succeed(None) - async def user_syncing( self, user_id: str, affect_presence: bool ) -> ContextManager[None]: @@ -382,6 +393,42 @@ class GenericWorkerPresence(BasePresenceHandler): if count > 0 ] + async def set_state(self, target_user, state, ignore_status_msg=False): + """Set the presence state of the user. + """ + presence = state["presence"] + + valid_presence = ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ) + if presence not in valid_presence: + raise SynapseError(400, "Invalid presence state") + + user_id = target_user.to_string() + + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + await self._set_state_client( + user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + ) + + async def bump_presence_active_time(self, user): + """We've seen the user do something that indicates they're interacting + with the app. + """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + + # Proxy request to master + user_id = user.to_string() + await self._bump_active_client(user_id=user_id) + class GenericWorkerTyping(object): def __init__(self, hs): @@ -561,7 +608,7 @@ class GenericWorkerServer(HomeServer): if name == "replication": resources[REPLICATION_PREFIX] = ReplicationRestResource(self) - root_resource = create_resource_tree(resources, NoResource()) + root_resource = create_resource_tree(resources, OptionsResource()) _base.listen_tcp( bind_addresses,