33 files changed, 804 insertions, 559 deletions
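The UPGRADE.rst change below replaces the old federation version check with a check of the ``Server`` header returned by the Client-Server API. The same check can be scripted; here is a minimal sketch using the ``requests`` library (the host name and the ``Synapse/x.y.z`` header format are illustrative assumptions):

.. code:: python

    # Sketch: report which Synapse version a homeserver advertises.
    # Assumes the Client-Server API is on port 443; append a port
    # (eg, :8448) to the host name if yours is configured differently.
    import requests

    def server_version(host):
        # verify=False mirrors `curl -k`: skip certificate validation,
        # since self-signed certificates are common on homeservers.
        resp = requests.get(
            "https://%s/_matrix/client/versions" % (host,), verify=False
        )
        # Typically something like "Synapse/0.22.0".
        return resp.headers.get("Server")

    if __name__ == "__main__":
        print(server_version("matrix.org"))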
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 62b22e9108..2efe7ea60f 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -5,39 +5,48 @@ Before upgrading check if any special steps are required to upgrade from the
 what you currently have installed to current version of synapse. The extra
 instructions that may be required are listed later in this document.
 
-If synapse was installed in a virtualenv then active that virtualenv before
-upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then run:
+1. If synapse was installed in a virtualenv then activate that virtualenv before
+   upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then
+   run:
 
-.. code:: bash
+   .. code:: bash
 
-    source ~/.synapse/bin/activate
+       source ~/.synapse/bin/activate
 
-If synapse was installed using pip then upgrade to the latest version by
-running:
+2. If synapse was installed using pip then upgrade to the latest version by
+   running:
 
-.. code:: bash
+   .. code:: bash
 
-    pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
+       pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
 
-If synapse was installed using git then upgrade to the latest version by
-running:
+       # restart synapse
+       synctl restart
 
-.. code:: bash
-    # Pull the latest version of the master branch.
-    git pull
-    # Update the versions of synapse's python dependencies.
-    python synapse/python_dependencies.py | xargs -n1 pip install --upgrade
-
-To check whether your update was sucessfull, run:
+   If synapse was installed using git then upgrade to the latest version by
+   running:
 
-.. code:: bash
+   .. code:: bash
+
+       # Pull the latest version of the master branch.
+       git pull
+       # Update the versions of synapse's python dependencies.
+       python synapse/python_dependencies.py | xargs pip install --upgrade
 
-    # replace your.server.domain with ther domain of your synapse homeserver
-    curl https://<your.server.domain>/_matrix/federation/v1/version
+       # restart synapse
+       ./synctl restart
 
-So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
+To check whether your update was successful, you can check the Server header
+returned by the Client-Server API:
+
+.. code:: bash
+
+    # replace <host.name> with the hostname of your synapse homeserver.
+    # You may need to specify a port (eg, :8448) if your server is not
+    # configured on port 443.
+    curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
 
 Upgrading to v0.15.0
 ====================
@@ -77,7 +86,7 @@ It has been replaced by specifying a list of application service registrations i
 ``homeserver.yaml``::
 
     app_service_config_files: ["registration-01.yaml", "registration-02.yaml"]
- 
+
 Where ``registration-01.yaml`` looks like::
 
     url: <String>  # e.g. "https://my.application.service.com"
@@ -166,7 +175,7 @@ This release completely changes the database schema and so requires upgrading
 it before starting the new version of the homeserver.
 
 The script "database-prepare-for-0.5.0.sh" should be used to upgrade the
-database. This will save all user information, such as logins and profiles, 
+database. This will save all user information, such as logins and profiles,
 but will otherwise purge the database. This includes messages, which rooms
 the home server was a member of and room alias mappings.
 
@@ -175,18 +184,18 @@ file and ask for help in #matrix:matrix.org.
 The upgrade process is, unfortunately, non trivial and requires human
 intervention to resolve any resulting conflicts during the upgrade process.
 
-Before running the command the homeserver should be first completely 
+Before running the command the homeserver should be first completely
 shutdown. To run it, simply specify the location of the database, e.g.:
 
     ./scripts/database-prepare-for-0.5.0.sh "homeserver.db"
 
-Once this has successfully completed it will be safe to restart the 
-homeserver. You may notice that the homeserver takes a few seconds longer to 
+Once this has successfully completed it will be safe to restart the
+homeserver. You may notice that the homeserver takes a few seconds longer to
 restart than usual as it reinitializes the database.
 
 On startup of the new version, users can either rejoin remote rooms using room
 aliases or by being reinvited. Alternatively, if any other homeserver sends a
-message to a room that the homeserver was previously in the local HS will 
+message to a room that the homeserver was previously in the local HS will
 automatically rejoin the room.
 
 Upgrading to v0.4.0
 ===================
@@ -245,7 +254,7 @@ automatically generate default config use::
         --config-path homeserver.config \
         --generate-config
 
-This config can be edited if desired, for example to specify a different SSL 
+This config can be edited if desired, for example to specify a different SSL
 certificate to use. Once done you can run the home server using::
 
     $ python synapse/app/homeserver.py --config-path homeserver.config
 
@@ -266,20 +275,20 @@ This release completely changes the database schema and so requires upgrading
 it before starting the new version of the homeserver.
 
 The script "database-prepare-for-0.0.1.sh" should be used to upgrade the
-database. This will save all user information, such as logins and profiles, 
+database. This will save all user information, such as logins and profiles,
 but will otherwise purge the database. This includes messages, which rooms
 the home server was a member of and room alias mappings.
 
-Before running the command the homeserver should be first completely 
+Before running the command the homeserver should be first completely
 shutdown. To run it, simply specify the location of the database, e.g.:
 
     ./scripts/database-prepare-for-0.0.1.sh "homeserver.db"
 
-Once this has successfully completed it will be safe to restart the 
-homeserver. You may notice that the homeserver takes a few seconds longer to 
+Once this has successfully completed it will be safe to restart the
+homeserver. You may notice that the homeserver takes a few seconds longer to
 restart than usual as it reinitializes the database.
 
 On startup of the new version, users can either rejoin remote rooms using room
 aliases or by being reinvited. Alternatively, if any other homeserver sends a
-message to a room that the homeserver was previously in the local HS will 
+message to a room that the homeserver was previously in the local HS will
 automatically rejoin the room.
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index d1ab42d3af..82a90ef6fa 100644..100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -1,10 +1,30 @@
+#!/usr/bin/env python
+#
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import argparse
 import nacl.signing
 import json
 import base64
 import requests
 import sys
 import srvlookup
-
+import yaml
 
 def encode_base64(input_bytes):
     """Encode bytes as a base64 string without any padding."""
@@ -120,11 +140,13 @@ def get_json(origin_name, origin_key, destination, path):
             origin_name, key, sig,
         )
         authorization_headers.append(bytes(header))
-        sys.stderr.write(header)
-        sys.stderr.write("\n")
+        print ("Authorization: %s" % header, file=sys.stderr)
+
+    dest = lookup(destination, path)
+    print ("Requesting %s" % dest, file=sys.stderr)
 
     result = requests.get(
-        lookup(destination, path),
+        dest,
         headers={"Authorization": authorization_headers[0]},
         verify=False,
     )
@@ -133,17 +155,66 @@ def get_json(origin_name, origin_key, destination, path):
 
 
 def main():
-    origin_name, keyfile, destination, path = sys.argv[1:]
+    parser = argparse.ArgumentParser(
+        description=
+            "Signs and sends a federation request to a matrix homeserver",
+    )
+
+    parser.add_argument(
+        "-N", "--server-name",
+        help="Name to give as the local homeserver. If unspecified, will be "
+             "read from the config file.",
+    )
+
+    parser.add_argument(
+        "-k", "--signing-key-path",
+        help="Path to the file containing the private ed25519 key to sign the "
+             "request with.",
+    )
+
+    parser.add_argument(
+        "-c", "--config",
+        default="homeserver.yaml",
+        help="Path to server config file. Ignored if --server-name and "
+             "--signing-key-path are both given.",
+    )
 
-    with open(keyfile) as f:
+    parser.add_argument(
+        "-d", "--destination",
+        default="matrix.org",
+        help="name of the remote homeserver. We will do SRV lookups and "
+             "connect appropriately.",
+    )
+
+    parser.add_argument(
+        "path",
+        help="request path. We will add '/_matrix/federation/v1/' to this."
+    )
+
+    args = parser.parse_args()
+
+    if not args.server_name or not args.signing_key_path:
+        read_args_from_config(args)
+
+    with open(args.signing_key_path) as f:
         key = read_signing_keys(f)[0]
 
     result = get_json(
-        origin_name, key, destination, "/_matrix/federation/v1/" + path
+        args.server_name, key, args.destination, "/_matrix/federation/v1/" + args.path
     )
 
     json.dump(result, sys.stdout)
-    print ""
+    print ("")
+
+
+def read_args_from_config(args):
+    with open(args.config, 'r') as fh:
+        config = yaml.safe_load(fh)
+        if not args.server_name:
+            args.server_name = config['server_name']
+        if not args.signing_key_path:
+            args.signing_key_path = config['signing_key_path']
 
 
 if __name__ == "__main__":
     main()
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 7d158a46a4..bc167b59af 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -252,6 +252,25 @@ class Porter(object):
             )
             return
 
+        if table in (
+            "user_directory", "user_directory_search", "users_who_share_rooms",
+            "users_in_pubic_room",
+        ):
+            # We don't port these tables, as they're a faff and we can regenerate
+            # them anyway.
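The rewritten ``main()`` in federation_client.py above introduces a useful pattern: optional command-line flags that are backfilled from ``homeserver.yaml`` via ``read_args_from_config`` only when they were omitted. A standalone sketch of that pattern (the flag and config key names here are illustrative, not Synapse's):

.. code:: python

    # Sketch: argparse flags that fall back to values in a YAML config.
    import argparse

    import yaml

    def parse_args(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument("-N", "--server-name")
        parser.add_argument("-c", "--config", default="homeserver.yaml")
        args = parser.parse_args(argv)
        if not args.server_name:
            # Flag omitted: read the value from the config file instead.
            with open(args.config) as fh:
                args.server_name = yaml.safe_load(fh)["server_name"]
        return args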
+            self.progress.update(table, table_size)  # Mark table as done
+            return
+
+        if table == "user_directory_stream_pos":
+            # We need to make sure there is a single row, `(X, null)`, as that is
+            # what synapse expects to be there.
+            yield self.postgres_store._simple_insert(
+                table=table,
+                values={"stream_id": None},
+            )
+            self.progress.update(table, table_size)  # Mark table as done
+            return
+
         forward_select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index f8266d1c81..e3da45b416 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -209,7 +209,7 @@ class Auth(object):
             )[0]
             if user and access_token and ip_addr:
                 self.store.insert_client_ip(
-                    user=user,
+                    user_id=user.to_string(),
                     access_token=access_token,
                     ip=ip_addr,
                     user_agent=user_agent,
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..cd0e815919
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        config (synapse.config.Config): config object
+    """
+
+    logger = logging.getLogger(config.worker_app)
+
+    start_reactor(
+        appname,
+        config.soft_file_limit,
+        config.gc_thresholds,
+        config.worker_pid_file,
+        config.worker_daemonize,
+        config.worker_cpu_affinity,
+        logger,
+    )
+
+
+def start_reactor(
+        appname,
+        soft_file_limit,
+        gc_thresholds,
+        pid_file,
+        daemonize,
+        cpu_affinity,
+        logger,
+):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        soft_file_limit (int):
+        gc_thresholds:
+        pid_file (str): name of pid file to write to if daemonize is True
+        daemonize (bool): true to run the reactor in a background process
+        cpu_affinity (int|None): cpu affinity mask
+        logger (logging.Logger): logger instance to pass to Daemonize
+    """
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
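``start_reactor`` takes ``cpu_affinity`` as a raw bitmask, which is what both the ``affinity.set_process_affinity_mask`` call below and the ``cpu_affinity`` comment added to the server config later in this patch assume. Building such a mask from CPU indices is plain bit arithmetic; a small sketch (the helper name is illustrative):

.. code:: python

    # Sketch: build the bitmask that set_process_affinity_mask expects.
    def cpu_mask(*cpus):
        """Return a mask with one bit set per logical CPU index."""
        mask = 0
        for cpu in cpus:
            mask |= 1 << cpu
        return mask

    assert cpu_mask(0) == 0x1        # processor #0 only
    assert cpu_mask(0, 1) == 0x3     # processors #0 and #1
    assert cpu_mask(*range(32)) == 0xFFFFFFFF  # all of #0 through #31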
+ with PreserveLoggingContext(): + logger.info("Running") + if cpu_affinity is not None: + logger.info("Setting CPU affinity to %s" % cpu_affinity) + affinity.set_process_affinity_mask(0, cpu_affinity) + change_resource_limit(soft_file_limit) + if gc_thresholds: + gc.set_threshold(*gc_thresholds) + reactor.run() + + if daemonize: + daemon = Daemonize( + app=appname, + pid=pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a476efa63..ba2657bbad 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -13,38 +13,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.appservice") @@ -181,36 +174,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. 
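The dozen-line ``run()``/``Daemonize`` block being deleted from appservice.py here is removed from every worker in this patch and replaced by a single call into ``synapse.app._base``. What remains of a worker's startup is roughly the following sketch; the worker name and the ``build_worker_homeserver`` helper are illustrative stand-ins for each app's real ``HomeServer`` subclass setup:

.. code:: python

    # Sketch: the shape of a worker entry point after this refactor.
    from synapse.app import _base

    def start(config):
        ps = build_worker_homeserver(config)  # hypothetical helper
        ps.setup()
        ps.start_listening(config.worker_listeners)
        # Daemonizing, rlimits, gc thresholds and CPU affinity are now
        # handled centrally, driven by the worker_* config settings.
        _base.start_worker_reactor("synapse-example-worker", config)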
- with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-appservice", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-appservice", config) if __name__ == '__main__': diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 09bc1935f1..129cfa901f 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -13,47 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite +from synapse.crypto import context_factory from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.client_reader") @@ -183,36 +175,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between 
the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-client-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-client-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index eb392e1c9d..40cebe6f4a 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -13,44 +13,36 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.urls import FEDERATION_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import FEDERATION_PREFIX -from synapse.federation.transport.server import TransportLayerServer -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_reader") @@ -172,36 +164,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. 
- with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 03327dc47a..389e3909d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -13,44 +13,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_sender") @@ -213,36 +206,12 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - 
# between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-sender", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-sender", config) class FederationSenderHandler(object): diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py new file mode 100644 index 0000000000..bee4c47498 --- /dev/null +++ b/synapse/app/frontend_proxy.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import sys + +import synapse +from synapse import events +from synapse.api.errors import SynapseError +from synapse.app import _base +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.http.server import JsonResource +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, +) +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha._base import client_v2_patterns +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.versionstring import get_version_string +from twisted.internet import defer, reactor +from twisted.web.resource import Resource + +logger = logging.getLogger("synapse.app.frontend_proxy") + + +class KeyUploadServlet(RestServlet): + PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$", + releases=()) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(KeyUploadServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.http_client = hs.get_simple_http_client() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def 
on_POST(self, request, device_id): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. + if (requester.device_id is not None and + device_id != requester.device_id): + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) + else: + device_id = requester.device_id + + if device_id is None: + raise SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + + if body: + # They're actually trying to upload something, proxy to main synapse. + result = yield self.http_client.post_json_get_json( + self.main_uri + request.uri, + body, + ) + + defer.returnValue((200, result)) + else: + # Just interested in counts. + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + defer.returnValue((200, {"one_time_key_counts": result})) + + +class FrontendProxySlavedStore( + SlavedDeviceStore, + SlavedClientIpStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + BaseSlavedStore, +): + pass + + +class FrontendProxyServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + KeyUploadServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse client reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) + + +def start(config_options): + 
try: + config = HomeServerConfig.load_config( + "Synapse frontend proxy", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.frontend_proxy" + + assert config.worker_main_http_uri is not None + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FrontendProxyServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + _base.start_worker_reactor("synapse-frontend-proxy", config) + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 081e7cce59..84ad8f04a0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -13,61 +13,48 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import synapse - import gc import logging import os import sys +import synapse import synapse.config.logger +from synapse import events +from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \ + LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \ + STATIC_PREFIX, WEB_CLIENT_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError - -from synapse.python_dependencies import ( - check_requirements, CONDITIONAL_REQUIREMENTS -) - -from synapse.rest import ClientRestResource -from synapse.storage.engines import create_engine, IncorrectDatabaseSetup -from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database - -from synapse.server import HomeServer - -from twisted.internet import reactor, defer -from twisted.application import service -from twisted.web.resource import Resource, EncodingResourceWrapper -from twisted.web.static import File -from twisted.web.server import GzipEncoderFactory -from synapse.http.server import RootRedirect -from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource -from synapse.rest.key.v1.server_key_resource import LocalKey -from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.api.urls import ( - FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, - SERVER_KEY_V2_PREFIX, -) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.federation.transport.server import TransportLayerServer +from synapse.http.server import RootRedirect +from synapse.http.site import SynapseSite from synapse.metrics import register_memory_metrics -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource 
import METRICS_PREFIX, MetricsResource +from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ + check_requirements from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory -from synapse.federation.transport.server import TransportLayerServer - -from synapse.util.rlimit import change_resource_limit -from synapse.util.versionstring import get_version_string +from synapse.rest import ClientRestResource +from synapse.rest.key.v1.server_key_resource import LocalKey +from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.rest.media.v0.content_repository import ContentRepoResource +from synapse.rest.media.v1.media_repository import MediaRepositoryResource +from synapse.server import HomeServer +from synapse.storage import are_all_users_on_domain +from synapse.storage.engines import IncorrectDatabaseSetup, create_engine +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole - -from synapse.http.site import SynapseSite - -from synapse import events - -from daemonize import Daemonize +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from twisted.application import service +from twisted.internet import defer, reactor +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory +from twisted.web.static import File logger = logging.getLogger("synapse.app.homeserver") @@ -446,37 +433,18 @@ def run(hs): # be quite busy the first few minutes clock.call_later(5 * 60, phone_stats_home) - def in_thread(): - # Uncomment to enable tracing of log context changes. - # sys.settrace(logcontext_tracer) - - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - change_resource_limit(hs.config.soft_file_limit) - if hs.config.gc_thresholds: - gc.set_threshold(*hs.config.gc_thresholds) - reactor.run() - - if hs.config.daemonize: - - if hs.config.print_pidfile: - print (hs.config.pid_file) - - daemon = Daemonize( - app="synapse-homeserver", - pid=hs.config.pid_file, - action=lambda: in_thread(), - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - in_thread() + if hs.config.daemonize and hs.config.print_pidfile: + print (hs.config.pid_file) + + _base.start_reactor( + "synapse-homeserver", + hs.config.soft_file_limit, + hs.config.gc_thresholds, + hs.config.pid_file, + hs.config.daemonize, + hs.config.cpu_affinity, + logger, + ) def main(): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index f57ec784fe..36c18bdbcb 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -13,14 +13,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
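The new ``synapse.app.frontend_proxy`` worker introduced above only functions with ``worker_main_http_uri`` set, since non-empty key uploads are proxied on to the main process. A hypothetical worker config, with illustrative ports and paths, might look like:

.. code:: yaml

    # Sketch of a frontend_proxy worker config; all values are examples.
    worker_app: synapse.app.frontend_proxy
    worker_main_http_uri: http://127.0.0.1:8008
    worker_listeners:
      - type: http
        port: 8009
        resources:
          - names: [client]
    worker_daemonize: true
    worker_pid_file: /var/run/synapse-frontend-proxy.pid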
+import logging +import sys import synapse - +from synapse import events +from synapse.api.urls import ( + CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX +) +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -33,27 +40,12 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import ( - CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX -) -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.media_repository") @@ -180,36 +172,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-media-repository", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-media-repository", config) if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f9114acfcb..db9a4d16f4 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -13,41 +13,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.roommember import RoomMemberStore +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.storage.engines import create_engine +from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.engines import create_engine +from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn, \ - PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.pusher") @@ -244,18 +236,6 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_pusherpool().start() ps.get_datastore().start_profiling() @@ -263,18 +243,7 @@ def start(config_options): reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-pusher", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-pusher", config) if __name__ == '__main__': diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4bdd99a966..80e4ba5336 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -13,56 +13,50 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import contextlib +import logging +import sys import synapse - from synapse.api.constants import EventTypes +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.rest.client.v2_alpha import sync -from synapse.rest.client.v1 import events -from synapse.rest.client.v1.room import RoomInitialSyncRestServlet -from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.filtering import SlavedFilteringStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1 import events +from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.rest.client.v1.room import RoomInitialSyncRestServlet +from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import contextlib -import gc - logger = 
logging.getLogger("synapse.app.synchrotron") @@ -440,36 +434,13 @@ def start(config_options): ss.setup() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_datastore().start_profiling() ss.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-synchrotron", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-synchrotron", config) if __name__ == '__main__': diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 8c6300db9d..be661a70c7 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -14,16 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse +import logging +import sys -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v2_alpha import user_directory +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from synapse import events - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.user_dir") @@ -233,36 +227,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that 
we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-user-dir", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-user-dir", config) if __name__ == '__main__': diff --git a/synapse/config/server.py b/synapse/config/server.py index 28b4e5f50c..89d61a0503 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +30,7 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") + self.cpu_affinity = config.get("cpu_affinity") # Whether to send federation traffic out in this process. This only # applies to some federation traffic, and so shouldn't be used to @@ -147,6 +149,27 @@ class ServerConfig(Config): # When running as a daemon, the file to store the pid in pid_file: %(pid_file)s + # CPU affinity mask. Setting this restricts the CPUs on which the + # process will be scheduled. It is represented as a bitmask, with the + # lowest order bit corresponding to the first logical CPU and the + # highest order bit corresponding to the last logical CPU. Not all CPUs + # may exist on a given system but a mask may specify more CPUs than are + # present. + # + # For example: + # 0x00000001 is processor #0, + # 0x00000003 is processors #0 and #1, + # 0xFFFFFFFF is all processors (#0 through #31). + # + # Pinning a Python process to a single CPU is desirable, because Python + # is inherently single-threaded due to the GIL, and can suffer a + # 30-40%% slowdown due to cache blow-out and thread context switching + # if the scheduler happens to schedule the underlying threads across + # different cores. See + # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/. + # + # cpu_affinity: 0xFFFFFFFF + # Whether to serve a web client from the HTTP/HTTPS root resource. 
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ea48d931a1..c5a5a8919c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -32,6 +32,9 @@ class WorkerConfig(Config):
        self.worker_replication_port = config.get("worker_replication_port", None)
        self.worker_name = config.get("worker_name", self.worker_app)

+        self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        self.worker_cpu_affinity = config.get("worker_cpu_affinity")
+
        if self.worker_listeners:
            for listener in self.worker_listeners:
                bind_address = listener.pop("bind_address", None)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3d676e7d8b..a78f01e442 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -153,12 +153,10 @@ class Authenticator(object):


class BaseFederationServlet(object):
    REQUIRE_AUTH = True

-    def __init__(self, handler, authenticator, ratelimiter, server_name,
-                 room_list_handler):
+    def __init__(self, handler, authenticator, ratelimiter, server_name):
        self.handler = handler
        self.authenticator = authenticator
        self.ratelimiter = ratelimiter
-        self.room_list_handler = room_list_handler

    def _wrap(self, func):
        authenticator = self.authenticator
@@ -590,7 +588,7 @@ class PublicRoomList(BaseFederationServlet):
        else:
            network_tuple = ThirdPartyInstanceID(None, None)

-        data = yield self.room_list_handler.get_local_public_room_list(
+        data = yield self.handler.get_local_public_room_list(
            limit, since_token,
            network_tuple=network_tuple
        )
@@ -611,7 +609,7 @@ class FederationVersionServlet(BaseFederationServlet):
        }))


-SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES = (
    FederationSendServlet,
    FederationPullServlet,
    FederationEventServlet,
@@ -634,17 +632,27 @@ SERVLET_CLASSES = (
    FederationThirdPartyInviteExchangeServlet,
    On3pidBindServlet,
    OpenIdUserInfo,
-    PublicRoomList,
    FederationVersionServlet,
)

+ROOM_LIST_CLASSES = (
+    PublicRoomList,
+)
+

def register_servlets(hs, resource, authenticator, ratelimiter):
-    for servletclass in SERVLET_CLASSES:
+    for servletclass in FEDERATION_SERVLET_CLASSES:
        servletclass(
            handler=hs.get_replication_layer(),
            authenticator=authenticator,
            ratelimiter=ratelimiter,
            server_name=hs.hostname,
-            room_list_handler=hs.get_room_list_handler(),
+        ).register(resource)
+
+    for servletclass in ROOM_LIST_CLASSES:
+        servletclass(
+            handler=hs.get_room_list_handler(),
+            authenticator=authenticator,
+            ratelimiter=ratelimiter,
+            server_name=hs.hostname,
        ).register(resource)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 694b820d85..b790a7c2ef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1413,7 +1413,7 @@ class FederationHandler(BaseHandler):
            auth_events=auth_events,
        )

-        if not event.internal_metadata.is_outlier():
+        if not event.internal_metadata.is_outlier() and not backfilled:
            yield self.action_generator.handle_push_actions_for_event(
                event, context
            )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 91c6c6be3c..e6df1819b9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -579,18 +579,17 @@ class SyncHandler(object):

        since_token = sync_result_builder.since_token
        if since_token and since_token.device_list_key:
-            room_ids = yield self.store.get_rooms_for_user(user_id)
-
-            user_ids_changed = set()
            changed = yield self.store.get_user_whose_devices_changed(
                since_token.device_list_key
            )
-            for other_user_id in changed:
-                other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
-                if room_ids.intersection(other_room_ids):
-                    user_ids_changed.add(other_user_id)
+            if not changed:
+                defer.returnValue([])
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )

-            defer.returnValue(user_ids_changed)
+            defer.returnValue(users_who_share_room & changed)
        else:
            defer.returnValue([])
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..b0d64aa6c4 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,8 +19,9 @@
from twisted.internet import defer

from .push_rule_evaluator import PushRuleEvaluatorForEvent

-from synapse.visibility import filter_events_for_clients_context
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import get_metrics_for
+from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
@@ -32,6 +33,23 @@ logger = logging.getLogger(__name__)

rules_by_room = {}

+push_metrics = get_metrics_for(__name__)
+
+push_rules_invalidation_counter = push_metrics.register_counter(
+    "push_rules_invalidation_counter"
+)
+push_rules_state_size_counter = push_metrics.register_counter(
+    "push_rules_state_size_counter"
+)
+
+# Measures whether we use the fast path of using state deltas, or if we have to
+# recalculate from scratch
+push_rules_delta_state_cache_metric = cache_metrics.register_cache(
+    "cache",
+    size_callback=lambda: 0,  # Meaningless size, as this isn't a cache that stores values
+    cache_name="push_rules_delta_state_cache_metric",
+)
+

class BulkPushRuleEvaluator(object):
    """Calculates the outcome of push rules for an event for all users in the
@@ -42,6 +60,12 @@ class BulkPushRuleEvaluator(object):
        self.hs = hs
        self.store = hs.get_datastore()

+        self.room_push_rule_cache_metrics = cache_metrics.register_cache(
+            "cache",
+            size_callback=lambda: 0,  # There's no good value for this
+            cache_name="room_push_rule_cache",
+        )
+
    @defer.inlineCallbacks
    def _get_rules_for_event(self, event, context):
        """This gets the rules for all users in the room at the time of the event,
@@ -79,7 +103,10 @@ class BulkPushRuleEvaluator(object):
        # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
        # before any lookup methods get called on it as otherwise there may be
        # a race if invalidate_all gets called (which assumes it's in the cache)
-        return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
+        return RulesForRoom(
+            self.hs, room_id, self._get_rules_for_room.cache,
+            self.room_push_rule_cache_metrics,
+        )

    @defer.inlineCallbacks
    def action_for_event_by_user(self, event, context):
@@ -92,15 +119,6 @@ class BulkPushRuleEvaluator(object):
        rules_by_user = yield self._get_rules_for_event(event, context)

        actions_by_user = {}

-        # None of these users can be peeking since this list of users comes
-        # from the set of users in the room, so we know for sure they're all
-        # actually in the room.
-        user_tuples = [(u, False) for u in rules_by_user]
-
-        filtered_by_user = yield filter_events_for_clients_context(
-            self.store, user_tuples, [event], {event.event_id: context}
-        )
-
        room_members = yield self.store.get_joined_users_from_context(
            event, context
        )
@@ -110,6 +128,14 @@ class BulkPushRuleEvaluator(object):
        condition_cache = {}

        for uid, rules in rules_by_user.iteritems():
+            if event.sender == uid:
+                continue
+
+            if not event.is_state():
+                is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+                if is_ignored:
+                    continue
+
            display_name = None
            profile_info = room_members.get(uid)
            if profile_info:
@@ -121,13 +147,6 @@ class BulkPushRuleEvaluator(object):
            if event.type == EventTypes.Member and event.state_key == uid:
                display_name = event.content.get("displayname", None)

-            filtered = filtered_by_user[uid]
-            if len(filtered) == 0:
-                continue
-
-            if filtered[0].sender == uid:
-                continue
-
            for rule in rules:
                if 'enabled' in rule and not rule['enabled']:
                    continue
@@ -170,17 +189,19 @@ class RulesForRoom(object):
        the entire cache for the room.
    """

-    def __init__(self, hs, room_id, rules_for_room_cache):
+    def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
        """
        Args:
            hs (HomeServer)
            room_id (str)
            rules_for_room_cache(Cache): The cache object that caches these
                RoomsForUser objects.
+            room_push_rule_cache_metrics (CacheMetric)
        """
        self.room_id = room_id
        self.is_mine_id = hs.is_mine_id
        self.store = hs.get_datastore()
+        self.room_push_rule_cache_metrics = room_push_rule_cache_metrics

        self.linearizer = Linearizer(name="rules_for_room")
@@ -222,11 +243,19 @@ class RulesForRoom(object):
        """
        state_group = context.state_group

+        if state_group and self.state_group == state_group:
+            logger.debug("Using cached rules for %r", self.room_id)
+            self.room_push_rule_cache_metrics.inc_hits()
+            defer.returnValue(self.rules_by_user)
+
        with (yield self.linearizer.queue(())):
            if state_group and self.state_group == state_group:
                logger.debug("Using cached rules for %r", self.room_id)
+                self.room_push_rule_cache_metrics.inc_hits()
                defer.returnValue(self.rules_by_user)

+            self.room_push_rule_cache_metrics.inc_misses()
+
            ret_rules_by_user = {}
            missing_member_event_ids = {}
            if state_group and self.state_group == context.prev_group:
                # If we have a simple delta then we can reuse most of the previous
                # results.
                ret_rules_by_user = self.rules_by_user
                current_state_ids = context.delta_ids
+
+                push_rules_delta_state_cache_metric.inc_hits()
            else:
                current_state_ids = context.current_state_ids
+                push_rules_delta_state_cache_metric.inc_misses()
+
+            push_rules_state_size_counter.inc_by(len(current_state_ids))

            logger.debug(
                "Looking for member changes in %r %r", state_group, current_state_ids
            )
@@ -282,6 +316,14 @@ class RulesForRoom(object):
                yield self._update_rules_with_member_event_ids(
                    ret_rules_by_user, missing_member_event_ids, state_group, event
                )
+            else:
+                # The push rules didn't change but let's update the cache anyway
+                self.update_cache(
+                    self.sequence,
+                    members={},  # There were no membership changes
+                    rules_by_user=ret_rules_by_user,
+                    state_group=state_group
+                )

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
@@ -380,6 +422,7 @@ class RulesForRoom(object):
        self.state_group = object()
        self.member_map = {}
        self.rules_by_user = {}
+        push_rules_invalidation_counter.inc()

    def update_cache(self, sequence, members, rules_by_user, state_group):
        if sequence == self.sequence:
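The caching in ``RulesForRoom`` above is worth restating in miniature: results are keyed on the event context's ``state_group``, the check is deliberately repeated after the linearizer is acquired (another caller may have filled the cache while this one was queued), and hits and misses are now counted. Below is a stripped-down sketch of the state-group-keyed caching idea, with hypothetical names and a plain dict standing in for Synapse's metrics objects:

.. code:: python

    class StateGroupCache(object):
        """Toy version of the caching idea: a result computed for one state
        group stays valid for as long as the room remains in that group."""

        def __init__(self):
            self.state_group = object()  # sentinel that equals no real group
            self.value = None
            self.metrics = {"hits": 0, "misses": 0}

        def get(self, state_group, compute):
            # Fast path: same state group as last time, so reuse the result.
            if state_group is not None and state_group == self.state_group:
                self.metrics["hits"] += 1
                return self.value

            self.metrics["misses"] += 1
            self.value = compute()
            self.state_group = state_group
            return self.value

    cache = StateGroupCache()
    cache.get(1, lambda: {"@user:example.com": ["some_rule"]})  # miss: computes
    cache.get(1, lambda: {})                                    # hit: reuses
    cache.get(2, lambda: {})                                    # miss: recomputes
    assert cache.metrics == {"hits": 1, "misses": 2}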
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4d88046579..172c27c137 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -200,7 +200,9 @@ def _glob_to_re(glob, word_boundary):
    return re.compile(r, flags=re.IGNORECASE)


-def _flatten_dict(d, prefix=[], result={}):
+def _flatten_dict(d, prefix=[], result=None):
+    if result is None:
+        result = {}
    for key, value in d.items():
        if isinstance(value, basestring):
            result[".".join(prefix + [key])] = value.lower()
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index ed7f1c89ad..630e92c90e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -31,7 +31,7 @@ REQUIREMENTS = {
    "pyyaml": ["yaml"],
    "pyasn1": ["pyasn1"],
    "daemonize": ["daemonize"],
-    "py-bcrypt": ["bcrypt"],
+    "bcrypt": ["bcrypt"],
    "pillow": ["PIL"],
    "pydenticon": ["pydenticon"],
    "ujson": ["ujson"],
@@ -40,6 +40,7 @@ REQUIREMENTS = {
    "pymacaroons-pynacl": ["pymacaroons"],
    "msgpack-python>=0.3.0": ["msgpack"],
    "phonenumbers>=8.2.0": ["phonenumbers"],
+    "affinity": ["affinity"],
}
CONDITIONAL_REQUIREMENTS = {
    "web_client": {
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 65250285e8..352c9a2aa8 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -29,9 +29,8 @@ class SlavedClientIpStore(BaseSlavedStore):
            max_entries=50000 * CACHE_SIZE_FACTOR,
        )

-    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
        now = int(self._clock.time_msec())
-        user_id = user.to_string()
        key = (user_id, access_token, ip)

        try:
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index a009214e43..171227cce2 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -323,14 +323,18 @@ class UserIpCommand(Command):

    @classmethod
    def from_line(cls, line):
-        user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
+        user_id, jsn = line.split(" ", 1)

-        return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
+        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+
+        return cls(
+            user_id, access_token, ip, user_agent, device_id, last_seen
+        )

    def to_line(self):
-        return " ".join((
-            self.user_id, self.access_token, self.ip, self.device_id,
-            str(self.last_seen), self.user_agent,
+        return self.user_id + " " + json.dumps((
+            self.access_token, self.ip, self.user_agent, self.device_id,
+            self.last_seen,
        ))
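The ``UserIpCommand`` change just above swaps a space-delimited wire format for ``user_id`` plus a JSON payload. Matrix user IDs cannot contain spaces, but user agents can, and ``device_id`` may be ``None``, which is presumably what made the fixed-arity ``split(" ", 5)`` scheme fragile. A self-contained sketch of the same round-trip, with free functions standing in for the command class:

.. code:: python

    import json

    def to_line(user_id, access_token, ip, user_agent, device_id, last_seen):
        # user_id never contains a space, so it can stay as a bare prefix;
        # everything else goes into a JSON array, spaces and None included.
        return user_id + " " + json.dumps(
            (access_token, ip, user_agent, device_id, last_seen)
        )

    def from_line(line):
        user_id, jsn = line.split(" ", 1)
        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
        return (user_id, access_token, ip, user_agent, device_id, last_seen)

    fields = ("@alice:example.com", "token", "10.0.0.1",
              "Mozilla/5.0 (X11; Linux)", None, 1234567890)
    assert from_line(to_line(*fields)) == fields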
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 062272f8dd..d59503b905 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -244,7 +244,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
        becoming full.
        """
        if self.state == ConnectionStates.CLOSED:
-            logger.info("[%s] Not sending, connection closed", self.id())
+            logger.debug("[%s] Not sending, connection closed", self.id())
            return

        if do_buffer and self.state != ConnectionStates.ESTABLISHED:
@@ -264,7 +264,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
    def _queue_command(self, cmd):
        """Queue the command until the connection is ready to write to again.
        """
-        logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
+        logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
        self.pending_commands.append(cmd)

        if len(self.pending_commands) > self.max_line_buffer:
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 7d786e8de3..465b25033d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -168,7 +168,7 @@
    DEFAULT_MESSAGE = (
        "Sharing illegal content on this server is not permitted and rooms in"
-        " violatation will be blocked."
+        " violation will be blocked."
    )

    def __init__(self, hs):
@@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet):

class ResetPasswordRestServlet(ClientV1RestServlet):
    """Post request to allow an administrator reset password for a user.
-    This need a user have a administrator access in Synapse.
+    This needs the user to have administrator access in Synapse.

    Example:
        http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
        @user:to_reset_password?access_token=admin_access_token
@@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
    @defer.inlineCallbacks
    def on_POST(self, request, target_user_id):
        """Post request to allow an administrator reset password for a user.
-        This need a user have a administrator access in Synapse.
+        This needs the user to have administrator access in Synapse.
        """
        UserID.from_string(target_user_id)
        requester = yield self.auth.get_user_by_req(request)
@@ -343,7 +343,7 @@

class GetUsersPaginatedRestServlet(ClientV1RestServlet):
    """Get request to get specific number of users from Synapse.
-    This need a user have a administrator access in Synapse.
+    This needs the user to have administrator access in Synapse.

    Example:
        http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
        @admin:user?access_token=admin_access_token&start=0&limit=10
@@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
    @defer.inlineCallbacks
    def on_GET(self, request, target_user_id):
        """Get request to get specific number of users from Synapse.
-        This need a user have a administrator access in Synapse.
+        This needs the user to have administrator access in Synapse.
        """
        target_user = UserID.from_string(target_user_id)
        requester = yield self.auth.get_user_by_req(request)
@@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
    @defer.inlineCallbacks
    def on_POST(self, request, target_user_id):
        """Post request to get specific number of users from Synapse.
-        This need a user have a administrator access in Synapse.
+        This needs the user to have administrator access in Synapse.

        Example:
            http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
            @admin:user?access_token=admin_access_token
@@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):

class SearchUsersRestServlet(ClientV1RestServlet):
    """Get request to search user table for specific users according to
    search term.
-    This need a user have a administrator access in Synapse.
+    This needs the user to have administrator access in Synapse.

    Example:
        http://localhost:8008/_matrix/client/api/v1/admin/search_users/
        @admin:user?access_token=admin_access_token&term=alice
@@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
    def on_GET(self, request, target_user_id):
        """Get request to search user table for specific users according to
        search term.
-        This need a user have a administrator access in Synapse.
+        This needs the user to have administrator access in Synapse.
        """
        target_user = UserID.from_string(target_user_id)
        requester = yield self.auth.get_user_by_req(request)
"events": [ { @@ -212,7 +218,8 @@ class SyncRestServlet(RestServlet): ] } - def encode_joined(self, rooms, time_now, token_id, event_fields): + @staticmethod + def encode_joined(rooms, time_now, token_id, event_fields): """ Encode the joined rooms in a sync result @@ -231,13 +238,14 @@ class SyncRestServlet(RestServlet): """ joined = {} for room in rooms: - joined[room.room_id] = self.encode_room( + joined[room.room_id] = SyncRestServlet.encode_room( room, time_now, token_id, only_fields=event_fields ) return joined - def encode_invited(self, rooms, time_now, token_id): + @staticmethod + def encode_invited(rooms, time_now, token_id): """ Encode the invited rooms in a sync result @@ -270,7 +278,8 @@ class SyncRestServlet(RestServlet): return invited - def encode_archived(self, rooms, time_now, token_id, event_fields): + @staticmethod + def encode_archived(rooms, time_now, token_id, event_fields): """ Encode the archived rooms in a sync result @@ -289,7 +298,7 @@ class SyncRestServlet(RestServlet): """ joined = {} for room in rooms: - joined[room.room_id] = self.encode_room( + joined[room.room_id] = SyncRestServlet.encode_room( room, time_now, token_id, joined=False, only_fields=event_fields ) diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index aa84ffc2b0..ff14e54c11 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) + + @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) + def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): + ignored_account_data = yield self.get_global_account_data_by_type_for_user( + "m.ignored_user_list", ignorer_user_id, + on_invalidate=cache_context.invalidate, + ) + if not ignored_account_data: + defer.returnValue(False) + + defer.returnValue( + ignored_user_id in ignored_account_data.get("ignored_users", {}) + ) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index fc468ea185..3c95e90eca 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -56,9 +56,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) - def insert_client_ip(self, user, access_token, ip, user_agent, device_id): - now = int(self._clock.time_msec()) - key = (user.to_string(), access_token, ip) + def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, + now=None): + if not now: + now = int(self._clock.time_msec()) + key = (user_id, access_token, ip) try: last_seen = self.client_ip_last_seen.get(key) diff --git a/synapse/visibility.py b/synapse/visibility.py index c4dd9ae2c7..5590b866ed 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): - user_ids = set(u[0] for u in user_tuples) - event_id_to_state = {} - for event_id, context in event_id_to_context.items(): - state = yield store.get_events([ - e_id - for key, e_id in context.current_state_ids.iteritems() - if key == (EventTypes.RoomHistoryVisibility, "") - or (key[0] == EventTypes.Member and key[1] in user_ids) - ]) - event_id_to_state[event_id] = state - - res = yield filter_events_for_clients( - store, user_tuples, 
diff --git a/synapse/visibility.py b/synapse/visibility.py
index c4dd9ae2c7..5590b866ed 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):


@defer.inlineCallbacks
-def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context):
-    user_ids = set(u[0] for u in user_tuples)
-    event_id_to_state = {}
-    for event_id, context in event_id_to_context.items():
-        state = yield store.get_events([
-            e_id
-            for key, e_id in context.current_state_ids.iteritems()
-            if key == (EventTypes.RoomHistoryVisibility, "")
-            or (key[0] == EventTypes.Member and key[1] in user_ids)
-        ])
-        event_id_to_state[event_id] = state
-
-    res = yield filter_events_for_clients(
-        store, user_tuples, events, event_id_to_state
-    )
-    defer.returnValue(res)
-
-
-@defer.inlineCallbacks
def filter_events_for_client(store, user_id, events, is_peeking=False):
    """
    Check which events a user is allowed to see
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 2eaaa8253c..778ff2f6e9 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -19,7 +19,6 @@
import synapse.api.errors
import synapse.handlers.device
import synapse.storage
-from synapse import types
from tests import unittest, utils

user1 = "@boris:aaa"
@@ -179,6 +178,6 @@ class DeviceTestCase(unittest.TestCase):
            if ip is not None:
                yield self.store.insert_client_ip(
-                    types.UserID.from_string(user_id),
+                    user_id,
                    access_token, ip, "user_agent", device_id)
                self.clock.advance_time(1000)
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index 03df697575..bd6fda6cb1 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -15,9 +15,6 @@

from twisted.internet import defer

-import synapse.server
-import synapse.storage
-import synapse.types
import tests.unittest
import tests.utils

@@ -39,7 +36,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
        self.clock.now = 12345678
        user_id = "@user:id"
        yield self.store.insert_client_ip(
-            synapse.types.UserID.from_string(user_id),
+            user_id,
            "access_token", "ip", "user_agent", "device_id",
        )
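Finally, the ``_flatten_dict`` fix earlier in this diff deserves a stand-alone illustration, because the bug it removes is easy to reintroduce: a mutable default argument is created once, when the ``def`` statement executes, and is then shared by every call that relies on the default. A simplified demonstration (not the real function):

.. code:: python

    def flatten_buggy(d, prefix=[], result={}):
        # BUG: `result` defaults to a single dict created at definition time.
        for key, value in d.items():
            if isinstance(value, str):
                result[".".join(prefix + [key])] = value.lower()
            elif hasattr(value, "items"):
                flatten_buggy(value, prefix + [key], result)
        return result

    print(flatten_buggy({"content": {"body": "Hi"}}))  # {'content.body': 'hi'}
    print(flatten_buggy({"type": "m.text"}))
    # {'content.body': 'hi', 'type': 'm.text'}  <- first call's state leaked in

    # The patched version defaults `result` to None and allocates a fresh dict
    # per call. Note that `prefix=[]` is also a mutable default, but it is only
    # ever read (`prefix + [key]` builds new lists), so it needed no such fix.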