diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/app/_base.py | 99 | ||||
-rw-r--r-- | synapse/app/appservice.py | 50 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 53 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 53 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 57 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 64 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 114 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 53 | ||||
-rw-r--r-- | synapse/app/pusher.py | 57 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 69 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 53 | ||||
-rw-r--r-- | synapse/config/server.py | 23 | ||||
-rw-r--r-- | synapse/config/workers.py | 1 | ||||
-rw-r--r-- | synapse/crypto/keyring.py | 64 | ||||
-rw-r--r-- | synapse/handlers/device.py | 68 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 126 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 56 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 20 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 3 | ||||
-rw-r--r-- | synapse/rest/client/v1/admin.py | 16 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/keys.py | 6 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 5 | ||||
-rw-r--r-- | synapse/visibility.py | 14 |
24 files changed, 580 insertions, 546 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py new file mode 100644 index 0000000000..cd0e815919 --- /dev/null +++ b/synapse/app/_base.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import gc +import logging + +import affinity +from daemonize import Daemonize +from synapse.util import PreserveLoggingContext +from synapse.util.rlimit import change_resource_limit +from twisted.internet import reactor + + +def start_worker_reactor(appname, config): + """ Run the reactor in the main process + + Daemonizes if necessary, and then configures some resources, before starting + the reactor. Pulls configuration from the 'worker' settings in 'config'. + + Args: + appname (str): application name which will be sent to syslog + config (synapse.config.Config): config object + """ + + logger = logging.getLogger(config.worker_app) + + start_reactor( + appname, + config.soft_file_limit, + config.gc_thresholds, + config.worker_pid_file, + config.worker_daemonize, + config.worker_cpu_affinity, + logger, + ) + + +def start_reactor( + appname, + soft_file_limit, + gc_thresholds, + pid_file, + daemonize, + cpu_affinity, + logger, +): + """ Run the reactor in the main process + + Daemonizes if necessary, and then configures some resources, before starting + the reactor + + Args: + appname (str): application name which will be sent to syslog + soft_file_limit (int): + gc_thresholds: + pid_file (str): name of pid file to write to if daemonize is True + daemonize (bool): true to run the reactor in a background process + cpu_affinity (int|None): cpu affinity mask + logger (logging.Logger): logger instance to pass to Daemonize + """ + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + if cpu_affinity is not None: + logger.info("Setting CPU affinity to %s" % cpu_affinity) + affinity.set_process_affinity_mask(0, cpu_affinity) + change_resource_limit(soft_file_limit) + if gc_thresholds: + gc.set_threshold(*gc_thresholds) + reactor.run() + + if daemonize: + daemon = Daemonize( + app=appname, + pid=pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a476efa63..ba2657bbad 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -13,38 +13,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.appservice") @@ -181,36 +174,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-appservice", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-appservice", config) if __name__ == '__main__': diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 09bc1935f1..129cfa901f 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -13,47 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite +from synapse.crypto import context_factory from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.client_reader") @@ -183,36 +175,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-client-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-client-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index eb392e1c9d..40cebe6f4a 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -13,44 +13,36 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.urls import FEDERATION_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import FEDERATION_PREFIX -from synapse.federation.transport.server import TransportLayerServer -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_reader") @@ -172,36 +164,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 03327dc47a..389e3909d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -13,44 +13,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_sender") @@ -213,36 +206,12 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-sender", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-sender", config) class FederationSenderHandler(object): diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 132f18a979..bee4c47498 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -13,48 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.errors import SynapseError +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite +from synapse.crypto import context_factory from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, +) +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.crypto import context_factory -from synapse.api.errors import SynapseError -from synapse.http.servlet import ( - RestServlet, parse_json_object_from_request, -) -from synapse.rest.client.v2_alpha._base import client_v2_patterns - -from synapse import events - - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - - logger = logging.getLogger("synapse.app.frontend_proxy") @@ -234,36 +225,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-frontend-proxy", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-frontend-proxy", config) if __name__ == '__main__': diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 081e7cce59..84ad8f04a0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -13,61 +13,48 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import synapse - import gc import logging import os import sys +import synapse import synapse.config.logger +from synapse import events +from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \ + LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \ + STATIC_PREFIX, WEB_CLIENT_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError - -from synapse.python_dependencies import ( - check_requirements, CONDITIONAL_REQUIREMENTS -) - -from synapse.rest import ClientRestResource -from synapse.storage.engines import create_engine, IncorrectDatabaseSetup -from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database - -from synapse.server import HomeServer - -from twisted.internet import reactor, defer -from twisted.application import service -from twisted.web.resource import Resource, EncodingResourceWrapper -from twisted.web.static import File -from twisted.web.server import GzipEncoderFactory -from synapse.http.server import RootRedirect -from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource -from synapse.rest.key.v1.server_key_resource import LocalKey -from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.api.urls import ( - FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, - SERVER_KEY_V2_PREFIX, -) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.federation.transport.server import TransportLayerServer +from synapse.http.server import RootRedirect +from synapse.http.site import SynapseSite from synapse.metrics import register_memory_metrics -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ + check_requirements from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory -from synapse.federation.transport.server import TransportLayerServer - -from synapse.util.rlimit import change_resource_limit -from synapse.util.versionstring import get_version_string +from synapse.rest import ClientRestResource +from synapse.rest.key.v1.server_key_resource import LocalKey +from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.rest.media.v0.content_repository import ContentRepoResource +from synapse.rest.media.v1.media_repository import MediaRepositoryResource +from synapse.server import HomeServer +from synapse.storage import are_all_users_on_domain +from synapse.storage.engines import IncorrectDatabaseSetup, create_engine +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole - -from synapse.http.site import SynapseSite - -from synapse import events - -from daemonize import Daemonize +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from twisted.application import service +from twisted.internet import defer, reactor +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory +from twisted.web.static import File logger = logging.getLogger("synapse.app.homeserver") @@ -446,37 +433,18 @@ def run(hs): # be quite busy the first few minutes clock.call_later(5 * 60, phone_stats_home) - def in_thread(): - # Uncomment to enable tracing of log context changes. - # sys.settrace(logcontext_tracer) - - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - change_resource_limit(hs.config.soft_file_limit) - if hs.config.gc_thresholds: - gc.set_threshold(*hs.config.gc_thresholds) - reactor.run() - - if hs.config.daemonize: - - if hs.config.print_pidfile: - print (hs.config.pid_file) - - daemon = Daemonize( - app="synapse-homeserver", - pid=hs.config.pid_file, - action=lambda: in_thread(), - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - in_thread() + if hs.config.daemonize and hs.config.print_pidfile: + print (hs.config.pid_file) + + _base.start_reactor( + "synapse-homeserver", + hs.config.soft_file_limit, + hs.config.gc_thresholds, + hs.config.pid_file, + hs.config.daemonize, + hs.config.cpu_affinity, + logger, + ) def main(): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index f57ec784fe..36c18bdbcb 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -13,14 +13,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.urls import ( + CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX +) +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -33,27 +40,12 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import ( - CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX -) -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.media_repository") @@ -180,36 +172,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-media-repository", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-media-repository", config) if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f9114acfcb..db9a4d16f4 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -13,41 +13,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.roommember import RoomMemberStore +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.storage.engines import create_engine +from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.engines import create_engine +from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn, \ - PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.pusher") @@ -244,18 +236,6 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_pusherpool().start() ps.get_datastore().start_profiling() @@ -263,18 +243,7 @@ def start(config_options): reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-pusher", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-pusher", config) if __name__ == '__main__': diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d06a05acd9..576ac6fb7e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -13,57 +13,51 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import logging +import sys import synapse - from synapse.api.constants import EventTypes +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.rest.client.v2_alpha import sync -from synapse.rest.client.v1 import events -from synapse.rest.client.v1.room import RoomInitialSyncRestServlet -from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.filtering import SlavedFilteringStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1 import events +from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.rest.client.v1.room import RoomInitialSyncRestServlet +from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import contextlib -import gc - logger = logging.getLogger("synapse.app.synchrotron") @@ -446,36 +440,13 @@ def start(config_options): ss.setup() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_datastore().start_profiling() ss.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-synchrotron", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-synchrotron", config) if __name__ == '__main__': diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 8c6300db9d..be661a70c7 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -14,16 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse +import logging +import sys -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v2_alpha import user_directory +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from synapse import events - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.user_dir") @@ -233,36 +227,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-user-dir", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-user-dir", config) if __name__ == '__main__': diff --git a/synapse/config/server.py b/synapse/config/server.py index 28b4e5f50c..89d61a0503 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +30,7 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") + self.cpu_affinity = config.get("cpu_affinity") # Whether to send federation traffic out in this process. This only # applies to some federation traffic, and so shouldn't be used to @@ -147,6 +149,27 @@ class ServerConfig(Config): # When running as a daemon, the file to store the pid in pid_file: %(pid_file)s + # CPU affinity mask. Setting this restricts the CPUs on which the + # process will be scheduled. It is represented as a bitmask, with the + # lowest order bit corresponding to the first logical CPU and the + # highest order bit corresponding to the last logical CPU. Not all CPUs + # may exist on a given system but a mask may specify more CPUs than are + # present. + # + # For example: + # 0x00000001 is processor #0, + # 0x00000003 is processors #0 and #1, + # 0xFFFFFFFF is all processors (#0 through #31). + # + # Pinning a Python process to a single CPU is desirable, because Python + # is inherently single-threaded due to the GIL, and can suffer a + # 30-40%% slowdown due to cache blow-out and thread context switching + # if the scheduler happens to schedule the underlying threads across + # different cores. See + # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/. + # + # cpu_affinity: 0xFFFFFFFF + # Whether to serve a web client from the HTTP/HTTPS root resource. web_client: True diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 99d5d8aaeb..c5a5a8919c 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -33,6 +33,7 @@ class WorkerConfig(Config): self.worker_name = config.get("worker_name", self.worker_app) self.worker_main_http_uri = config.get("worker_main_http_uri", None) + self.worker_cpu_affinity = config.get("worker_cpu_affinity") if self.worker_listeners: for listener in self.worker_listeners: diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1bb27edc0f..51851d04e5 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,10 +16,9 @@ from synapse.crypto.keyclient import fetch_server_key from synapse.api.errors import SynapseError, Codes -from synapse.util import unwrapFirstError -from synapse.util.async import ObservableDeferred +from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( - preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, + preserve_context_over_fn, PreserveLoggingContext, preserve_fn ) from synapse.util.metrics import Measure @@ -74,6 +74,11 @@ class Keyring(object): self.perspective_servers = self.config.perspectives self.hs = hs + # map from server name to Deferred. Has an entry for each server with + # an ongoing key download; the Deferred completes once the download + # completes. + # + # These are regular, logcontext-agnostic Deferreds. self.key_downloads = {} def verify_json_for_server(self, server_name, json_object): @@ -82,7 +87,7 @@ class Keyring(object): )[0] def verify_json_objects_for_server(self, server_and_json): - """Bulk verfies signatures of json objects, bulk fetching keys as + """Bulk verifies signatures of json objects, bulk fetching keys as necessary. Args: @@ -212,7 +217,13 @@ class Keyring(object): Args: server_names (list): list of server_names we want to lookup server_to_deferred (dict): server_name to deferred which gets - resolved once we've finished looking up keys for that server + resolved once we've finished looking up keys for that server. + The Deferreds should be regular twisted ones which call their + callbacks with no logcontext. + + Returns: a Deferred which resolves once all key lookups for the given + servers have completed. Follows the synapse rules of logcontext + preservation. """ while True: wait_on = [ @@ -226,15 +237,13 @@ class Keyring(object): else: break - for server_name, deferred in server_to_deferred.items(): - d = ObservableDeferred(preserve_context_over_deferred(deferred)) - self.key_downloads[server_name] = d - - def rm(r, server_name): - self.key_downloads.pop(server_name, None) - return r + def rm(r, server_name_): + self.key_downloads.pop(server_name_, None) + return r - d.addBoth(rm, server_name) + for server_name, deferred in server_to_deferred.items(): + self.key_downloads[server_name] = deferred + deferred.addBoth(rm, server_name) def get_server_verify_keys(self, verify_requests): """Tries to find at least one key for each verify request @@ -333,7 +342,7 @@ class Keyring(object): Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from server_name -> key_id -> VerifyKey """ - res = yield preserve_context_over_deferred(defer.gatherResults( + res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store.get_server_verify_keys)( server_name, key_ids @@ -341,7 +350,7 @@ class Keyring(object): for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(dict(res)) @@ -362,13 +371,13 @@ class Keyring(object): ) defer.returnValue({}) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(get_key)(p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) union_of_keys = {} for result in results: @@ -402,13 +411,13 @@ class Keyring(object): defer.returnValue(keys) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(get_key)(server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) merged = {} for result in results: @@ -485,7 +494,7 @@ class Keyring(object): for server_name, response_keys in processed_response.items(): keys.setdefault(server_name, {}).update(response_keys) - yield preserve_context_over_deferred(defer.gatherResults( + yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=server_name, @@ -495,7 +504,7 @@ class Keyring(object): for server_name, response_keys in keys.items() ], consumeErrors=True - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(keys) @@ -543,7 +552,7 @@ class Keyring(object): keys.update(response_keys) - yield preserve_context_over_deferred(defer.gatherResults( + yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=key_server_name, @@ -553,7 +562,7 @@ class Keyring(object): for key_server_name, verify_keys in keys.items() ], consumeErrors=True - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(keys) @@ -619,7 +628,7 @@ class Keyring(object): response_keys.update(verify_keys) response_keys.update(old_verify_keys) - yield preserve_context_over_deferred(defer.gatherResults( + yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store.store_server_keys_json)( server_name=server_name, @@ -632,7 +641,7 @@ class Keyring(object): for key_id in updated_key_ids ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) results[server_name] = response_keys @@ -710,7 +719,6 @@ class Keyring(object): defer.returnValue(verify_keys) - @defer.inlineCallbacks def store_keys(self, server_name, from_server, verify_keys): """Store a collection of verify keys for a given server Args: @@ -721,7 +729,7 @@ class Keyring(object): A deferred that completes when the keys are stored. """ # TODO(markjh): Store whether the keys have expired. - yield preserve_context_over_deferred(defer.gatherResults( + return logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key @@ -729,4 +737,4 @@ class Keyring(object): for key_id, key in verify_keys.items() ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed60d494ff..dac4b3f4e0 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler): user_id (str) from_token (StreamToken) """ + now_token = yield self.hs.get_event_sources().get_current_token() + room_ids = yield self.store.get_rooms_for_user(user_id) # First we check if any devices have changed @@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler): # Then work out if any users have since joined rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_token.room_key + ) + rooms_changed.update(event.room_id for event in member_events) + stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key).stream + from_token.room_key + ).stream possibly_changed = set(changed) + possibly_left = set() for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + # Fetch the current state at the time. try: event_ids = yield self.store.get_forward_extremeties_for_room( @@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler): # ordering: treat it the same as a new room event_ids = [] - current_state_ids = yield self.store.get_current_state_ids(room_id) - # special-case for an empty prev state: include all members # in the changed list if not event_ids: @@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler): possibly_changed.add(state_key) continue + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + # mapping from event_id -> state_dict prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in prev_state_ids.itervalues(): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. @@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler): # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.values(): + for state_dict in prev_state_ids.itervalues(): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: - possibly_changed.add(state_key) + if state_key != user_id: + possibly_changed.add(state_key) break - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - defer.returnValue(users_who_share_room & possibly_changed) + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b790a7c2ef..4669199b2d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1606,7 +1606,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - if event.type == EventTypes.GuestAccess: + if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) defer.returnValue(context) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d7b90a35ea..f2e4ffcec6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -119,6 +119,16 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ return bool(self.join or self.invite or self.leave) +class DeviceLists(collections.namedtuple("DeviceLists", [ + "changed", # list of user_ids whose devices may have changed + "left", # list of user_ids whose devices we no longer track +])): + __slots__ = [] + + def __nonzero__(self): + return bool(self.changed or self.left) + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -296,6 +306,11 @@ class SyncHandler(object): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline() + # Pull out the current state, as we always want to include those events + # in the timeline if they're there. + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: @@ -307,6 +322,7 @@ class SyncHandler(object): self.store, sync_config.user.to_string(), recents, + always_include_ids=current_state_ids, ) else: recents = [] @@ -342,6 +358,7 @@ class SyncHandler(object): self.store, sync_config.user.to_string(), loaded_recents, + always_include_ids=current_state_ids, ) loaded_recents.extend(recents) recents = loaded_recents @@ -548,7 +565,8 @@ class SyncHandler(object): res = yield self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) - newly_joined_rooms, newly_joined_users = res + newly_joined_rooms, newly_joined_users, _, _ = res + _, _, newly_left_rooms, newly_left_users = res block_all_presence_data = ( since_token is None and @@ -562,7 +580,11 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) device_lists = yield self._generate_sync_entry_for_device_list( - sync_result_builder + sync_result_builder, + newly_joined_rooms=newly_joined_rooms, + newly_joined_users=newly_joined_users, + newly_left_rooms=newly_left_rooms, + newly_left_users=newly_left_users, ) device_id = sync_config.device_id @@ -635,25 +657,50 @@ class SyncHandler(object): @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks - def _generate_sync_entry_for_device_list(self, sync_result_builder): + def _generate_sync_entry_for_device_list(self, sync_result_builder, + newly_joined_rooms, newly_joined_users, + newly_left_rooms, newly_left_users): user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token if since_token and since_token.device_list_key: - room_ids = yield self.store.get_rooms_for_user(user_id) - - user_ids_changed = set() changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) - for other_user_id in changed: - other_room_ids = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(other_room_ids): - user_ids_changed.add(other_user_id) - defer.returnValue(user_ids_changed) + # TODO: Be more clever than this, i.e. remove users who we already + # share a room with? + for room_id in newly_joined_rooms: + joined_users = yield self.state.get_current_user_in_room(room_id) + newly_joined_users.update(joined_users) + + for room_id in newly_left_rooms: + left_users = yield self.state.get_current_user_in_room(room_id) + newly_left_users.update(left_users) + + # TODO: Check that these users are actually new, i.e. either they + # weren't in the previous sync *or* they left and rejoined. + changed.update(newly_joined_users) + + if not changed and not newly_left_users: + defer.returnValue(DeviceLists( + changed=[], + left=newly_left_users, + )) + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + defer.returnValue(DeviceLists( + changed=users_who_share_room & changed, + left=set(newly_left_users) - users_who_share_room, + )) else: - defer.returnValue([]) + defer.returnValue(DeviceLists( + changed=[], + left=[], + )) @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): @@ -817,8 +864,8 @@ class SyncHandler(object): account_data_by_room(dict): Dictionary of per room account data Returns: - Deferred(tuple): Returns a 2-tuple of - `(newly_joined_rooms, newly_joined_users)` + Deferred(tuple): Returns a 4-tuple of + `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)` """ user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( @@ -849,7 +896,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - defer.returnValue(([], [])) + defer.returnValue(([], [], [], [])) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -862,7 +909,7 @@ class SyncHandler(object): if since_token: res = yield self._get_rooms_changed(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms, newly_left_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, since_token.account_data_key, @@ -870,6 +917,7 @@ class SyncHandler(object): else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res + newly_left_rooms = [] tags_by_room = yield self.store.get_tags_for_user(user_id) @@ -890,17 +938,30 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() + newly_left_users = set() if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.values() + joined_sync.timeline.events, joined_sync.state.itervalues() ) for event in it: if event.type == EventTypes.Member: if event.membership == Membership.JOIN: newly_joined_users.add(event.state_key) - - defer.returnValue((newly_joined_rooms, newly_joined_users)) + else: + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership == Membership.JOIN: + newly_left_users.add(event.state_key) + + newly_left_users -= newly_joined_users + + defer.returnValue(( + newly_joined_rooms, + newly_joined_users, + newly_left_rooms, + newly_left_users, + )) @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): @@ -970,15 +1031,17 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] + newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.items(): + for room_id, events in mem_change_events_by_room_id.iteritems(): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure # we do send down the room, and with full state, where necessary + old_state_ids = None if room_id in joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) @@ -996,6 +1059,26 @@ class SyncHandler(object): if not non_joins: continue + # Check if we have left the room. This can either be because we were + # joined before *or* that we since joined and then left. + if events[-1].membership != Membership.JOIN: + if has_join: + newly_left_rooms.append(room_id) + else: + if not old_state_ids: + old_state_ids = yield self.get_state_at(room_id, since_token) + old_mem_ev_id = old_state_ids.get( + (EventTypes.Member, user_id), + None, + ) + old_mem_ev = None + if old_mem_ev_id: + old_mem_ev = yield self.store.get_event( + old_mem_ev_id, allow_none=True + ) + if old_mem_ev and old_mem_ev.membership == Membership.JOIN: + newly_left_rooms.append(room_id) + # Only bother if we're still currently invited should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: @@ -1073,7 +1156,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((room_entries, invited, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builder, ignored_users): @@ -1322,6 +1405,7 @@ class SyncResultBuilder(object): self.archived = [] self.device = [] self.groups = None + self.to_device = [] class RoomSyncResultBuilder(object): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 803ac3e75b..b0d64aa6c4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -20,6 +20,8 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes, Membership +from synapse.metrics import get_metrics_for +from synapse.util.caches import metrics as cache_metrics from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer @@ -31,6 +33,23 @@ logger = logging.getLogger(__name__) rules_by_room = {} +push_metrics = get_metrics_for(__name__) + +push_rules_invalidation_counter = push_metrics.register_counter( + "push_rules_invalidation_counter" +) +push_rules_state_size_counter = push_metrics.register_counter( + "push_rules_state_size_counter" +) + +# Measures whether we use the fast path of using state deltas, or if we have to +# recalculate from scratch +push_rules_delta_state_cache_metric = cache_metrics.register_cache( + "cache", + size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values + cache_name="push_rules_delta_state_cache_metric", +) + class BulkPushRuleEvaluator(object): """Calculates the outcome of push rules for an event for all users in the @@ -41,6 +60,12 @@ class BulkPushRuleEvaluator(object): self.hs = hs self.store = hs.get_datastore() + self.room_push_rule_cache_metrics = cache_metrics.register_cache( + "cache", + size_callback=lambda: 0, # There's not good value for this + cache_name="room_push_rule_cache", + ) + @defer.inlineCallbacks def _get_rules_for_event(self, event, context): """This gets the rules for all users in the room at the time of the event, @@ -78,7 +103,10 @@ class BulkPushRuleEvaluator(object): # It's important that RulesForRoom gets added to self._get_rules_for_room.cache # before any lookup methods get called on it as otherwise there may be # a race if invalidate_all gets called (which assumes its in the cache) - return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache) + return RulesForRoom( + self.hs, room_id, self._get_rules_for_room.cache, + self.room_push_rule_cache_metrics, + ) @defer.inlineCallbacks def action_for_event_by_user(self, event, context): @@ -161,17 +189,19 @@ class RulesForRoom(object): the entire cache for the room. """ - def __init__(self, hs, room_id, rules_for_room_cache): + def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics): """ Args: hs (HomeServer) room_id (str) rules_for_room_cache(Cache): The cache object that caches these RoomsForUser objects. + room_push_rule_cache_metrics (CacheMetric) """ self.room_id = room_id self.is_mine_id = hs.is_mine_id self.store = hs.get_datastore() + self.room_push_rule_cache_metrics = room_push_rule_cache_metrics self.linearizer = Linearizer(name="rules_for_room") @@ -213,11 +243,19 @@ class RulesForRoom(object): """ state_group = context.state_group + if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() + defer.returnValue(self.rules_by_user) + with (yield self.linearizer.queue(())): if state_group and self.state_group == state_group: logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() defer.returnValue(self.rules_by_user) + self.room_push_rule_cache_metrics.inc_misses() + ret_rules_by_user = {} missing_member_event_ids = {} if state_group and self.state_group == context.prev_group: @@ -225,8 +263,13 @@ class RulesForRoom(object): # results. ret_rules_by_user = self.rules_by_user current_state_ids = context.delta_ids + + push_rules_delta_state_cache_metric.inc_hits() else: current_state_ids = context.current_state_ids + push_rules_delta_state_cache_metric.inc_misses() + + push_rules_state_size_counter.inc_by(len(current_state_ids)) logger.debug( "Looking for member changes in %r %r", state_group, current_state_ids @@ -273,6 +316,14 @@ class RulesForRoom(object): yield self._update_rules_with_member_event_ids( ret_rules_by_user, missing_member_event_ids, state_group, event ) + else: + # The push rules didn't change but lets update the cache anyway + self.update_cache( + self.sequence, + members={}, # There were no membership changes + rules_by_user=ret_rules_by_user, + state_group=state_group + ) if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -371,6 +422,7 @@ class RulesForRoom(object): self.state_group = object() self.member_map = {} self.rules_by_user = {} + push_rules_invalidation_counter.inc() def update_cache(self, sequence, members, rules_by_user, state_group): if sequence == self.sequence: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 8a5d473108..62c41cd9db 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -244,6 +244,26 @@ class HttpPusher(object): @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): + if self.data.get('format') == 'event_id_only': + d = { + 'notification': { + 'event_id': event.event_id, + 'room_id': event.room_id, + 'counts': { + 'unread': badge, + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + defer.returnValue(d) + ctx = yield push_tools.get_context_for_event( self.store, self.state_handler, event, self.user_id ) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index ed7f1c89ad..630e92c90e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -31,7 +31,7 @@ REQUIREMENTS = { "pyyaml": ["yaml"], "pyasn1": ["pyasn1"], "daemonize": ["daemonize"], - "py-bcrypt": ["bcrypt"], + "bcrypt": ["bcrypt"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], "ujson": ["ujson"], @@ -40,6 +40,7 @@ REQUIREMENTS = { "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], + "affinity": ["affinity"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 7d786e8de3..465b25033d 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -168,7 +168,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): DEFAULT_MESSAGE = ( "Sharing illegal content on this server is not permitted and rooms in" - " violatation will be blocked." + " violation will be blocked." ) def __init__(self, hs): @@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet): class ResetPasswordRestServlet(ClientV1RestServlet): """Post request to allow an administrator reset password for a user. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/reset_password/ @user:to_reset_password?access_token=admin_access_token @@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, target_user_id): """Post request to allow an administrator reset password for a user. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. """ UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) @@ -343,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet): class GetUsersPaginatedRestServlet(ClientV1RestServlet): """Get request to get specific number of users from Synapse. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/ @admin:user?access_token=admin_access_token&start=0&limit=10 @@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, target_user_id): """Get request to get specific number of users from Synapse. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. """ target_user = UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) @@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, target_user_id): """Post request to get specific number of users from Synapse.. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/ @admin:user?access_token=admin_access_token @@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet): class SearchUsersRestServlet(ClientV1RestServlet): """Get request to search user table for specific users according to search term. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/search_users/ @admin:user?access_token=admin_access_token&term=alice @@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet): def on_GET(self, request, target_user_id): """Get request to search user table for specific users according to search term. - This need a user have a administrator access in Synapse. + This needs user to have a administrator access in Synapse. """ target_user = UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 6a3cfe84f8..943e87e7fd 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet): user_id = requester.user.to_string() - changed = yield self.device_handler.get_user_ids_changed( + results = yield self.device_handler.get_user_ids_changed( user_id, from_token, ) - defer.returnValue((200, { - "changed": list(changed), - })) + defer.returnValue((200, results)) class OneTimeKeyServlet(RestServlet): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 5f208a4c1c..a1e0e53b33 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet): filter_id = parse_string(request, "filter", default=None) full_state = parse_boolean(request, "full_state", default=False) - logger.info( + logger.debug( "/sync: user=%r, timeout=%r, since=%r," " set_presence=%r, filter_id=%r, device_id=%r" % ( user, timeout, since, set_presence, filter_id, device_id @@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet): "account_data": {"events": sync_result.account_data}, "to_device": {"events": sync_result.to_device}, "device_lists": { - "changed": list(sync_result.device_lists), + "changed": list(sync_result.device_lists.changed), + "left": list(sync_result.device_lists.left), }, "presence": SyncRestServlet.encode_presence( sync_result.presence, time_now diff --git a/synapse/visibility.py b/synapse/visibility.py index 5590b866ed..d7dbdc77ff 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = ( @defer.inlineCallbacks -def filter_events_for_clients(store, user_tuples, events, event_id_to_state): +def filter_events_for_clients(store, user_tuples, events, event_id_to_state, + always_include_ids=frozenset()): """ Returns dict of user_id -> list of events that user is allowed to see. @@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): * the user has not been a member of the room since the given events events ([synapse.events.EventBase]): list of events to filter + always_include_ids (set(event_id)): set of event ids to specifically + include (unless sender is ignored) """ forgotten = yield preserve_context_over_deferred(defer.gatherResults([ defer.maybeDeferred( @@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): if not event.is_state() and event.sender in ignore_list: return False + if event.event_id in always_include_ids: + return True + state = event_id_to_state[event.event_id] # get the room_visibility at the time of the event. @@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_client(store, user_id, events, is_peeking=False): +def filter_events_for_client(store, user_id, events, is_peeking=False, + always_include_ids=frozenset()): """ Check which events a user is allowed to see @@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False): types=types ) res = yield filter_events_for_clients( - store, [(user_id, is_peeking)], events, event_id_to_state + store, [(user_id, is_peeking)], events, event_id_to_state, + always_include_ids=always_include_ids, ) defer.returnValue(res.get(user_id, [])) |