From 0e8d78f6aa56020ffb948a4b2c6feadba2d16712 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Aug 2018 00:43:43 +0100 Subject: Logcontexts for replication command handlers Run the handlers for replication commands as background processes. This should improve the visibility in our metrics, and reduce the number of "running db transaction from sentinel context" warnings. Ideally it means converting the things that fire off deferreds into the night into things that actually return a Deferred when they are done. I've made a bit of a stab at this, but it will probably be leaky. --- synapse/app/synchrotron.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efd..39c7cbc1ba 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -332,8 +332,9 @@ class SyncReplicationHandler(ReplicationClientHandler): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): -- cgit 1.5.1 From c334ca67bb89039b3a00b7c9a1ce610e99859653 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 18 Aug 2018 01:08:45 +1000 Subject: Integrate presence from hotfixes (#3694) --- changelog.d/3694.feature | 1 + docs/workers.rst | 8 +++ synapse/app/_base.py | 6 ++- synapse/app/frontend_proxy.py | 39 ++++++++++++++- synapse/app/synchrotron.py | 16 ++++-- synapse/config/server.py | 6 +++ synapse/federation/transaction_queue.py | 4 ++ synapse/handlers/initial_sync.py | 4 ++ synapse/handlers/presence.py | 26 +++++++--- synapse/handlers/sync.py | 3 +- synapse/rest/client/v1/presence.py | 3 +- tests/app/__init__.py | 0 tests/app/test_frontend_proxy.py | 88 +++++++++++++++++++++++++++++++++ tests/rest/client/v1/test_presence.py | 72 +++++++++++++++++++++++++++ tests/rest/client/v2_alpha/test_sync.py | 79 +++++++++++++---------------- tests/unittest.py | 8 ++- tests/utils.py | 7 +-- 17 files changed, 303 insertions(+), 67 deletions(-) create mode 100644 changelog.d/3694.feature create mode 100644 tests/app/__init__.py create mode 100644 tests/app/test_frontend_proxy.py create mode 100644 tests/rest/client/v1/test_presence.py (limited to 'synapse/app/synchrotron.py') diff --git a/changelog.d/3694.feature b/changelog.d/3694.feature new file mode 100644 index 0000000000..916a342ff4 --- /dev/null +++ b/changelog.d/3694.feature @@ -0,0 +1 @@ +Synapse's presence functionality can now be disabled with the "use_presence" configuration option. diff --git a/docs/workers.rst b/docs/workers.rst index ac9efb621f..aec319dd84 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -241,6 +241,14 @@ regular expressions:: ^/_matrix/client/(api/v1|r0|unstable)/keys/upload +If ``use_presence`` is False in the homeserver config, it can also handle REST +endpoints matching the following regular expressions:: + + ^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status + +This "stub" presence handler will pass through ``GET`` request but make the +``PUT`` effectively a no-op. + It will proxy any requests it cannot handle to the main synapse instance. It must therefore be configured with the location of the main instance, via the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 391bd14c5c..7c866e246a 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port): logger.info("Metrics now reporting on %s:%d", host, port) -def listen_tcp(bind_addresses, port, factory, backlog=50): +def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): """ Create a TCP socket for a port and several addresses """ @@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50): check_bind_error(e, address, bind_addresses) -def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50): +def listen_ssl( + bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 +): """ Create an SSL socket for a port and several addresses """ diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 671fbbcb2a..8d484c1cd4 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.frontend_proxy") +class PresenceStatusStubServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/presence/(?P[^/]*)/status") + + def __init__(self, hs): + super(PresenceStatusStubServlet, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + self.auth = hs.get_auth() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + headers = { + "Authorization": auth_headers, + } + result = yield self.http_client.get_json( + self.main_uri + request.uri, + headers=headers, + ) + defer.returnValue((200, result)) + + @defer.inlineCallbacks + def on_PUT(self, request, user_id): + yield self.auth.get_user_by_req(request) + defer.returnValue((200, {})) + + class KeyUploadServlet(RestServlet): PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$") @@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) + + # If presence is disabled, use the stub servlet that does + # not allow sending presence + if not self.config.use_presence: + PresenceStatusStubServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, @@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), + reactor=self.get_reactor() ) logger.info("Synapse client reader now listening on port %d", port) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efd..cade09d60e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -114,7 +114,10 @@ class SynchrotronPresence(object): logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_user_sync( + user_id, is_syncing, last_sync_ms + ) def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they @@ -211,10 +214,13 @@ class SynchrotronPresence(object): yield self.notify_from_replication(states, stream_id) def get_currently_syncing_users(self): - return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) - if count > 0 - ] + if self.hs.config.use_presence: + return [ + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + if count > 0 + ] + else: + return set() class SynchrotronTyping(object): diff --git a/synapse/config/server.py b/synapse/config/server.py index a41c48e69c..68a612e594 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -49,6 +49,9 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + # Whether to enable user presence. + self.use_presence = config.get("use_presence", True) + # Whether to update the user directory or not. This should be set to # false only if we are updating the user directory in a worker self.update_user_directory = config.get("update_user_directory", True) @@ -250,6 +253,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # Set to false to disable presence tracking on this homeserver. + use_presence: true + # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f603c8a368..94d7423d01 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -58,6 +58,7 @@ class TransactionQueue(object): """ def __init__(self, hs): + self.hs = hs self.server_name = hs.hostname self.store = hs.get_datastore() @@ -308,6 +309,9 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ + if not self.hs.config.use_presence: + # No-op if presence is disabled. + return # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 1fb17fd9a5..e009395207 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): + # If presence is disabled, return an empty list + if not self.hs.config.use_presence: + defer.returnValue([]) + states = yield presence_handler.get_states( [m.user_id for m in room_members], as_event=True, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 3671d24f60..ba3856674d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -395,6 +395,10 @@ class PresenceHandler(object): """We've seen the user do something that indicates they're interacting with the app. """ + # If presence is disabled, no-op + if not self.hs.config.use_presence: + return + user_id = user.to_string() bump_active_time_counter.inc() @@ -424,6 +428,11 @@ class PresenceHandler(object): Useful for streams that are not associated with an actual client that is being used by a user. """ + # Override if it should affect the user's presence, if presence is + # disabled. + if not self.hs.config.use_presence: + affect_presence = False + if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 @@ -469,13 +478,16 @@ class PresenceHandler(object): Returns: set(str): A set of user_id strings. """ - syncing_user_ids = { - user_id for user_id, count in self.user_to_num_current_syncs.items() - if count - } - for user_ids in self.external_process_to_current_syncs.values(): - syncing_user_ids.update(user_ids) - return syncing_user_ids + if self.hs.config.use_presence: + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) + return syncing_user_ids + else: + return set() @defer.inlineCallbacks def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ac3edf0cc9..648debc8aa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -185,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ class SyncHandler(object): def __init__(self, hs): + self.hs_config = hs.config self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() @@ -860,7 +861,7 @@ class SyncHandler(object): since_token is None and sync_config.filter_collection.blocks_all_presence() ) - if not block_all_presence_data: + if self.hs_config.use_presence and not block_all_presence_data: yield self._generate_sync_entry_for_presence( sync_result_builder, newly_joined_rooms, newly_joined_users ) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index a14f0c807e..b5a6d6aebf 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -84,7 +84,8 @@ class PresenceStatusRestServlet(ClientV1RestServlet): except Exception: raise SynapseError(400, "Unable to parse state") - yield self.presence_handler.set_state(user, state) + if self.hs.config.use_presence: + yield self.presence_handler.set_state(user, state) defer.returnValue((200, {})) diff --git a/tests/app/__init__.py b/tests/app/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/app/test_frontend_proxy.py b/tests/app/test_frontend_proxy.py new file mode 100644 index 0000000000..76b5090fff --- /dev/null +++ b/tests/app/test_frontend_proxy.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.app.frontend_proxy import FrontendProxyServer + +from tests.unittest import HomeserverTestCase + + +class FrontendProxyTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver( + http_client=None, homeserverToUse=FrontendProxyServer + ) + + return hs + + def test_listen_http_with_presence_enabled(self): + """ + When presence is on, the stub servlet will not register. + """ + # Presence is on + self.hs.config.use_presence = True + + config = { + "port": 8080, + "bind_addresses": ["0.0.0.0"], + "resources": [{"names": ["client"]}], + } + + # Listen with the config + self.hs._listen_http(config) + + # Grab the resource from the site that was told to listen + self.assertEqual(len(self.reactor.tcpServers), 1) + site = self.reactor.tcpServers[0][1] + self.resource = ( + site.resource.children["_matrix"].children["client"].children["r0"] + ) + + request, channel = self.make_request("PUT", "presence/a/status") + self.render(request) + + # 400 + unrecognised, because nothing is registered + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") + + def test_listen_http_with_presence_disabled(self): + """ + When presence is on, the stub servlet will register. + """ + # Presence is off + self.hs.config.use_presence = False + + config = { + "port": 8080, + "bind_addresses": ["0.0.0.0"], + "resources": [{"names": ["client"]}], + } + + # Listen with the config + self.hs._listen_http(config) + + # Grab the resource from the site that was told to listen + self.assertEqual(len(self.reactor.tcpServers), 1) + site = self.reactor.tcpServers[0][1] + self.resource = ( + site.resource.children["_matrix"].children["client"].children["r0"] + ) + + request, channel = self.make_request("PUT", "presence/a/status") + self.render(request) + + # 401, because the stub servlet still checks authentication + self.assertEqual(channel.code, 401) + self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN") diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py new file mode 100644 index 0000000000..66c2b68707 --- /dev/null +++ b/tests/rest/client/v1/test_presence.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from synapse.rest.client.v1 import presence +from synapse.types import UserID + +from tests import unittest + + +class PresenceTestCase(unittest.HomeserverTestCase): + """ Tests presence REST API. """ + + user_id = "@sid:red" + + user = UserID.from_string(user_id) + servlets = [presence.register_servlets] + + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver( + "red", http_client=None, federation_client=Mock() + ) + + hs.presence_handler = Mock() + + return hs + + def test_put_presence(self): + """ + PUT to the status endpoint with use_presence enabled will call + set_state on the presence handler. + """ + self.hs.config.use_presence = True + + body = {"presence": "here", "status_msg": "beep boop"} + request, channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.assertEqual(self.hs.presence_handler.set_state.call_count, 1) + + def test_put_presence_disabled(self): + """ + PUT to the status endpoint with use_presence disbled will NOT call + set_state on the presence handler. + """ + self.hs.config.use_presence = False + + body = {"presence": "here", "status_msg": "beep boop"} + request, channel = self.make_request( + "PUT", "/presence/%s/status" % (self.user_id,), body + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.assertEqual(self.hs.presence_handler.set_state.call_count, 0) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 9f3d8bd1db..560b1fba96 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -13,71 +13,58 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse.types -from synapse.http.server import JsonResource +from mock import Mock + from synapse.rest.client.v2_alpha import sync -from synapse.types import UserID -from synapse.util import Clock from tests import unittest -from tests.server import ( - ThreadedMemoryReactorClock as MemoryReactorClock, - make_request, - render, - setup_test_homeserver, -) - -PATH_PREFIX = "/_matrix/client/v2_alpha" -class FilterTestCase(unittest.TestCase): +class FilterTestCase(unittest.HomeserverTestCase): - USER_ID = "@apple:test" - TO_REGISTER = [sync] + user_id = "@apple:test" + servlets = [sync.register_servlets] - def setUp(self): - self.clock = MemoryReactorClock() - self.hs_clock = Clock(self.clock) + def make_homeserver(self, reactor, clock): - self.hs = setup_test_homeserver( - self.addCleanup, http_client=None, clock=self.hs_clock, reactor=self.clock + hs = self.setup_test_homeserver( + "red", http_client=None, federation_client=Mock() ) + return hs - self.auth = self.hs.get_auth() - - def get_user_by_access_token(token=None, allow_guest=False): - return { - "user": UserID.from_string(self.USER_ID), - "token_id": 1, - "is_guest": False, - } - - def get_user_by_req(request, allow_guest=False, rights="access"): - return synapse.types.create_requester( - UserID.from_string(self.USER_ID), 1, False, None - ) - - self.auth.get_user_by_access_token = get_user_by_access_token - self.auth.get_user_by_req = get_user_by_req + def test_sync_argless(self): + request, channel = self.make_request("GET", "/sync") + self.render(request) - self.store = self.hs.get_datastore() - self.filtering = self.hs.get_filtering() - self.resource = JsonResource(self.hs) + self.assertEqual(channel.code, 200) + self.assertTrue( + set( + [ + "next_batch", + "rooms", + "presence", + "account_data", + "to_device", + "device_lists", + ] + ).issubset(set(channel.json_body.keys())) + ) - for r in self.TO_REGISTER: - r.register_servlets(self.hs, self.resource) + def test_sync_presence_disabled(self): + """ + When presence is disabled, the key does not appear in /sync. + """ + self.hs.config.use_presence = False - def test_sync_argless(self): - request, channel = make_request("GET", "/_matrix/client/r0/sync") - render(request, self.resource, self.clock) + request, channel = self.make_request("GET", "/sync") + self.render(request) - self.assertEqual(channel.result["code"], b"200") + self.assertEqual(channel.code, 200) self.assertTrue( set( [ "next_batch", "rooms", - "presence", "account_data", "to_device", "device_lists", diff --git a/tests/unittest.py b/tests/unittest.py index e6afe3b96d..d852e2465a 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -18,6 +18,8 @@ import logging from mock import Mock +from canonicaljson import json + import twisted import twisted.logger from twisted.trial import unittest @@ -241,11 +243,15 @@ class HomeserverTestCase(TestCase): method (bytes/unicode): The HTTP request method ("verb"). path (bytes/unicode): The HTTP path, suitably URL encoded (e.g. escaped UTF-8 & spaces and such). - content (bytes): The body of the request. + content (bytes or dict): The body of the request. JSON-encoded, if + a dict. Returns: A synapse.http.site.SynapseRequest. """ + if isinstance(content, dict): + content = json.dumps(content).encode('utf8') + return make_request(method, path, content) def render(self, request): diff --git a/tests/utils.py b/tests/utils.py index 6f8b1de3e7..bb0fc74054 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -93,7 +93,8 @@ def setupdb(): @defer.inlineCallbacks def setup_test_homeserver( - cleanup_func, name="test", datastore=None, config=None, reactor=None, **kargs + cleanup_func, name="test", datastore=None, config=None, reactor=None, + homeserverToUse=HomeServer, **kargs ): """ Setup a homeserver suitable for running tests against. Keyword arguments @@ -192,7 +193,7 @@ def setup_test_homeserver( config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection if datastore is None: - hs = HomeServer( + hs = homeserverToUse( name, config=config, db_config=config.database_config, @@ -235,7 +236,7 @@ def setup_test_homeserver( hs.setup() else: - hs = HomeServer( + hs = homeserverToUse( name, db_pool=None, datastore=datastore, -- cgit 1.5.1