diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e3da45b416..72858cca1f 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -519,6 +519,14 @@ class Auth(object):
)
def is_server_admin(self, user):
+ """ Check if the given user is a local server admin.
+
+ Args:
+ user (str): mxid of user to check
+
+ Returns:
+ bool: True if the user is an admin
+ """
return self.store.is_server_admin(user)
@defer.inlineCallbacks
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..cd0e815919
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+ """ Run the reactor in the main process
+
+ Daemonizes if necessary, and then configures some resources, before starting
+ the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+ Args:
+ appname (str): application name which will be sent to syslog
+ config (synapse.config.Config): config object
+ """
+
+ logger = logging.getLogger(config.worker_app)
+
+ start_reactor(
+ appname,
+ config.soft_file_limit,
+ config.gc_thresholds,
+ config.worker_pid_file,
+ config.worker_daemonize,
+ config.worker_cpu_affinity,
+ logger,
+ )
+
+
+def start_reactor(
+ appname,
+ soft_file_limit,
+ gc_thresholds,
+ pid_file,
+ daemonize,
+ cpu_affinity,
+ logger,
+):
+ """ Run the reactor in the main process
+
+ Daemonizes if necessary, and then configures some resources, before starting
+ the reactor
+
+ Args:
+ appname (str): application name which will be sent to syslog
+ soft_file_limit (int):
+ gc_thresholds:
+ pid_file (str): name of pid file to write to if daemonize is True
+ daemonize (bool): true to run the reactor in a background process
+ cpu_affinity (int|None): cpu affinity mask
+ logger (logging.Logger): logger instance to pass to Daemonize
+ """
+
+ def run():
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
+ logger.info("Running")
+ if cpu_affinity is not None:
+ logger.info("Setting CPU affinity to %s" % cpu_affinity)
+ affinity.set_process_affinity_mask(0, cpu_affinity)
+ change_resource_limit(soft_file_limit)
+ if gc_thresholds:
+ gc.set_threshold(*gc_thresholds)
+ reactor.run()
+
+ if daemonize:
+ daemon = Daemonize(
+ app=appname,
+ pid=pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a476efa63..ba2657bbad 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -13,38 +13,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.appservice")
@@ -181,36 +174,13 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-appservice",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-appservice", config)
if __name__ == '__main__':
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 09bc1935f1..129cfa901f 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -13,47 +13,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.client_reader")
@@ -183,36 +175,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-client-reader",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-client-reader", config)
if __name__ == '__main__':
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..40cebe6f4a 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,44 +13,36 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.federation_reader")
@@ -172,36 +164,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-federation-reader",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-federation-reader", config)
if __name__ == '__main__':
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 03327dc47a..389e3909d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,44 +13,37 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.federation_sender")
@@ -213,36 +206,12 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
-
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-federation-sender",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-federation-sender", config)
class FederationSenderHandler(object):
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 132f18a979..bee4c47498 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -13,48 +13,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.servlet import (
+ RestServlet, parse_json_object_from_request,
+)
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-from synapse.api.errors import SynapseError
-from synapse.http.servlet import (
- RestServlet, parse_json_object_from_request,
-)
-from synapse.rest.client.v2_alpha._base import client_v2_patterns
-
-from synapse import events
-
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
-
logger = logging.getLogger("synapse.app.frontend_proxy")
@@ -234,36 +225,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-frontend-proxy",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-frontend-proxy", config)
if __name__ == '__main__':
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 081e7cce59..84ad8f04a0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,61 +13,48 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import synapse
-
import gc
import logging
import os
import sys
+import synapse
import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+ LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+ STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
- check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
-
-from twisted.internet import reactor, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
- FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
- SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
- SERVER_KEY_V2_PREFIX,
-)
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+ check_requirements
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
logger = logging.getLogger("synapse.app.homeserver")
@@ -446,37 +433,18 @@ def run(hs):
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home)
- def in_thread():
- # Uncomment to enable tracing of log context changes.
- # sys.settrace(logcontext_tracer)
-
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- change_resource_limit(hs.config.soft_file_limit)
- if hs.config.gc_thresholds:
- gc.set_threshold(*hs.config.gc_thresholds)
- reactor.run()
-
- if hs.config.daemonize:
-
- if hs.config.print_pidfile:
- print (hs.config.pid_file)
-
- daemon = Daemonize(
- app="synapse-homeserver",
- pid=hs.config.pid_file,
- action=lambda: in_thread(),
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
-
- daemon.start()
- else:
- in_thread()
+ if hs.config.daemonize and hs.config.print_pidfile:
+ print (hs.config.pid_file)
+
+ _base.start_reactor(
+ "synapse-homeserver",
+ hs.config.soft_file_limit,
+ hs.config.gc_thresholds,
+ hs.config.pid_file,
+ hs.config.daemonize,
+ hs.config.cpu_affinity,
+ logger,
+ )
def main():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f57ec784fe..36c18bdbcb 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -13,14 +13,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.api.urls import (
+ CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -33,27 +40,12 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
- CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.media_repository")
@@ -180,36 +172,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-media-repository",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-media-repository", config)
if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f9114acfcb..db9a4d16f4 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -13,41 +13,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
- PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.pusher")
@@ -244,18 +236,6 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
@@ -263,18 +243,7 @@ def start(config_options):
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-pusher",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-pusher", config)
if __name__ == '__main__':
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4bdd99a966..80e4ba5336 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -13,56 +13,50 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
+import logging
+import sys
import synapse
-
from synapse.api.constants import EventTypes
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
logger = logging.getLogger("synapse.app.synchrotron")
@@ -440,36 +434,13 @@ def start(config_options):
ss.setup()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_datastore().start_profiling()
ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-synchrotron",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-synchrotron", config)
if __name__ == '__main__':
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 8c6300db9d..be661a70c7 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -14,16 +14,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse
+import logging
+import sys
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.user_dir")
@@ -233,36 +227,13 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-user-dir",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-user-dir", config)
if __name__ == '__main__':
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 28b4e5f50c..c9a1715f1f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ class ServerConfig(Config):
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl")
+ self.cpu_affinity = config.get("cpu_affinity")
# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
@@ -41,6 +43,12 @@ class ServerConfig(Config):
self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
+ # Whether we should block invites sent to users on this server
+ # (other than those sent by local server admins)
+ self.block_non_admin_invites = config.get(
+ "block_non_admin_invites", False,
+ )
+
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
self.public_baseurl += '/'
@@ -147,6 +155,27 @@ class ServerConfig(Config):
# When running as a daemon, the file to store the pid in
pid_file: %(pid_file)s
+ # CPU affinity mask. Setting this restricts the CPUs on which the
+ # process will be scheduled. It is represented as a bitmask, with the
+ # lowest order bit corresponding to the first logical CPU and the
+ # highest order bit corresponding to the last logical CPU. Not all CPUs
+ # may exist on a given system but a mask may specify more CPUs than are
+ # present.
+ #
+ # For example:
+ # 0x00000001 is processor #0,
+ # 0x00000003 is processors #0 and #1,
+ # 0xFFFFFFFF is all processors (#0 through #31).
+ #
+ # Pinning a Python process to a single CPU is desirable, because Python
+ # is inherently single-threaded due to the GIL, and can suffer a
+ # 30-40%% slowdown due to cache blow-out and thread context switching
+ # if the scheduler happens to schedule the underlying threads across
+ # different cores. See
+ # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
+ #
+ # cpu_affinity: 0xFFFFFFFF
+
# Whether to serve a web client from the HTTP/HTTPS root resource.
web_client: True
@@ -171,6 +200,10 @@ class ServerConfig(Config):
# and sync operations. The default value is -1, means no upper limit.
# filter_timeline_limit: 5000
+ # Whether room invites to users on this server should be blocked
+ # (except those sent by local server admins). The default is False.
+ # block_non_admin_invites: True
+
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 99d5d8aaeb..c5a5a8919c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -33,6 +33,7 @@ class WorkerConfig(Config):
self.worker_name = config.get("worker_name", self.worker_app)
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+ self.worker_cpu_affinity = config.get("worker_cpu_affinity")
if self.worker_listeners:
for listener in self.worker_listeners:
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index c2bd64d6c2..f1fd488b90 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -13,14 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from synapse.util import logcontext
from twisted.web.http import HTTPClient
from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.logcontext import (
- preserve_context_over_fn, preserve_context_over_deferred
-)
import simplejson as json
import logging
@@ -43,14 +40,10 @@ def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
for i in range(5):
try:
- protocol = yield preserve_context_over_fn(
- endpoint.connect, factory
- )
- server_response, server_certificate = yield preserve_context_over_deferred(
- protocol.remote_key
- )
- defer.returnValue((server_response, server_certificate))
- return
+ with logcontext.PreserveLoggingContext():
+ protocol = yield endpoint.connect(factory)
+ server_response, server_certificate = yield protocol.remote_key
+ defer.returnValue((server_response, server_certificate))
except SynapseKeyClientError as e:
logger.exception("Error getting key for %r" % (server_name,))
if e.status.startswith("4"):
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index c900f4d6df..054bac456d 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,10 +16,9 @@
from synapse.crypto.keyclient import fetch_server_key
from synapse.api.errors import SynapseError, Codes
-from synapse.util import unwrapFirstError
-from synapse.util.async import ObservableDeferred
+from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
- preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
+ PreserveLoggingContext,
preserve_fn
)
from synapse.util.metrics import Measure
@@ -57,7 +57,8 @@ Attributes:
json_object(dict): The JSON object to verify.
deferred(twisted.internet.defer.Deferred):
A deferred (server_name, key_id, verify_key) tuple that resolves when
- a verify key has been fetched
+ a verify key has been fetched. The deferreds' callbacks are run with no
+ logcontext.
"""
@@ -74,23 +75,32 @@ class Keyring(object):
self.perspective_servers = self.config.perspectives
self.hs = hs
+ # map from server name to Deferred. Has an entry for each server with
+ # an ongoing key download; the Deferred completes once the download
+ # completes.
+ #
+ # These are regular, logcontext-agnostic Deferreds.
self.key_downloads = {}
def verify_json_for_server(self, server_name, json_object):
- return self.verify_json_objects_for_server(
- [(server_name, json_object)]
- )[0]
+ return logcontext.make_deferred_yieldable(
+ self.verify_json_objects_for_server(
+ [(server_name, json_object)]
+ )[0]
+ )
def verify_json_objects_for_server(self, server_and_json):
- """Bulk verfies signatures of json objects, bulk fetching keys as
+ """Bulk verifies signatures of json objects, bulk fetching keys as
necessary.
Args:
server_and_json (list): List of pairs of (server_name, json_object)
Returns:
- list of deferreds indicating success or failure to verify each
- json object's signature for the given server_name.
+ List<Deferred>: for each input pair, a deferred indicating success
+ or failure to verify each json object's signature for the given
+ server_name. The deferreds run their callbacks in the sentinel
+ logcontext.
"""
verify_requests = []
@@ -117,93 +127,71 @@ class Keyring(object):
verify_requests.append(verify_request)
- @defer.inlineCallbacks
- def handle_key_deferred(verify_request):
- server_name = verify_request.server_name
- try:
- _, key_id, verify_key = yield verify_request.deferred
- except IOError as e:
- logger.warn(
- "Got IOError when downloading keys for %s: %s %s",
- server_name, type(e).__name__, str(e.message),
- )
- raise SynapseError(
- 502,
- "Error downloading keys for %s" % (server_name,),
- Codes.UNAUTHORIZED,
- )
- except Exception as e:
- logger.exception(
- "Got Exception when downloading keys for %s: %s %s",
- server_name, type(e).__name__, str(e.message),
- )
- raise SynapseError(
- 401,
- "No key for %s with id %s" % (server_name, key_ids),
- Codes.UNAUTHORIZED,
- )
+ preserve_fn(self._start_key_lookups)(verify_requests)
+
+ # Pass those keys to handle_key_deferred so that the json object
+ # signatures can be verified
+ handle = preserve_fn(_handle_key_deferred)
+ return [
+ handle(rq) for rq in verify_requests
+ ]
- json_object = verify_request.json_object
+ @defer.inlineCallbacks
+ def _start_key_lookups(self, verify_requests):
+ """Sets off the key fetches for each verify request
- logger.debug("Got key %s %s:%s for server %s, verifying" % (
- key_id, verify_key.alg, verify_key.version, server_name,
- ))
- try:
- verify_signed_json(json_object, server_name, verify_key)
- except:
- raise SynapseError(
- 401,
- "Invalid signature for server %s with key %s:%s" % (
- server_name, verify_key.alg, verify_key.version
- ),
- Codes.UNAUTHORIZED,
- )
+ Once each fetch completes, verify_request.deferred will be resolved.
+
+ Args:
+ verify_requests (List[VerifyKeyRequest]):
+ """
+ # create a deferred for each server we're going to look up the keys
+ # for; we'll resolve them once we have completed our lookups.
+ # These will be passed into wait_for_previous_lookups to block
+ # any other lookups until we have finished.
+ # The deferreds are called with no logcontext.
server_to_deferred = {
- server_name: defer.Deferred()
- for server_name, _ in server_and_json
+ rq.server_name: defer.Deferred()
+ for rq in verify_requests
}
- with PreserveLoggingContext():
+ # We want to wait for any previous lookups to complete before
+ # proceeding.
+ yield self.wait_for_previous_lookups(
+ [rq.server_name for rq in verify_requests],
+ server_to_deferred,
+ )
- # We want to wait for any previous lookups to complete before
- # proceeding.
- wait_on_deferred = self.wait_for_previous_lookups(
- [server_name for server_name, _ in server_and_json],
- server_to_deferred,
- )
+ # Actually start fetching keys.
+ self._get_server_verify_keys(verify_requests)
- # Actually start fetching keys.
- wait_on_deferred.addBoth(
- lambda _: self.get_server_verify_keys(verify_requests)
- )
+ # When we've finished fetching all the keys for a given server_name,
+ # resolve the deferred passed to `wait_for_previous_lookups` so that
+ # any lookups waiting will proceed.
+ #
+ # map from server name to a set of request ids
+ server_to_request_ids = {}
- # When we've finished fetching all the keys for a given server_name,
- # resolve the deferred passed to `wait_for_previous_lookups` so that
- # any lookups waiting will proceed.
- server_to_request_ids = {}
-
- def remove_deferreds(res, server_name, verify_request):
- request_id = id(verify_request)
- server_to_request_ids[server_name].discard(request_id)
- if not server_to_request_ids[server_name]:
- d = server_to_deferred.pop(server_name, None)
- if d:
- d.callback(None)
- return res
-
- for verify_request in verify_requests:
- server_name = verify_request.server_name
- request_id = id(verify_request)
- server_to_request_ids.setdefault(server_name, set()).add(request_id)
- deferred.addBoth(remove_deferreds, server_name, verify_request)
+ for verify_request in verify_requests:
+ server_name = verify_request.server_name
+ request_id = id(verify_request)
+ server_to_request_ids.setdefault(server_name, set()).add(request_id)
- # Pass those keys to handle_key_deferred so that the json object
- # signatures can be verified
- return [
- preserve_context_over_fn(handle_key_deferred, verify_request)
- for verify_request in verify_requests
- ]
+ def remove_deferreds(res, verify_request):
+ server_name = verify_request.server_name
+ request_id = id(verify_request)
+ server_to_request_ids[server_name].discard(request_id)
+ if not server_to_request_ids[server_name]:
+ d = server_to_deferred.pop(server_name, None)
+ if d:
+ d.callback(None)
+ return res
+
+ for verify_request in verify_requests:
+ verify_request.deferred.addBoth(
+ remove_deferreds, verify_request,
+ )
@defer.inlineCallbacks
def wait_for_previous_lookups(self, server_names, server_to_deferred):
@@ -212,7 +200,13 @@ class Keyring(object):
Args:
server_names (list): list of server_names we want to lookup
server_to_deferred (dict): server_name to deferred which gets
- resolved once we've finished looking up keys for that server
+ resolved once we've finished looking up keys for that server.
+ The Deferreds should be regular twisted ones which call their
+ callbacks with no logcontext.
+
+ Returns: a Deferred which resolves once all key lookups for the given
+ servers have completed. Follows the synapse rules of logcontext
+ preservation.
"""
while True:
wait_on = [
@@ -226,17 +220,15 @@ class Keyring(object):
else:
break
- for server_name, deferred in server_to_deferred.items():
- d = ObservableDeferred(preserve_context_over_deferred(deferred))
- self.key_downloads[server_name] = d
-
- def rm(r, server_name):
- self.key_downloads.pop(server_name, None)
- return r
+ def rm(r, server_name_):
+ self.key_downloads.pop(server_name_, None)
+ return r
- d.addBoth(rm, server_name)
+ for server_name, deferred in server_to_deferred.items():
+ self.key_downloads[server_name] = deferred
+ deferred.addBoth(rm, server_name)
- def get_server_verify_keys(self, verify_requests):
+ def _get_server_verify_keys(self, verify_requests):
"""Tries to find at least one key for each verify request
For each verify_request, verify_request.deferred is called back with
@@ -305,21 +297,23 @@ class Keyring(object):
if not missing_keys:
break
- for verify_request in requests_missing_keys:
- verify_request.deferred.errback(SynapseError(
- 401,
- "No key for %s with id %s" % (
- verify_request.server_name, verify_request.key_ids,
- ),
- Codes.UNAUTHORIZED,
- ))
+ with PreserveLoggingContext():
+ for verify_request in requests_missing_keys:
+ verify_request.deferred.errback(SynapseError(
+ 401,
+ "No key for %s with id %s" % (
+ verify_request.server_name, verify_request.key_ids,
+ ),
+ Codes.UNAUTHORIZED,
+ ))
def on_err(err):
- for verify_request in verify_requests:
- if not verify_request.deferred.called:
- verify_request.deferred.errback(err)
+ with PreserveLoggingContext():
+ for verify_request in verify_requests:
+ if not verify_request.deferred.called:
+ verify_request.deferred.errback(err)
- do_iterations().addErrback(on_err)
+ preserve_fn(do_iterations)().addErrback(on_err)
@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
@@ -333,7 +327,7 @@ class Keyring(object):
Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
server_name -> key_id -> VerifyKey
"""
- res = yield preserve_context_over_deferred(defer.gatherResults(
+ res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
@@ -341,7 +335,7 @@ class Keyring(object):
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
defer.returnValue(dict(res))
@@ -362,13 +356,13 @@ class Keyring(object):
)
defer.returnValue({})
- results = yield preserve_context_over_deferred(defer.gatherResults(
+ results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
union_of_keys = {}
for result in results:
@@ -402,13 +396,13 @@ class Keyring(object):
defer.returnValue(keys)
- results = yield preserve_context_over_deferred(defer.gatherResults(
+ results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
merged = {}
for result in results:
@@ -485,7 +479,7 @@ class Keyring(object):
for server_name, response_keys in processed_response.items():
keys.setdefault(server_name, {}).update(response_keys)
- yield preserve_context_over_deferred(defer.gatherResults(
+ yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
server_name=server_name,
@@ -495,7 +489,7 @@ class Keyring(object):
for server_name, response_keys in keys.items()
],
consumeErrors=True
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
defer.returnValue(keys)
@@ -543,7 +537,7 @@ class Keyring(object):
keys.update(response_keys)
- yield preserve_context_over_deferred(defer.gatherResults(
+ yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
server_name=key_server_name,
@@ -553,7 +547,7 @@ class Keyring(object):
for key_server_name, verify_keys in keys.items()
],
consumeErrors=True
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
defer.returnValue(keys)
@@ -619,7 +613,7 @@ class Keyring(object):
response_keys.update(verify_keys)
response_keys.update(old_verify_keys)
- yield preserve_context_over_deferred(defer.gatherResults(
+ yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
server_name=server_name,
@@ -632,7 +626,7 @@ class Keyring(object):
for key_id in updated_key_ids
],
consumeErrors=True,
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
results[server_name] = response_keys
@@ -710,7 +704,6 @@ class Keyring(object):
defer.returnValue(verify_keys)
- @defer.inlineCallbacks
def store_keys(self, server_name, from_server, verify_keys):
"""Store a collection of verify keys for a given server
Args:
@@ -721,7 +714,7 @@ class Keyring(object):
A deferred that completes when the keys are stored.
"""
# TODO(markjh): Store whether the keys have expired.
- yield preserve_context_over_deferred(defer.gatherResults(
+ return logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_verify_key)(
server_name, server_name, key.time_added, key
@@ -729,4 +722,48 @@ class Keyring(object):
for key_id, key in verify_keys.items()
],
consumeErrors=True,
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
+
+
+@defer.inlineCallbacks
+def _handle_key_deferred(verify_request):
+ server_name = verify_request.server_name
+ try:
+ with PreserveLoggingContext():
+ _, key_id, verify_key = yield verify_request.deferred
+ except IOError as e:
+ logger.warn(
+ "Got IOError when downloading keys for %s: %s %s",
+ server_name, type(e).__name__, str(e.message),
+ )
+ raise SynapseError(
+ 502,
+ "Error downloading keys for %s" % (server_name,),
+ Codes.UNAUTHORIZED,
+ )
+ except Exception as e:
+ logger.exception(
+ "Got Exception when downloading keys for %s: %s %s",
+ server_name, type(e).__name__, str(e.message),
+ )
+ raise SynapseError(
+ 401,
+ "No key for %s with id %s" % (server_name, verify_request.key_ids),
+ Codes.UNAUTHORIZED,
+ )
+
+ json_object = verify_request.json_object
+
+ logger.debug("Got key %s %s:%s for server %s, verifying" % (
+ key_id, verify_key.alg, verify_key.version, server_name,
+ ))
+ try:
+ verify_signed_json(json_object, server_name, verify_key)
+ except:
+ raise SynapseError(
+ 401,
+ "Invalid signature for server %s with key %s:%s" % (
+ server_name, verify_key.alg, verify_key.version
+ ),
+ Codes.UNAUTHORIZED,
+ )
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
new file mode 100644
index 0000000000..56fa9e556e
--- /dev/null
+++ b/synapse/events/spamcheck.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def check_event_for_spam(event):
+ """Checks if a given event is considered "spammy" by this server.
+
+ If the server considers an event spammy, then it will be rejected if
+ sent by a local user. If it is sent by a user on another server, then
+ users receive a blank event.
+
+ Args:
+ event (synapse.events.EventBase): the event to be checked
+
+ Returns:
+ bool: True if the event is spammy.
+ """
+ if not hasattr(event, "content") or "body" not in event.content:
+ return False
+
+ # for example:
+ #
+ # if "the third flower is green" in event.content["body"]:
+ # return True
+
+ return False
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 2339cc9034..babd9ea078 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -12,21 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
-from twisted.internet import defer
-
-from synapse.events.utils import prune_event
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
-
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-
import logging
+from synapse.api.errors import SynapseError
+from synapse.crypto.event_signing import check_event_content_hash
+from synapse.events import spamcheck
+from synapse.events.utils import prune_event
+from synapse.util import unwrapFirstError, logcontext
+from twisted.internet import defer
logger = logging.getLogger(__name__)
@@ -57,56 +50,52 @@ class FederationBase(object):
"""
deferreds = self._check_sigs_and_hashes(pdus)
- def callback(pdu):
- return pdu
+ @defer.inlineCallbacks
+ def handle_check_result(pdu, deferred):
+ try:
+ res = yield logcontext.make_deferred_yieldable(deferred)
+ except SynapseError:
+ res = None
- def errback(failure, pdu):
- failure.trap(SynapseError)
- return None
-
- def try_local_db(res, pdu):
if not res:
# Check local db.
- return self.store.get_event(
+ res = yield self.store.get_event(
pdu.event_id,
allow_rejected=True,
allow_none=True,
)
- return res
- def try_remote(res, pdu):
if not res and pdu.origin != origin:
- return self.get_pdu(
- destinations=[pdu.origin],
- event_id=pdu.event_id,
- outlier=outlier,
- timeout=10000,
- ).addErrback(lambda e: None)
- return res
-
- def warn(res, pdu):
+ try:
+ res = yield self.get_pdu(
+ destinations=[pdu.origin],
+ event_id=pdu.event_id,
+ outlier=outlier,
+ timeout=10000,
+ )
+ except SynapseError:
+ pass
+
if not res:
logger.warn(
"Failed to find copy of %s with valid signature",
pdu.event_id,
)
- return res
- for pdu, deferred in zip(pdus, deferreds):
- deferred.addCallbacks(
- callback, errback, errbackArgs=[pdu]
- ).addCallback(
- try_local_db, pdu
- ).addCallback(
- try_remote, pdu
- ).addCallback(
- warn, pdu
- )
+ defer.returnValue(res)
- valid_pdus = yield preserve_context_over_deferred(defer.gatherResults(
- deferreds,
- consumeErrors=True
- )).addErrback(unwrapFirstError)
+ handle = logcontext.preserve_fn(handle_check_result)
+ deferreds2 = [
+ handle(pdu, deferred)
+ for pdu, deferred in zip(pdus, deferreds)
+ ]
+
+ valid_pdus = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults(
+ deferreds2,
+ consumeErrors=True,
+ )
+ ).addErrback(unwrapFirstError)
if include_none:
defer.returnValue(valid_pdus)
@@ -114,15 +103,24 @@ class FederationBase(object):
defer.returnValue([p for p in valid_pdus if p])
def _check_sigs_and_hash(self, pdu):
- return self._check_sigs_and_hashes([pdu])[0]
+ return logcontext.make_deferred_yieldable(
+ self._check_sigs_and_hashes([pdu])[0],
+ )
def _check_sigs_and_hashes(self, pdus):
- """Throws a SynapseError if a PDU does not have the correct
- signatures.
+ """Checks that each of the received events is correctly signed by the
+ sending server.
+
+ Args:
+ pdus (list[FrozenEvent]): the events to be checked
Returns:
- FrozenEvent: Either the given event or it redacted if it failed the
- content hash check.
+ list[Deferred]: for each input event, a deferred which:
+ * returns the original event if the checks pass
+ * returns a redacted version of the event (if the signature
+ matched but the hash did not)
+ * throws a SynapseError if the signature check failed.
+ The deferreds run their callbacks in the sentinel logcontext.
"""
redacted_pdus = [
@@ -130,26 +128,38 @@ class FederationBase(object):
for pdu in pdus
]
- deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([
+ deferreds = self.keyring.verify_json_objects_for_server([
(p.origin, p.get_pdu_json())
for p in redacted_pdus
])
+ ctx = logcontext.LoggingContext.current_context()
+
def callback(_, pdu, redacted):
- if not check_event_content_hash(pdu):
- logger.warn(
- "Event content has been tampered, redacting %s: %s",
- pdu.event_id, pdu.get_pdu_json()
- )
- return redacted
- return pdu
+ with logcontext.PreserveLoggingContext(ctx):
+ if not check_event_content_hash(pdu):
+ logger.warn(
+ "Event content has been tampered, redacting %s: %s",
+ pdu.event_id, pdu.get_pdu_json()
+ )
+ return redacted
+
+ if spamcheck.check_event_for_spam(pdu):
+ logger.warn(
+ "Event contains spam, redacting %s: %s",
+ pdu.event_id, pdu.get_pdu_json()
+ )
+ return redacted
+
+ return pdu
def errback(failure, pdu):
failure.trap(SynapseError)
- logger.warn(
- "Signature check failed for %s",
- pdu.event_id,
- )
+ with logcontext.PreserveLoggingContext(ctx):
+ logger.warn(
+ "Signature check failed for %s",
+ pdu.event_id,
+ )
return failure
for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 861441708b..7c5e5d957f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,7 +22,7 @@ from synapse.api.constants import Membership
from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
-from synapse.util import unwrapFirstError
+from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@@ -189,10 +189,10 @@ class FederationClient(FederationBase):
]
# FIXME: We should handle signature failures more gracefully.
- pdus[:] = yield preserve_context_over_deferred(defer.gatherResults(
+ pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
self._check_sigs_and_hashes(pdus),
consumeErrors=True,
- )).addErrback(unwrapFirstError)
+ ).addErrback(unwrapFirstError))
defer.returnValue(pdus)
@@ -252,7 +252,7 @@ class FederationClient(FederationBase):
pdu = pdu_list[0]
# Check signatures are correct.
- signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
+ signed_pdu = yield self._check_sigs_and_hash(pdu)
break
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ed60d494ff..dac4b3f4e0 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
+ now_token = yield self.hs.get_event_sources().get_current_token()
+
room_ids = yield self.store.get_rooms_for_user(user_id)
# First we check if any devices have changed
@@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler):
# Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+ member_events = yield self.store.get_membership_changes_for_user(
+ user_id, from_token.room_key, now_token.room_key
+ )
+ rooms_changed.update(event.room_id for event in member_events)
+
stream_ordering = RoomStreamToken.parse_stream_token(
- from_token.room_key).stream
+ from_token.room_key
+ ).stream
possibly_changed = set(changed)
+ possibly_left = set()
for room_id in rooms_changed:
+ current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+ # The user may have left the room
+ # TODO: Check if they actually did or if we were just invited.
+ if room_id not in room_ids:
+ for key, event_id in current_state_ids.iteritems():
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_left.add(state_key)
+ continue
+
# Fetch the current state at the time.
try:
event_ids = yield self.store.get_forward_extremeties_for_room(
@@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler):
# ordering: treat it the same as a new room
event_ids = []
- current_state_ids = yield self.store.get_current_state_ids(room_id)
-
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
@@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler):
possibly_changed.add(state_key)
continue
+ current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+ if not current_member_id:
+ continue
+
# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+ # Check if we've joined the room? If so we just blindly add all the users to
+ # the "possibly changed" users.
+ for state_dict in prev_state_ids.itervalues():
+ member_event = state_dict.get((EventTypes.Member, user_id), None)
+ if not member_event or member_event != current_member_id:
+ for key, event_id in current_state_ids.iteritems():
+ etype, state_key = key
+ if etype != EventTypes.Member:
+ continue
+ possibly_changed.add(state_key)
+ break
+
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
@@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler):
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
- for state_dict in prev_state_ids.values():
+ for state_dict in prev_state_ids.itervalues():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id:
- possibly_changed.add(state_key)
+ if state_key != user_id:
+ possibly_changed.add(state_key)
break
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
+ if possibly_changed or possibly_left:
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
- # Take the intersection of the users whose devices may have changed
- # and those that actually still share a room with the user
- defer.returnValue(users_who_share_room & possibly_changed)
+ # Take the intersection of the users whose devices may have changed
+ # and those that actually still share a room with the user
+ possibly_joined = possibly_changed & users_who_share_room
+ possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+ else:
+ possibly_joined = []
+ possibly_left = []
+
+ defer.returnValue({
+ "changed": list(possibly_joined),
+ "left": list(possibly_left),
+ })
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b790a7c2ef..18f87cad67 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1074,6 +1074,9 @@ class FederationHandler(BaseHandler):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
+ if self.hs.config.block_non_admin_invites:
+ raise SynapseError(403, "This server does not accept room invites")
+
membership = event.content.get("membership")
if event.type != EventTypes.Member or membership != Membership.INVITE:
raise SynapseError(400, "The event was not an m.room.member invite event")
@@ -1606,7 +1609,7 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
- if event.type == EventTypes.GuestAccess:
+ if event.type == EventTypes.GuestAccess and not context.rejected:
yield self.maybe_kick_guest_users(event)
defer.returnValue(context)
@@ -2090,6 +2093,14 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ """Handle an exchange_third_party_invite request from a remote server
+
+ The remote server will call this when it wants to turn a 3pid invite
+ into a normal m.room.member invite.
+
+ Returns:
+ Deferred: resolves (to None)
+ """
builder = self.event_builder_factory.new(event_dict)
message_handler = self.hs.get_handlers().message_handler
@@ -2108,9 +2119,12 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
+ # XXX we send the invite here, but send_membership_event also sends it,
+ # so we end up making two requests. I think this is redundant.
returned_invite = yield self.send_invite(origin, event)
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
+
member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.send_membership_event(None, event, context)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index be4f123c54..da18bf23db 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from synapse.events import spamcheck
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@@ -321,6 +321,12 @@ class MessageHandler(BaseHandler):
token_id=requester.access_token_id,
txn_id=txn_id
)
+
+ if spamcheck.check_event_for_spam(event):
+ raise SynapseError(
+ 403, "Spam is not permitted here", Codes.FORBIDDEN
+ )
+
yield self.send_nonmember_event(
requester,
event,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b3f979b246..9a498c2d3e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -191,6 +191,8 @@ class RoomMemberHandler(BaseHandler):
if action in ["kick", "unban"]:
effective_membership_state = "leave"
+ # if this is a join with a 3pid signature, we may need to turn a 3pid
+ # invite into a normal invite before we can handle the join.
if third_party_signed is not None:
replication = self.hs.get_replication_layer()
yield replication.exchange_third_party_invite(
@@ -208,6 +210,16 @@ class RoomMemberHandler(BaseHandler):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
+ if (effective_membership_state == "invite" and
+ self.hs.config.block_non_admin_invites):
+ is_requester_admin = yield self.auth.is_server_admin(
+ requester.user,
+ )
+ if not is_requester_admin:
+ raise SynapseError(
+ 403, "Invites have been disabled on this server",
+ )
+
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
@@ -471,6 +483,16 @@ class RoomMemberHandler(BaseHandler):
requester,
txn_id
):
+ if self.hs.config.block_non_admin_invites:
+ is_requester_admin = yield self.auth.is_server_admin(
+ requester.user,
+ )
+ if not is_requester_admin:
+ raise SynapseError(
+ 403, "Invites have been disabled on this server",
+ Codes.FORBIDDEN,
+ )
+
invitee = yield self._lookup_3pid(
id_server, medium, address
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e6df1819b9..af1b527840 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -108,6 +108,16 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
return True
+class DeviceLists(collections.namedtuple("DeviceLists", [
+ "changed", # list of user_ids whose devices may have changed
+ "left", # list of user_ids whose devices we no longer track
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ return bool(self.changed or self.left)
+
+
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
@@ -283,6 +293,11 @@ class SyncHandler(object):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
+ # Pull out the current state, as we always want to include those events
+ # in the timeline if they're there.
+ current_state_ids = yield self.state.get_current_state_ids(room_id)
+ current_state_ids = frozenset(current_state_ids.itervalues())
+
if recents is None or newly_joined_room or timeline_limit < len(recents):
limited = True
else:
@@ -294,6 +309,7 @@ class SyncHandler(object):
self.store,
sync_config.user.to_string(),
recents,
+ always_include_ids=current_state_ids,
)
else:
recents = []
@@ -329,6 +345,7 @@ class SyncHandler(object):
self.store,
sync_config.user.to_string(),
loaded_recents,
+ always_include_ids=current_state_ids,
)
loaded_recents.extend(recents)
recents = loaded_recents
@@ -535,7 +552,8 @@ class SyncHandler(object):
res = yield self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
- newly_joined_rooms, newly_joined_users = res
+ newly_joined_rooms, newly_joined_users, _, _ = res
+ _, _, newly_left_rooms, newly_left_users = res
block_all_presence_data = (
since_token is None and
@@ -549,7 +567,11 @@ class SyncHandler(object):
yield self._generate_sync_entry_for_to_device(sync_result_builder)
device_lists = yield self._generate_sync_entry_for_device_list(
- sync_result_builder
+ sync_result_builder,
+ newly_joined_rooms=newly_joined_rooms,
+ newly_joined_users=newly_joined_users,
+ newly_left_rooms=newly_left_rooms,
+ newly_left_users=newly_left_users,
)
device_id = sync_config.device_id
@@ -574,7 +596,9 @@ class SyncHandler(object):
@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
- def _generate_sync_entry_for_device_list(self, sync_result_builder):
+ def _generate_sync_entry_for_device_list(self, sync_result_builder,
+ newly_joined_rooms, newly_joined_users,
+ newly_left_rooms, newly_left_users):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
@@ -582,16 +606,40 @@ class SyncHandler(object):
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
)
- if not changed:
- defer.returnValue([])
+
+ # TODO: Be more clever than this, i.e. remove users who we already
+ # share a room with?
+ for room_id in newly_joined_rooms:
+ joined_users = yield self.state.get_current_user_in_room(room_id)
+ newly_joined_users.update(joined_users)
+
+ for room_id in newly_left_rooms:
+ left_users = yield self.state.get_current_user_in_room(room_id)
+ newly_left_users.update(left_users)
+
+ # TODO: Check that these users are actually new, i.e. either they
+ # weren't in the previous sync *or* they left and rejoined.
+ changed.update(newly_joined_users)
+
+ if not changed and not newly_left_users:
+ defer.returnValue(DeviceLists(
+ changed=[],
+ left=newly_left_users,
+ ))
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
- defer.returnValue(users_who_share_room & changed)
+ defer.returnValue(DeviceLists(
+ changed=users_who_share_room & changed,
+ left=set(newly_left_users) - users_who_share_room,
+ ))
else:
- defer.returnValue([])
+ defer.returnValue(DeviceLists(
+ changed=[],
+ left=[],
+ ))
@defer.inlineCallbacks
def _generate_sync_entry_for_to_device(self, sync_result_builder):
@@ -755,8 +803,8 @@ class SyncHandler(object):
account_data_by_room(dict): Dictionary of per room account data
Returns:
- Deferred(tuple): Returns a 2-tuple of
- `(newly_joined_rooms, newly_joined_users)`
+ Deferred(tuple): Returns a 4-tuple of
+ `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
@@ -787,7 +835,7 @@ class SyncHandler(object):
)
if not tags_by_room:
logger.debug("no-oping sync")
- defer.returnValue(([], []))
+ defer.returnValue(([], [], [], []))
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id,
@@ -800,7 +848,7 @@ class SyncHandler(object):
if since_token:
res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
- room_entries, invited, newly_joined_rooms = res
+ room_entries, invited, newly_joined_rooms, newly_left_rooms = res
tags_by_room = yield self.store.get_updated_tags(
user_id, since_token.account_data_key,
@@ -808,6 +856,7 @@ class SyncHandler(object):
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
room_entries, invited, newly_joined_rooms = res
+ newly_left_rooms = []
tags_by_room = yield self.store.get_tags_for_user(user_id)
@@ -828,17 +877,30 @@ class SyncHandler(object):
# Now we want to get any newly joined users
newly_joined_users = set()
+ newly_left_users = set()
if since_token:
for joined_sync in sync_result_builder.joined:
it = itertools.chain(
- joined_sync.timeline.events, joined_sync.state.values()
+ joined_sync.timeline.events, joined_sync.state.itervalues()
)
for event in it:
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
newly_joined_users.add(event.state_key)
-
- defer.returnValue((newly_joined_rooms, newly_joined_users))
+ else:
+ prev_content = event.unsigned.get("prev_content", {})
+ prev_membership = prev_content.get("membership", None)
+ if prev_membership == Membership.JOIN:
+ newly_left_users.add(event.state_key)
+
+ newly_left_users -= newly_joined_users
+
+ defer.returnValue((
+ newly_joined_rooms,
+ newly_joined_users,
+ newly_left_rooms,
+ newly_left_users,
+ ))
@defer.inlineCallbacks
def _have_rooms_changed(self, sync_result_builder):
@@ -908,15 +970,28 @@ class SyncHandler(object):
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
newly_joined_rooms = []
+ newly_left_rooms = []
room_entries = []
invited = []
- for room_id, events in mem_change_events_by_room_id.items():
+ for room_id, events in mem_change_events_by_room_id.iteritems():
non_joins = [e for e in events if e.membership != Membership.JOIN]
has_join = len(non_joins) != len(events)
# We want to figure out if we joined the room at some point since
# the last sync (even if we have since left). This is to make sure
# we do send down the room, and with full state, where necessary
+
+ old_state_ids = None
+ if room_id in joined_room_ids and non_joins:
+ # Always include if the user (re)joined the room, especially
+ # important so that device list changes are calculated correctly.
+ # If there are non join member events, but we are still in the room,
+ # then the user must have left and joined
+ newly_joined_rooms.append(room_id)
+
+ # User is in the room so we don't need to do the invite/leave checks
+ continue
+
if room_id in joined_room_ids or has_join:
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
@@ -928,12 +1003,33 @@ class SyncHandler(object):
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
- if room_id in joined_room_ids:
- continue
+ # If user is in the room then we don't need to do the invite/leave checks
+ if room_id in joined_room_ids:
+ continue
if not non_joins:
continue
+ # Check if we have left the room. This can either be because we were
+ # joined before *or* that we since joined and then left.
+ if events[-1].membership != Membership.JOIN:
+ if has_join:
+ newly_left_rooms.append(room_id)
+ else:
+ if not old_state_ids:
+ old_state_ids = yield self.get_state_at(room_id, since_token)
+ old_mem_ev_id = old_state_ids.get(
+ (EventTypes.Member, user_id),
+ None,
+ )
+ old_mem_ev = None
+ if old_mem_ev_id:
+ old_mem_ev = yield self.store.get_event(
+ old_mem_ev_id, allow_none=True
+ )
+ if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
+ newly_left_rooms.append(room_id)
+
# Only bother if we're still currently invited
should_invite = non_joins[-1].membership == Membership.INVITE
if should_invite:
@@ -1011,7 +1107,7 @@ class SyncHandler(object):
upto_token=since_token,
))
- defer.returnValue((room_entries, invited, newly_joined_rooms))
+ defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1259,6 +1355,7 @@ class SyncResultBuilder(object):
self.invited = []
self.archived = []
self.device = []
+ self.to_device = []
class RoomSyncResultBuilder(object):
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index d8923c9abb..241b17f2cb 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import socket
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
@@ -30,7 +31,10 @@ logger = logging.getLogger(__name__)
SERVER_CACHE = {}
-
+# our record of an individual server which can be tried to reach a destination.
+#
+# "host" is actually a dotted-quad or ipv6 address string. Except when there's
+# no SRV record, in which case it is the original hostname.
_Server = collections.namedtuple(
"_Server", "priority weight host port expires"
)
@@ -219,9 +223,10 @@ class SRVClientEndpoint(object):
return self.default_server
else:
raise ConnectError(
- "Not server available for %s" % self.service_name
+ "No server available for %s" % self.service_name
)
+ # look for all servers with the same priority
min_priority = self.servers[0].priority
weight_indexes = list(
(index, server.weight + 1)
@@ -231,11 +236,22 @@ class SRVClientEndpoint(object):
total_weight = sum(weight for index, weight in weight_indexes)
target_weight = random.randint(0, total_weight)
-
for index, weight in weight_indexes:
target_weight -= weight
if target_weight <= 0:
server = self.servers[index]
+ # XXX: this looks totally dubious:
+ #
+ # (a) we never reuse a server until we have been through
+ # all of the servers at the same priority, so if the
+ # weights are A: 100, B:1, we always do ABABAB instead of
+ # AAAA...AAAB (approximately).
+ #
+ # (b) After using all the servers at the lowest priority,
+ # we move onto the next priority. We should only use the
+ # second priority if servers at the top priority are
+ # unreachable.
+ #
del self.servers[index]
self.used_servers.append(server)
return server
@@ -280,26 +296,21 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
continue
payload = answer.payload
- host = str(payload.target)
- srv_ttl = answer.ttl
- try:
- answers, _, _ = yield dns_client.lookupAddress(host)
- except DNSNameError:
- continue
+ hosts = yield _get_hosts_for_srv_record(
+ dns_client, str(payload.target)
+ )
- for answer in answers:
- if answer.type == dns.A and answer.payload:
- ip = answer.payload.dottedQuad()
- host_ttl = min(srv_ttl, answer.ttl)
+ for (ip, ttl) in hosts:
+ host_ttl = min(answer.ttl, ttl)
- servers.append(_Server(
- host=ip,
- port=int(payload.port),
- priority=int(payload.priority),
- weight=int(payload.weight),
- expires=int(clock.time()) + host_ttl,
- ))
+ servers.append(_Server(
+ host=ip,
+ port=int(payload.port),
+ priority=int(payload.priority),
+ weight=int(payload.weight),
+ expires=int(clock.time()) + host_ttl,
+ ))
servers.sort()
cache[service_name] = list(servers)
@@ -317,3 +328,68 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
raise e
defer.returnValue(servers)
+
+
+@defer.inlineCallbacks
+def _get_hosts_for_srv_record(dns_client, host):
+ """Look up each of the hosts in a SRV record
+
+ Args:
+ dns_client (twisted.names.dns.IResolver):
+ host (basestring): host to look up
+
+ Returns:
+ Deferred[list[(str, int)]]: a list of (host, ttl) pairs
+
+ """
+ ip4_servers = []
+ ip6_servers = []
+
+ def cb(res):
+ # lookupAddress and lookupIP6Address return a three-tuple
+ # giving the answer, authority, and additional sections of the
+ # response.
+ #
+ # we only care about the answers.
+
+ return res[0]
+
+ def eb(res):
+ res.trap(DNSNameError)
+ return []
+
+ # no logcontexts here, so we can safely fire these off and gatherResults
+ d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
+ d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+ results = yield defer.gatherResults([d1, d2], consumeErrors=True)
+
+ for result in results:
+ for answer in result:
+ if not answer.payload:
+ continue
+
+ try:
+ if answer.type == dns.A:
+ ip = answer.payload.dottedQuad()
+ ip4_servers.append((ip, answer.ttl))
+ elif answer.type == dns.AAAA:
+ ip = socket.inet_ntop(
+ socket.AF_INET6, answer.payload.address,
+ )
+ ip6_servers.append((ip, answer.ttl))
+ else:
+ # the most likely candidate here is a CNAME record.
+ # rfc2782 says srvs may not point to aliases.
+ logger.warn(
+ "Ignoring unexpected DNS record type %s for %s",
+ answer.type, host,
+ )
+ continue
+ except Exception as e:
+ logger.warn("Ignoring invalid DNS response for %s: %s",
+ host, e)
+ continue
+
+ # keep the ipv4 results before the ipv6 results, mostly to match historical
+ # behaviour.
+ defer.returnValue(ip4_servers + ip6_servers)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 8a5d473108..62c41cd9db 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -244,6 +244,26 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _build_notification_dict(self, event, tweaks, badge):
+ if self.data.get('format') == 'event_id_only':
+ d = {
+ 'notification': {
+ 'event_id': event.event_id,
+ 'room_id': event.room_id,
+ 'counts': {
+ 'unread': badge,
+ },
+ 'devices': [
+ {
+ 'app_id': self.app_id,
+ 'pushkey': self.pushkey,
+ 'pushkey_ts': long(self.pushkey_ts / 1000),
+ 'data': self.data_minus_url,
+ }
+ ]
+ }
+ }
+ defer.returnValue(d)
+
ctx = yield push_tools.get_context_for_event(
self.store, self.state_handler, event, self.user_id
)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index ed7f1c89ad..630e92c90e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -31,7 +31,7 @@ REQUIREMENTS = {
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
"daemonize": ["daemonize"],
- "py-bcrypt": ["bcrypt"],
+ "bcrypt": ["bcrypt"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
"ujson": ["ujson"],
@@ -40,6 +40,7 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
+ "affinity": ["affinity"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 7d786e8de3..465b25033d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -168,7 +168,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
- " violatation will be blocked."
+ " violation will be blocked."
)
def __init__(self, hs):
@@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet):
class ResetPasswordRestServlet(ClientV1RestServlet):
"""Post request to allow an administrator reset password for a user.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
@user:to_reset_password?access_token=admin_access_token
@@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
"""Post request to allow an administrator reset password for a user.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
"""
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
@@ -343,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
class GetUsersPaginatedRestServlet(ClientV1RestServlet):
"""Get request to get specific number of users from Synapse.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
@admin:user?access_token=admin_access_token&start=0&limit=10
@@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, target_user_id):
"""Get request to get specific number of users from Synapse.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
@@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
"""Post request to get specific number of users from Synapse..
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
@admin:user?access_token=admin_access_token
@@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
class SearchUsersRestServlet(ClientV1RestServlet):
"""Get request to search user table for specific users according to
search term.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/search_users/
@admin:user?access_token=admin_access_token&term=alice
@@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
def on_GET(self, request, target_user_id):
"""Get request to search user table for specific users according to
search term.
- This need a user have a administrator access in Synapse.
+ This needs user to have a administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6a3cfe84f8..943e87e7fd 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet):
user_id = requester.user.to_string()
- changed = yield self.device_handler.get_user_ids_changed(
+ results = yield self.device_handler.get_user_ids_changed(
user_id, from_token,
)
- defer.returnValue((200, {
- "changed": list(changed),
- }))
+ defer.returnValue((200, results))
class OneTimeKeyServlet(RestServlet):
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 6dcc407451..978af9c280 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet):
filter_id = parse_string(request, "filter", default=None)
full_state = parse_boolean(request, "full_state", default=False)
- logger.info(
+ logger.debug(
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, filter_id=%r, device_id=%r" % (
user, timeout, since, set_presence, filter_id, device_id
@@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet):
"account_data": {"events": sync_result.account_data},
"to_device": {"events": sync_result.to_device},
"device_lists": {
- "changed": list(sync_result.device_lists),
+ "changed": list(sync_result.device_lists.changed),
+ "left": list(sync_result.device_lists.left),
},
"presence": SyncRestServlet.encode_presence(
sync_result.presence, time_now
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 3b5e0a4fb9..87aeaf71d6 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -113,30 +113,37 @@ class KeyStore(SQLBaseStore):
keys[key_id] = key
defer.returnValue(keys)
- @defer.inlineCallbacks
def store_server_verify_key(self, server_name, from_server, time_now_ms,
verify_key):
"""Stores a NACL verification key for the given server.
Args:
server_name (str): The name of the server.
- key_id (str): The version of the key for the server.
from_server (str): Where the verification key was looked up
- ts_now_ms (int): The time now in milliseconds
- verification_key (VerifyKey): The NACL verify key.
+ time_now_ms (int): The time now in milliseconds
+ verify_key (nacl.signing.VerifyKey): The NACL verify key.
"""
- yield self._simple_upsert(
- table="server_signature_keys",
- keyvalues={
- "server_name": server_name,
- "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
- },
- values={
- "from_server": from_server,
- "ts_added_ms": time_now_ms,
- "verify_key": buffer(verify_key.encode()),
- },
- desc="store_server_verify_key",
- )
+ key_id = "%s:%s" % (verify_key.alg, verify_key.version)
+
+ def _txn(txn):
+ self._simple_upsert_txn(
+ txn,
+ table="server_signature_keys",
+ keyvalues={
+ "server_name": server_name,
+ "key_id": key_id,
+ },
+ values={
+ "from_server": from_server,
+ "ts_added_ms": time_now_ms,
+ "verify_key": buffer(verify_key.encode()),
+ },
+ )
+ txn.call_after(
+ self._get_server_verify_key.invalidate,
+ (server_name, key_id)
+ )
+
+ return self.runInteraction("store_server_verify_key", _txn)
def store_server_keys_json(self, server_name, key_id, from_server,
ts_now_ms, ts_expires_ms, key_json_bytes):
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 5590b866ed..d7dbdc77ff 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = (
@defer.inlineCallbacks
-def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
+ always_include_ids=frozenset()):
""" Returns dict of user_id -> list of events that user is allowed to
see.
@@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
* the user has not been a member of the room since the
given events
events ([synapse.events.EventBase]): list of events to filter
+ always_include_ids (set(event_id)): set of event ids to specifically
+ include (unless sender is ignored)
"""
forgotten = yield preserve_context_over_deferred(defer.gatherResults([
defer.maybeDeferred(
@@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
if not event.is_state() and event.sender in ignore_list:
return False
+ if event.event_id in always_include_ids:
+ return True
+
state = event_id_to_state[event.event_id]
# get the room_visibility at the time of the event.
@@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
@defer.inlineCallbacks
-def filter_events_for_client(store, user_id, events, is_peeking=False):
+def filter_events_for_client(store, user_id, events, is_peeking=False,
+ always_include_ids=frozenset()):
"""
Check which events a user is allowed to see
@@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False):
types=types
)
res = yield filter_events_for_clients(
- store, [(user_id, is_peeking)], events, event_id_to_state
+ store, [(user_id, is_peeking)], events, event_id_to_state,
+ always_include_ids=always_include_ids,
)
defer.returnValue(res.get(user_id, []))
|