diff --git a/synapse/__init__.py b/synapse/__init__.py
index 2a40bab3f0..dbf22eca00 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.21.0"
+__version__ = "0.22.1"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 0c297cb022..e3da45b416 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -23,7 +23,8 @@ from synapse import event_auth
from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.api.errors import AuthError, Codes
from synapse.types import UserID
-from synapse.util import logcontext
+from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -39,6 +40,10 @@ AuthEventTypes = (
GUEST_DEVICE_ID = "guest_device"
+class _InvalidMacaroonException(Exception):
+ pass
+
+
class Auth(object):
"""
FIXME: This class contains a mix of functions for authenticating users
@@ -51,6 +56,9 @@ class Auth(object):
self.state = hs.get_state_handler()
self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
+ self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
+ register_cache("token_cache", self.token_cache)
+
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
auth_events_ids = yield self.compute_auth_events(
@@ -200,8 +208,8 @@ class Auth(object):
default=[""]
)[0]
if user and access_token and ip_addr:
- logcontext.preserve_fn(self.store.insert_client_ip)(
- user=user,
+ self.store.insert_client_ip(
+ user_id=user.to_string(),
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
@@ -267,8 +275,8 @@ class Auth(object):
AuthError if no user by that token exists or the token is invalid.
"""
try:
- macaroon = pymacaroons.Macaroon.deserialize(token)
- except Exception: # deserialize can throw more-or-less anything
+ user_id, guest = self._parse_and_validate_macaroon(token, rights)
+ except _InvalidMacaroonException:
# doesn't look like a macaroon: treat it as an opaque token which
# must be in the database.
# TODO: it would be nice to get rid of this, but apparently some
@@ -277,19 +285,8 @@ class Auth(object):
defer.returnValue(r)
try:
- user_id = self.get_user_id_from_macaroon(macaroon)
user = UserID.from_string(user_id)
- self.validate_macaroon(
- macaroon, rights, self.hs.config.expire_access_token,
- user_id=user_id,
- )
-
- guest = False
- for caveat in macaroon.caveats:
- if caveat.caveat_id == "guest = true":
- guest = True
-
if guest:
# Guest access tokens are not stored in the database (there can
# only be one access token per guest, anyway).
@@ -361,6 +358,55 @@ class Auth(object):
errcode=Codes.UNKNOWN_TOKEN
)
+ def _parse_and_validate_macaroon(self, token, rights="access"):
+ """Takes a macaroon and tries to parse and validate it. This is cached
+ if and only if rights == access and there isn't an expiry.
+
+ On invalid macaroon raises _InvalidMacaroonException
+
+ Returns:
+ (user_id, is_guest)
+ """
+ if rights == "access":
+ cached = self.token_cache.get(token, None)
+ if cached:
+ return cached
+
+ try:
+ macaroon = pymacaroons.Macaroon.deserialize(token)
+ except Exception: # deserialize can throw more-or-less anything
+ # doesn't look like a macaroon: treat it as an opaque token which
+ # must be in the database.
+ # TODO: it would be nice to get rid of this, but apparently some
+ # people use access tokens which aren't macaroons
+ raise _InvalidMacaroonException()
+
+ try:
+ user_id = self.get_user_id_from_macaroon(macaroon)
+
+ has_expiry = False
+ guest = False
+ for caveat in macaroon.caveats:
+ if caveat.caveat_id.startswith("time "):
+ has_expiry = True
+ elif caveat.caveat_id == "guest = true":
+ guest = True
+
+ self.validate_macaroon(
+ macaroon, rights, self.hs.config.expire_access_token,
+ user_id=user_id,
+ )
+ except (pymacaroons.exceptions.MacaroonException, TypeError, ValueError):
+ raise AuthError(
+ self.TOKEN_NOT_FOUND_HTTP_STATUS, "Invalid macaroon passed.",
+ errcode=Codes.UNKNOWN_TOKEN
+ )
+
+ if not has_expiry and rights == "access":
+ self.token_cache[token] = (user_id, guest)
+
+ return user_id, guest
+
def get_user_id_from_macaroon(self, macaroon):
"""Retrieve the user_id given by the caveats on the macaroon.
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..cd0e815919
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+ """ Run the reactor in the main process
+
+ Daemonizes if necessary, and then configures some resources, before starting
+ the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+ Args:
+ appname (str): application name which will be sent to syslog
+ config (synapse.config.Config): config object
+ """
+
+ logger = logging.getLogger(config.worker_app)
+
+ start_reactor(
+ appname,
+ config.soft_file_limit,
+ config.gc_thresholds,
+ config.worker_pid_file,
+ config.worker_daemonize,
+ config.worker_cpu_affinity,
+ logger,
+ )
+
+
+def start_reactor(
+ appname,
+ soft_file_limit,
+ gc_thresholds,
+ pid_file,
+ daemonize,
+ cpu_affinity,
+ logger,
+):
+ """ Run the reactor in the main process
+
+ Daemonizes if necessary, and then configures some resources, before starting
+ the reactor
+
+ Args:
+ appname (str): application name which will be sent to syslog
+ soft_file_limit (int):
+ gc_thresholds:
+ pid_file (str): name of pid file to write to if daemonize is True
+ daemonize (bool): true to run the reactor in a background process
+ cpu_affinity (int|None): cpu affinity mask
+ logger (logging.Logger): logger instance to pass to Daemonize
+ """
+
+ def run():
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
+ logger.info("Running")
+ if cpu_affinity is not None:
+ logger.info("Setting CPU affinity to %s" % cpu_affinity)
+ affinity.set_process_affinity_mask(0, cpu_affinity)
+ change_resource_limit(soft_file_limit)
+ if gc_thresholds:
+ gc.set_threshold(*gc_thresholds)
+ reactor.run()
+
+ if daemonize:
+ daemon = Daemonize(
+ app=appname,
+ pid=pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a476efa63..ba2657bbad 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -13,38 +13,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.appservice")
@@ -181,36 +174,13 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-appservice",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-appservice", config)
if __name__ == '__main__':
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 9b72c649ac..129cfa901f 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -13,47 +13,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.client_reader")
@@ -65,8 +57,8 @@ class ClientReaderSlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
+ SlavedClientIpStore,
BaseSlavedStore,
- ClientIpStore, # After BaseSlavedStore because the constructor is different
):
pass
@@ -183,36 +175,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-client-reader",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-client-reader", config)
if __name__ == '__main__':
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..40cebe6f4a 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,44 +13,36 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.federation_reader")
@@ -172,36 +164,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-federation-reader",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-federation-reader", config)
if __name__ == '__main__':
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 03327dc47a..389e3909d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,44 +13,37 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.federation_sender")
@@ -213,36 +206,12 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
-
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-federation-sender",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-federation-sender", config)
class FederationSenderHandler(object):
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
new file mode 100644
index 0000000000..bee4c47498
--- /dev/null
+++ b/synapse/app/frontend_proxy.py
@@ -0,0 +1,239 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+
+import synapse
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.http.server import JsonResource
+from synapse.http.servlet import (
+ RestServlet, parse_json_object_from_request,
+)
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.versionstring import get_version_string
+from twisted.internet import defer, reactor
+from twisted.web.resource import Resource
+
+logger = logging.getLogger("synapse.app.frontend_proxy")
+
+
+class KeyUploadServlet(RestServlet):
+ PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$",
+ releases=())
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(KeyUploadServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.http_client = hs.get_simple_http_client()
+ self.main_uri = hs.config.worker_main_http_uri
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, device_id):
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+ user_id = requester.user.to_string()
+ body = parse_json_object_from_request(request)
+
+ if device_id is not None:
+ # passing the device_id here is deprecated; however, we allow it
+ # for now for compatibility with older clients.
+ if (requester.device_id is not None and
+ device_id != requester.device_id):
+ logger.warning("Client uploading keys for a different device "
+ "(logged in as %s, uploading for %s)",
+ requester.device_id, device_id)
+ else:
+ device_id = requester.device_id
+
+ if device_id is None:
+ raise SynapseError(
+ 400,
+ "To upload keys, you must pass device_id when authenticating"
+ )
+
+ if body:
+ # They're actually trying to upload something, proxy to main synapse.
+ result = yield self.http_client.post_json_get_json(
+ self.main_uri + request.uri,
+ body,
+ )
+
+ defer.returnValue((200, result))
+ else:
+ # Just interested in counts.
+ result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+ defer.returnValue((200, {"one_time_key_counts": result}))
+
+
+class FrontendProxySlavedStore(
+ SlavedDeviceStore,
+ SlavedClientIpStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ BaseSlavedStore,
+):
+ pass
+
+
+class FrontendProxyServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_addresses = listener_config["bind_addresses"]
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ KeyUploadServlet(self).register(resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+
+ for address in bind_addresses:
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=address
+ )
+
+ logger.info("Synapse client reader now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ bind_addresses = listener["bind_addresses"]
+
+ for address in bind_addresses:
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=address
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse frontend proxy", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.frontend_proxy"
+
+ assert config.worker_main_http_uri is not None
+
+ setup_logging(config, use_worker_options=True)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = FrontendProxyServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.get_handlers()
+ ss.start_listening(config.worker_listeners)
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+
+ reactor.callWhenRunning(start)
+
+ _base.start_worker_reactor("synapse-frontend-proxy", config)
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 081e7cce59..84ad8f04a0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,61 +13,48 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import synapse
-
import gc
import logging
import os
import sys
+import synapse
import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+ LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+ STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
- check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
-
-from twisted.internet import reactor, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
- FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
- SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
- SERVER_KEY_V2_PREFIX,
-)
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+ check_requirements
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
logger = logging.getLogger("synapse.app.homeserver")
@@ -446,37 +433,18 @@ def run(hs):
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home)
- def in_thread():
- # Uncomment to enable tracing of log context changes.
- # sys.settrace(logcontext_tracer)
-
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- change_resource_limit(hs.config.soft_file_limit)
- if hs.config.gc_thresholds:
- gc.set_threshold(*hs.config.gc_thresholds)
- reactor.run()
-
- if hs.config.daemonize:
-
- if hs.config.print_pidfile:
- print (hs.config.pid_file)
-
- daemon = Daemonize(
- app="synapse-homeserver",
- pid=hs.config.pid_file,
- action=lambda: in_thread(),
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
-
- daemon.start()
- else:
- in_thread()
+ if hs.config.daemonize and hs.config.print_pidfile:
+ print (hs.config.pid_file)
+
+ _base.start_reactor(
+ "synapse-homeserver",
+ hs.config.soft_file_limit,
+ hs.config.gc_thresholds,
+ hs.config.pid_file,
+ hs.config.daemonize,
+ hs.config.cpu_affinity,
+ logger,
+ )
def main():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 26c4416956..36c18bdbcb 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -13,57 +13,49 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
+from synapse import events
+from synapse.api.urls import (
+ CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
- CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.media_repository")
class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
+ SlavedClientIpStore,
TransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
- ClientIpStore,
):
pass
@@ -180,36 +172,13 @@ def start(config_options):
ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-media-repository",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-media-repository", config)
if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f9114acfcb..db9a4d16f4 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -13,41 +13,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
- PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.pusher")
@@ -244,18 +236,6 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
@@ -263,18 +243,7 @@ def start(config_options):
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-pusher",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-pusher", config)
if __name__ == '__main__':
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 13c00ef2ba..80e4ba5336 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -13,56 +13,50 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
+import logging
+import sys
import synapse
-
from synapse.api.constants import EventTypes
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
logger = logging.getLogger("synapse.app.synchrotron")
@@ -77,9 +71,9 @@ class SynchrotronSlavedStore(
SlavedPresenceStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
+ SlavedClientIpStore,
RoomStore,
BaseSlavedStore,
- ClientIpStore, # After BaseSlavedStore because the constructor is different
):
who_forgot_in_room = (
RoomMemberStore.__dict__["who_forgot_in_room"]
@@ -440,36 +434,13 @@ def start(config_options):
ss.setup()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_datastore().start_profiling()
ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-synchrotron",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-synchrotron", config)
if __name__ == '__main__':
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 9d8edaa8e3..be661a70c7 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -14,43 +14,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse
+import logging
+import sys
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
-from synapse.storage.client_ips import ClientIpStore
from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
from twisted.internet import reactor
from twisted.web.resource import Resource
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.user_dir")
@@ -58,9 +52,9 @@ class UserDirectorySlaveStore(
SlavedEventStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
+ SlavedClientIpStore,
UserDirectoryStore,
BaseSlavedStore,
- ClientIpStore, # After BaseSlavedStore because the constructor is different
):
def __init__(self, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
@@ -233,36 +227,13 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-user-dir",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-user-dir", config)
if __name__ == '__main__':
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 0f890fc04a..b22cacf8dc 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -33,6 +33,7 @@ from .jwt import JWTConfig
from .password_auth_providers import PasswordAuthProviderConfig
from .emailconfig import EmailConfig
from .workers import WorkerConfig
+from .push import PushConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
@@ -40,7 +41,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
JWTConfig, PasswordConfig, EmailConfig,
- WorkerConfig, PasswordAuthProviderConfig,):
+ WorkerConfig, PasswordAuthProviderConfig, PushConfig,):
pass
diff --git a/synapse/config/push.py b/synapse/config/push.py
new file mode 100644
index 0000000000..9c68318b40
--- /dev/null
+++ b/synapse/config/push.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class PushConfig(Config):
+ def read_config(self, config):
+ self.push_redact_content = False
+
+ push_config = config.get("email", {})
+ self.push_redact_content = push_config.get("redact_content", False)
+
+ def default_config(self, config_dir_path, server_name, **kwargs):
+ return """
+ # Control how push messages are sent to google/apple to notifications.
+ # Normally every message said in a room with one or more people using
+ # mobile devices will be posted to a push server hosted by matrix.org
+ # which is registered with google and apple in order to allow push
+ # notifications to be sent to these mobile devices.
+ #
+ # Setting redact_content to true will make the push messages contain no
+ # message content which will provide increased privacy. This is a
+ # temporary solution pending improvements to Android and iPhone apps
+ # to get content from the app rather than the notification.
+ #
+ # For modern android devices the notification content will still appear
+ # because it is loaded by the app. iPhone, however will send a
+ # notification saying only that a message arrived and who it came from.
+ #
+ #push:
+ # redact_content: false
+ """
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 28b4e5f50c..89d61a0503 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ class ServerConfig(Config):
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl")
+ self.cpu_affinity = config.get("cpu_affinity")
# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
@@ -147,6 +149,27 @@ class ServerConfig(Config):
# When running as a daemon, the file to store the pid in
pid_file: %(pid_file)s
+ # CPU affinity mask. Setting this restricts the CPUs on which the
+ # process will be scheduled. It is represented as a bitmask, with the
+ # lowest order bit corresponding to the first logical CPU and the
+ # highest order bit corresponding to the last logical CPU. Not all CPUs
+ # may exist on a given system but a mask may specify more CPUs than are
+ # present.
+ #
+ # For example:
+ # 0x00000001 is processor #0,
+ # 0x00000003 is processors #0 and #1,
+ # 0xFFFFFFFF is all processors (#0 through #31).
+ #
+ # Pinning a Python process to a single CPU is desirable, because Python
+ # is inherently single-threaded due to the GIL, and can suffer a
+ # 30-40%% slowdown due to cache blow-out and thread context switching
+ # if the scheduler happens to schedule the underlying threads across
+ # different cores. See
+ # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
+ #
+ # cpu_affinity: 0xFFFFFFFF
+
# Whether to serve a web client from the HTTP/HTTPS root resource.
web_client: True
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ea48d931a1..c5a5a8919c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -32,6 +32,9 @@ class WorkerConfig(Config):
self.worker_replication_port = config.get("worker_replication_port", None)
self.worker_name = config.get("worker_name", self.worker_app)
+ self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+ self.worker_cpu_affinity = config.get("worker_cpu_affinity")
+
if self.worker_listeners:
for listener in self.worker_listeners:
bind_address = listener.pop("bind_address", None)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3d676e7d8b..a78f01e442 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -153,12 +153,10 @@ class Authenticator(object):
class BaseFederationServlet(object):
REQUIRE_AUTH = True
- def __init__(self, handler, authenticator, ratelimiter, server_name,
- room_list_handler):
+ def __init__(self, handler, authenticator, ratelimiter, server_name):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
- self.room_list_handler = room_list_handler
def _wrap(self, func):
authenticator = self.authenticator
@@ -590,7 +588,7 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
- data = yield self.room_list_handler.get_local_public_room_list(
+ data = yield self.handler.get_local_public_room_list(
limit, since_token,
network_tuple=network_tuple
)
@@ -611,7 +609,7 @@ class FederationVersionServlet(BaseFederationServlet):
}))
-SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
FederationEventServlet,
@@ -634,17 +632,27 @@ SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
OpenIdUserInfo,
- PublicRoomList,
FederationVersionServlet,
)
+ROOM_LIST_CLASSES = (
+ PublicRoomList,
+)
+
def register_servlets(hs, resource, authenticator, ratelimiter):
- for servletclass in SERVLET_CLASSES:
+ for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass(
handler=hs.get_replication_layer(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
- room_list_handler=hs.get_room_list_handler(),
+ ).register(resource)
+
+ for servletclass in ROOM_LIST_CLASSES:
+ servletclass(
+ handler=hs.get_room_list_handler(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
).register(resource)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e7a1bb7246..b00446bec0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -21,6 +21,7 @@ from synapse.api.constants import LoginType
from synapse.types import UserID
from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
from synapse.util.async import run_on_reactor
+from synapse.util.caches.expiringcache import ExpiringCache
from twisted.web.client import PartialDownloadError
@@ -52,7 +53,15 @@ class AuthHandler(BaseHandler):
LoginType.DUMMY: self._check_dummy_auth,
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
- self.sessions = {}
+
+ # This is not a cache per se, but a store of all current sessions that
+ # expire after N hours
+ self.sessions = ExpiringCache(
+ cache_name="register_sessions",
+ clock=hs.get_clock(),
+ expiry_ms=self.SESSION_EXPIRE_MS,
+ reset_expiry_on_get=True,
+ )
account_handler = _AccountHandler(
hs, check_user_exists=self.check_user_exists
@@ -617,16 +626,6 @@ class AuthHandler(BaseHandler):
logger.debug("Saving session %s", session)
session["last_used"] = self.hs.get_clock().time_msec()
self.sessions[session["id"]] = session
- self._prune_sessions()
-
- def _prune_sessions(self):
- for sid, sess in self.sessions.items():
- last_used = 0
- if 'last_used' in sess:
- last_used = sess['last_used']
- now = self.hs.get_clock().time_msec()
- if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
- del self.sessions[sid]
def hash(self, password):
"""Computes a secure hash of password.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 982cda3edf..ed60d494ff 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler):
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(
- devices=((user_id, device_id) for device_id in device_map.keys())
+ user_id, device_id=None
)
devices = device_map.values()
@@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler):
except errors.StoreError:
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(
- devices=((user_id, device_id),)
+ user_id, device_id,
)
_update_device_from_client_ips(device, ips)
defer.returnValue(device)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 39d2bee8da..b790a7c2ef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -75,6 +75,8 @@ class FederationHandler(BaseHandler):
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
+ self.is_mine_id = hs.is_mine_id
+ self.pusher_pool = hs.get_pusherpool()
self.replication_layer.set_handler(self)
@@ -1068,6 +1070,24 @@ class FederationHandler(BaseHandler):
"""
event = pdu
+ is_blocked = yield self.store.is_room_blocked(event.room_id)
+ if is_blocked:
+ raise SynapseError(403, "This room has been blocked on this server")
+
+ membership = event.content.get("membership")
+ if event.type != EventTypes.Member or membership != Membership.INVITE:
+ raise SynapseError(400, "The event was not an m.room.member invite event")
+
+ sender_domain = get_domain_from_id(event.sender)
+ if sender_domain != origin:
+ raise SynapseError(400, "The invite event was not from the server sending it")
+
+ if event.state_key is None:
+ raise SynapseError(400, "The invite event did not have a state key")
+
+ if not self.is_mine_id(event.state_key):
+ raise SynapseError(400, "The invite event must be for this server")
+
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True
@@ -1276,7 +1296,7 @@ class FederationHandler(BaseHandler):
for event in res:
# We sign these again because there was a bug where we
# incorrectly signed things the first time round
- if self.hs.is_mine_id(event.event_id):
+ if self.is_mine_id(event.event_id):
event.signatures.update(
compute_event_signature(
event,
@@ -1349,7 +1369,7 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.hs.is_mine_id(event.event_id):
+ if self.is_mine_id(event.event_id):
# FIXME: This is a temporary work around where we occasionally
# return events slightly differently than when they were
# originally signed
@@ -1393,7 +1413,7 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not event.internal_metadata.is_outlier():
+ if not event.internal_metadata.is_outlier() and not backfilled:
yield self.action_generator.handle_push_actions_for_event(
event, context
)
@@ -1407,7 +1427,7 @@ class FederationHandler(BaseHandler):
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a04f634c5c..be4f123c54 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json
import logging
import random
+import ujson
logger = logging.getLogger(__name__)
@@ -49,6 +50,8 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
+ self.pusher_pool = hs.get_pusherpool()
+
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
@@ -498,6 +501,14 @@ class MessageHandler(BaseHandler):
logger.warn("Denying new event %r because %s", event, err)
raise err
+ # Ensure that we can round trip before trying to persist in db
+ try:
+ dump = ujson.dumps(event.content)
+ ujson.loads(dump)
+ except:
+ logger.exception("Failed to encode content: %r", event.content)
+ raise
+
yield self.maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:
@@ -601,7 +612,7 @@ class MessageHandler(BaseHandler):
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d2a0d6520a..5698d28088 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -61,7 +61,7 @@ class RoomCreationHandler(BaseHandler):
}
@defer.inlineCallbacks
- def create_room(self, requester, config):
+ def create_room(self, requester, config, ratelimit=True):
""" Creates a new room.
Args:
@@ -75,7 +75,8 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
- yield self.ratelimit(requester)
+ if ratelimit:
+ yield self.ratelimit(requester)
if "room_alias_name" in config:
for wchar in string.whitespace:
@@ -167,6 +168,7 @@ class RoomCreationHandler(BaseHandler):
initial_state=initial_state,
creation_content=creation_content,
room_alias=room_alias,
+ power_level_content_override=config.get("power_level_content_override", {})
)
if "name" in config:
@@ -245,7 +247,8 @@ class RoomCreationHandler(BaseHandler):
invite_list,
initial_state,
creation_content,
- room_alias
+ room_alias,
+ power_level_content_override,
):
def create(etype, content, **kwargs):
e = {
@@ -291,7 +294,15 @@ class RoomCreationHandler(BaseHandler):
ratelimit=False,
)
- if (EventTypes.PowerLevels, '') not in initial_state:
+ # We treat the power levels override specially as this needs to be one
+ # of the first events that get sent into a room.
+ pl_content = initial_state.pop((EventTypes.PowerLevels, ''), None)
+ if pl_content is not None:
+ yield send(
+ etype=EventTypes.PowerLevels,
+ content=pl_content,
+ )
+ else:
power_level_content = {
"users": {
creator_id: 100,
@@ -316,6 +327,8 @@ class RoomCreationHandler(BaseHandler):
for invitee in invite_list:
power_level_content["users"][invitee] = 100
+ power_level_content.update(power_level_content_override)
+
yield send(
etype=EventTypes.PowerLevels,
content=power_level_content,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1ca88517a2..b3f979b246 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler):
if not remote_room_hosts:
remote_room_hosts = []
+ if effective_membership_state not in ("leave", "ban",):
+ is_blocked = yield self.store.is_room_blocked(room_id)
+ if is_blocked:
+ raise SynapseError(403, "This room has been blocked on this server")
+
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
@@ -369,6 +374,11 @@ class RoomMemberHandler(BaseHandler):
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
+ if event.membership not in (Membership.LEAVE, Membership.BAN):
+ is_blocked = yield self.store.is_room_blocked(room_id)
+ if is_blocked:
+ raise SynapseError(403, "This room has been blocked on this server")
+
yield message_handler.handle_new_client_event(
requester,
event,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 91c6c6be3c..e6df1819b9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -579,18 +579,17 @@ class SyncHandler(object):
since_token = sync_result_builder.since_token
if since_token and since_token.device_list_key:
- room_ids = yield self.store.get_rooms_for_user(user_id)
-
- user_ids_changed = set()
changed = yield self.store.get_user_whose_devices_changed(
since_token.device_list_key
)
- for other_user_id in changed:
- other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
- if room_ids.intersection(other_room_ids):
- user_ids_changed.add(other_user_id)
+ if not changed:
+ defer.returnValue([])
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
- defer.returnValue(user_ids_changed)
+ defer.returnValue(users_who_share_room & changed)
else:
defer.returnValue([])
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d33a20a1f2..2a49456bfc 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -42,6 +42,8 @@ class UserDirectoyHandler(object):
"""
INITIAL_SLEEP_MS = 50
+ INITIAL_SLEEP_COUNT = 100
+ INITIAL_BATCH_SIZE = 100
def __init__(self, hs):
self.store = hs.get_datastore()
@@ -126,6 +128,7 @@ class UserDirectoyHandler(object):
if not deltas:
return
+ logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
@@ -187,9 +190,9 @@ class UserDirectoyHandler(object):
if is_public:
yield self.store.add_users_to_public_room(
room_id,
- user_ids=unhandled_users - self.initially_handled_users_in_public
+ user_ids=user_ids - self.initially_handled_users_in_public
)
- self.initially_handled_users_in_public != unhandled_users
+ self.initially_handled_users_in_public |= user_ids
# We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources.
@@ -198,18 +201,22 @@ class UserDirectoyHandler(object):
to_update = set()
count = 0
for user_id in user_ids:
- if count % 100 == 0:
+ if count % self.INITIAL_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
continue
+ if self.store.get_if_app_services_interested_in_user(user_id):
+ count += 1
+ continue
+
for other_user_id in user_ids:
if user_id == other_user_id:
continue
- if count % 100 == 0:
+ if count % self.INITIAL_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
count += 1
@@ -230,13 +237,13 @@ class UserDirectoyHandler(object):
else:
self.initially_handled_users_share_private_room.add(user_set)
- if len(to_insert) > 100:
+ if len(to_insert) > self.INITIAL_BATCH_SIZE:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
to_insert.clear()
- if len(to_update) > 100:
+ if len(to_update) > self.INITIAL_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
)
@@ -294,7 +301,7 @@ class UserDirectoyHandler(object):
room_id, self.server_name,
)
if not is_in_room:
- logger.debug("Server left room: %r", room_id)
+ logger.info("Server left room: %r", room_id)
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not
@@ -411,8 +418,10 @@ class UserDirectoyHandler(object):
to_insert = set()
to_update = set()
+ is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+
# First, if they're our user then we need to update for every user
- if self.is_mine_id(user_id):
+ if self.is_mine_id(user_id) and not is_appservice:
# Returns a map of other_user_id -> shared_private. We only need
# to update mappings if for users that either don't share a room
# already (aren't in the map) or, if the room is private, those that
@@ -443,7 +452,10 @@ class UserDirectoyHandler(object):
if user_id == other_user_id:
continue
- if self.is_mine_id(other_user_id):
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ other_user_id
+ )
+ if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
other_user_id, user_id,
)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..b0d64aa6c4 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,8 +19,9 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.visibility import filter_events_for_clients_context
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import get_metrics_for
+from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
@@ -32,6 +33,23 @@ logger = logging.getLogger(__name__)
rules_by_room = {}
+push_metrics = get_metrics_for(__name__)
+
+push_rules_invalidation_counter = push_metrics.register_counter(
+ "push_rules_invalidation_counter"
+)
+push_rules_state_size_counter = push_metrics.register_counter(
+ "push_rules_state_size_counter"
+)
+
+# Measures whether we use the fast path of using state deltas, or if we have to
+# recalculate from scratch
+push_rules_delta_state_cache_metric = cache_metrics.register_cache(
+ "cache",
+ size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values
+ cache_name="push_rules_delta_state_cache_metric",
+)
+
class BulkPushRuleEvaluator(object):
"""Calculates the outcome of push rules for an event for all users in the
@@ -42,6 +60,12 @@ class BulkPushRuleEvaluator(object):
self.hs = hs
self.store = hs.get_datastore()
+ self.room_push_rule_cache_metrics = cache_metrics.register_cache(
+ "cache",
+ size_callback=lambda: 0, # There's not good value for this
+ cache_name="room_push_rule_cache",
+ )
+
@defer.inlineCallbacks
def _get_rules_for_event(self, event, context):
"""This gets the rules for all users in the room at the time of the event,
@@ -79,7 +103,10 @@ class BulkPushRuleEvaluator(object):
# It's important that RulesForRoom gets added to self._get_rules_for_room.cache
# before any lookup methods get called on it as otherwise there may be
# a race if invalidate_all gets called (which assumes its in the cache)
- return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
+ return RulesForRoom(
+ self.hs, room_id, self._get_rules_for_room.cache,
+ self.room_push_rule_cache_metrics,
+ )
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
@@ -92,15 +119,6 @@ class BulkPushRuleEvaluator(object):
rules_by_user = yield self._get_rules_for_event(event, context)
actions_by_user = {}
- # None of these users can be peeking since this list of users comes
- # from the set of users in the room, so we know for sure they're all
- # actually in the room.
- user_tuples = [(u, False) for u in rules_by_user]
-
- filtered_by_user = yield filter_events_for_clients_context(
- self.store, user_tuples, [event], {event.event_id: context}
- )
-
room_members = yield self.store.get_joined_users_from_context(
event, context
)
@@ -110,6 +128,14 @@ class BulkPushRuleEvaluator(object):
condition_cache = {}
for uid, rules in rules_by_user.iteritems():
+ if event.sender == uid:
+ continue
+
+ if not event.is_state():
+ is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+ if is_ignored:
+ continue
+
display_name = None
profile_info = room_members.get(uid)
if profile_info:
@@ -121,13 +147,6 @@ class BulkPushRuleEvaluator(object):
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
- filtered = filtered_by_user[uid]
- if len(filtered) == 0:
- continue
-
- if filtered[0].sender == uid:
- continue
-
for rule in rules:
if 'enabled' in rule and not rule['enabled']:
continue
@@ -170,17 +189,19 @@ class RulesForRoom(object):
the entire cache for the room.
"""
- def __init__(self, hs, room_id, rules_for_room_cache):
+ def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
"""
Args:
hs (HomeServer)
room_id (str)
rules_for_room_cache(Cache): The cache object that caches these
RoomsForUser objects.
+ room_push_rule_cache_metrics (CacheMetric)
"""
self.room_id = room_id
self.is_mine_id = hs.is_mine_id
self.store = hs.get_datastore()
+ self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
self.linearizer = Linearizer(name="rules_for_room")
@@ -222,11 +243,19 @@ class RulesForRoom(object):
"""
state_group = context.state_group
+ if state_group and self.state_group == state_group:
+ logger.debug("Using cached rules for %r", self.room_id)
+ self.room_push_rule_cache_metrics.inc_hits()
+ defer.returnValue(self.rules_by_user)
+
with (yield self.linearizer.queue(())):
if state_group and self.state_group == state_group:
logger.debug("Using cached rules for %r", self.room_id)
+ self.room_push_rule_cache_metrics.inc_hits()
defer.returnValue(self.rules_by_user)
+ self.room_push_rule_cache_metrics.inc_misses()
+
ret_rules_by_user = {}
missing_member_event_ids = {}
if state_group and self.state_group == context.prev_group:
@@ -234,8 +263,13 @@ class RulesForRoom(object):
# results.
ret_rules_by_user = self.rules_by_user
current_state_ids = context.delta_ids
+
+ push_rules_delta_state_cache_metric.inc_hits()
else:
current_state_ids = context.current_state_ids
+ push_rules_delta_state_cache_metric.inc_misses()
+
+ push_rules_state_size_counter.inc_by(len(current_state_ids))
logger.debug(
"Looking for member changes in %r %r", state_group, current_state_ids
@@ -282,6 +316,14 @@ class RulesForRoom(object):
yield self._update_rules_with_member_event_ids(
ret_rules_by_user, missing_member_event_ids, state_group, event
)
+ else:
+ # The push rules didn't change but lets update the cache anyway
+ self.update_cache(
+ self.sequence,
+ members={}, # There were no membership changes
+ rules_by_user=ret_rules_by_user,
+ state_group=state_group
+ )
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@@ -380,6 +422,7 @@ class RulesForRoom(object):
self.state_group = object()
self.member_map = {}
self.rules_by_user = {}
+ push_rules_invalidation_counter.inc()
def update_cache(self, sequence, members, rules_by_user, state_group):
if sequence == self.sequence:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index c0f8176e3d..8a5d473108 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -275,7 +275,7 @@ class HttpPusher(object):
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
- if 'content' in event:
+ if not self.hs.config.push_redact_content and 'content' in event:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4d88046579..172c27c137 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -200,7 +200,9 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
-def _flatten_dict(d, prefix=[], result={}):
+def _flatten_dict(d, prefix=[], result=None):
+ if result is None:
+ result = {}
for key, value in d.items():
if isinstance(value, basestring):
result[".".join(prefix + [key])] = value.lower()
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index a34cfec8f2..630e92c90e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,6 +40,7 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
+ "affinity": ["affinity"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index a374f2f1a2..0d3f31a50c 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -16,6 +16,7 @@
from ._base import BaseSlavedStore
from synapse.storage import DataStore
from synapse.config.appservice import load_appservices
+from synapse.storage.appservice import _make_exclusive_regex
class SlavedApplicationServiceStore(BaseSlavedStore):
@@ -25,6 +26,7 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
hs.config.server_name,
hs.config.app_service_config_files
)
+ self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
get_app_service_by_token = DataStore.get_app_service_by_token.__func__
get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
@@ -38,3 +40,6 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
get_appservice_state = DataStore.get_appservice_state.__func__
set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
set_appservice_state = DataStore.set_appservice_state.__func__
+ get_if_app_services_interested_in_user = (
+ DataStore.get_if_app_services_interested_in_user.__func__
+ )
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
new file mode 100644
index 0000000000..352c9a2aa8
--- /dev/null
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
+from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches.descriptors import Cache
+
+
+class SlavedClientIpStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(SlavedClientIpStore, self).__init__(db_conn, hs)
+
+ self.client_ip_last_seen = Cache(
+ name="client_ip_last_seen",
+ keylen=4,
+ max_entries=50000 * CACHE_SIZE_FACTOR,
+ )
+
+ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
+ now = int(self._clock.time_msec())
+ key = (user_id, access_token, ip)
+
+ try:
+ last_seen = self.client_ip_last_seen.get(key)
+ except KeyError:
+ last_seen = None
+
+ # Rate-limited inserts
+ if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+ return
+
+ self.hs.get_tcp_replication().send_user_ip(
+ user_id, access_token, ip, user_agent, device_id, now
+ )
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 90fb6c1336..6d2513c4e2 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+ UserIpCommand,
)
from .protocol import ClientReplicationStreamProtocol
@@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
cmd = InvalidateCacheCommand(cache_func.__name__, keys)
self.send_command(cmd)
+ def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+ """Tell the master that the user made a request.
+ """
+ cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
+ self.send_command(cmd)
+
def await_sync(self, data):
"""Returns a deferred that is resolved when we receive a SYNC command
with given data.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 84d2a2272a..171227cce2 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -304,6 +304,40 @@ class InvalidateCacheCommand(Command):
return " ".join((self.cache_func, json.dumps(self.keys)))
+class UserIpCommand(Command):
+ """Sent periodically when a worker sees activity from a client.
+
+ Format::
+
+ USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
+ """
+ NAME = "USER_IP"
+
+ def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+ self.user_id = user_id
+ self.access_token = access_token
+ self.ip = ip
+ self.user_agent = user_agent
+ self.device_id = device_id
+ self.last_seen = last_seen
+
+ @classmethod
+ def from_line(cls, line):
+ user_id, jsn = line.split(" ", 1)
+
+ access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+
+ return cls(
+ user_id, access_token, ip, user_agent, device_id, last_seen
+ )
+
+ def to_line(self):
+ return self.user_id + " " + json.dumps((
+ self.access_token, self.ip, self.user_agent, self.device_id,
+ self.last_seen,
+ ))
+
+
# Map of command name to command type.
COMMAND_MAP = {
cmd.NAME: cmd
@@ -320,6 +354,7 @@ COMMAND_MAP = {
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
+ UserIpCommand,
)
}
@@ -342,5 +377,6 @@ VALID_CLIENT_COMMANDS = (
FederationAckCommand.NAME,
RemovePusherCommand.NAME,
InvalidateCacheCommand.NAME,
+ UserIpCommand.NAME,
ErrorCommand.NAME,
)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9fee2a484b..d59503b905 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -244,7 +244,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
becoming full.
"""
if self.state == ConnectionStates.CLOSED:
- logger.info("[%s] Not sending, connection closed", self.id())
+ logger.debug("[%s] Not sending, connection closed", self.id())
return
if do_buffer and self.state != ConnectionStates.ESTABLISHED:
@@ -264,7 +264,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def _queue_command(self, cmd):
"""Queue the command until the connection is ready to write to again.
"""
- logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
+ logger.debug("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
self.pending_commands.append(cmd)
if len(self.pending_commands) > self.max_line_buffer:
@@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+ def on_USER_IP(self, cmd):
+ self.streamer.on_user_ip(
+ cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
+ cmd.last_seen,
+ )
+
@defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token):
"""Subscribe the remote to a streams.
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 69c46911ec..3ea3ca5a6f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
federation_ack_counter = metrics.register_counter("federation_ack")
remove_pusher_counter = metrics.register_counter("remove_pusher")
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
+user_ip_cache_counter = metrics.register_counter("user_ip_cache")
logger = logging.getLogger(__name__)
@@ -238,6 +239,15 @@ class ReplicationStreamer(object):
invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys))
+ @measure_func("repl.on_user_ip")
+ def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+ """The client saw a user request
+ """
+ user_ip_cache_counter.inc()
+ self.store.insert_client_ip(
+ user_id, access_token, ip, user_agent, device_id, last_seen,
+ )
+
def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 29fcd72375..465b25033d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -15,8 +15,9 @@
from twisted.internet import defer
+from synapse.api.constants import Membership
from synapse.api.errors import AuthError, SynapseError
-from synapse.types import UserID
+from synapse.types import UserID, create_requester
from synapse.http.servlet import parse_json_object_from_request
from .base import ClientV1RestServlet, client_path_patterns
@@ -157,9 +158,145 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
defer.returnValue((200, {}))
+class ShutdownRoomRestServlet(ClientV1RestServlet):
+ """Shuts down a room by removing all local users from the room and blocking
+ all future invites and joins to the room. Any local aliases will be repointed
+ to a new room created by `new_room_user_id` and kicked users will be auto
+ joined to the new room.
+ """
+ PATTERNS = client_path_patterns("/admin/shutdown_room/(?P<room_id>[^/]+)")
+
+ DEFAULT_MESSAGE = (
+ "Sharing illegal content on this server is not permitted and rooms in"
+ " violation will be blocked."
+ )
+
+ def __init__(self, hs):
+ super(ShutdownRoomRestServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+ self.handlers = hs.get_handlers()
+ self.state = hs.get_state_handler()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ is_admin = yield self.auth.is_server_admin(requester.user)
+ if not is_admin:
+ raise AuthError(403, "You are not a server admin")
+
+ content = parse_json_object_from_request(request)
+
+ new_room_user_id = content.get("new_room_user_id")
+ if not new_room_user_id:
+ raise SynapseError(400, "Please provide field `new_room_user_id`")
+
+ room_creator_requester = create_requester(new_room_user_id)
+
+ message = content.get("message", self.DEFAULT_MESSAGE)
+ room_name = content.get("room_name", "Content Violation Notification")
+
+ info = yield self.handlers.room_creation_handler.create_room(
+ room_creator_requester,
+ config={
+ "preset": "public_chat",
+ "name": room_name,
+ "power_level_content_override": {
+ "users_default": -10,
+ },
+ },
+ ratelimit=False,
+ )
+ new_room_id = info["room_id"]
+
+ msg_handler = self.handlers.message_handler
+ yield msg_handler.create_and_send_nonmember_event(
+ room_creator_requester,
+ {
+ "type": "m.room.message",
+ "content": {"body": message, "msgtype": "m.text"},
+ "room_id": new_room_id,
+ "sender": new_room_user_id,
+ },
+ ratelimit=False,
+ )
+
+ requester_user_id = requester.user.to_string()
+
+ logger.info("Shutting down room %r", room_id)
+
+ yield self.store.block_room(room_id, requester_user_id)
+
+ users = yield self.state.get_current_user_in_room(room_id)
+ kicked_users = []
+ for user_id in users:
+ if not self.hs.is_mine_id(user_id):
+ continue
+
+ logger.info("Kicking %r from %r...", user_id, room_id)
+
+ target_requester = create_requester(user_id)
+ yield self.handlers.room_member_handler.update_membership(
+ requester=target_requester,
+ target=target_requester.user,
+ room_id=room_id,
+ action=Membership.LEAVE,
+ content={},
+ ratelimit=False
+ )
+
+ yield self.handlers.room_member_handler.forget(target_requester.user, room_id)
+
+ yield self.handlers.room_member_handler.update_membership(
+ requester=target_requester,
+ target=target_requester.user,
+ room_id=new_room_id,
+ action=Membership.JOIN,
+ content={},
+ ratelimit=False
+ )
+
+ kicked_users.append(user_id)
+
+ aliases_for_room = yield self.store.get_aliases_for_room(room_id)
+
+ yield self.store.update_aliases_for_room(
+ room_id, new_room_id, requester_user_id
+ )
+
+ defer.returnValue((200, {
+ "kicked_users": kicked_users,
+ "local_aliases": aliases_for_room,
+ "new_room_id": new_room_id,
+ }))
+
+
+class QuarantineMediaInRoom(ClientV1RestServlet):
+ """Quarantines all media in a room so that no one can download it via
+ this server.
+ """
+ PATTERNS = client_path_patterns("/admin/quarantine_media/(?P<room_id>[^/]+)")
+
+ def __init__(self, hs):
+ super(QuarantineMediaInRoom, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ is_admin = yield self.auth.is_server_admin(requester.user)
+ if not is_admin:
+ raise AuthError(403, "You are not a server admin")
+
+ num_quarantined = yield self.store.quarantine_media_ids_in_room(
+ room_id, requester.user.to_string(),
+ )
+
+ defer.returnValue((200, {"num_quarantined": num_quarantined}))
+
+
class ResetPasswordRestServlet(ClientV1RestServlet):
"""Post request to allow an administrator reset password for a user.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
@user:to_reset_password?access_token=admin_access_token
@@ -182,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
"""Post request to allow an administrator reset password for a user.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
"""
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
@@ -206,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
class GetUsersPaginatedRestServlet(ClientV1RestServlet):
"""Get request to get specific number of users from Synapse.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
@admin:user?access_token=admin_access_token&start=0&limit=10
@@ -225,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, target_user_id):
"""Get request to get specific number of users from Synapse.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
@@ -258,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
"""Post request to get specific number of users from Synapse..
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
@admin:user?access_token=admin_access_token
@@ -296,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
class SearchUsersRestServlet(ClientV1RestServlet):
"""Get request to search user table for specific users according to
search term.
- This need a user have a administrator access in Synapse.
+ This needs user to have administrator access in Synapse.
Example:
http://localhost:8008/_matrix/client/api/v1/admin/search_users/
@admin:user?access_token=admin_access_token&term=alice
@@ -316,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
def on_GET(self, request, target_user_id):
"""Get request to search user table for specific users according to
search term.
- This need a user have a administrator access in Synapse.
+ This needs user to have a administrator access in Synapse.
"""
target_user = UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
@@ -353,3 +490,5 @@ def register_servlets(hs, http_server):
ResetPasswordRestServlet(hs).register(http_server)
GetUsersPaginatedRestServlet(hs).register(http_server)
SearchUsersRestServlet(hs).register(http_server)
+ ShutdownRoomRestServlet(hs).register(http_server)
+ QuarantineMediaInRoom(hs).register(http_server)
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 9a2ed6ed88..1819a560cb 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -73,6 +73,7 @@ class PushersSetRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(PushersSetRestServlet, self).__init__(hs)
self.notifier = hs.get_notifier()
+ self.pusher_pool = self.hs.get_pusherpool()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -81,12 +82,10 @@ class PushersSetRestServlet(ClientV1RestServlet):
content = parse_json_object_from_request(request)
- pusher_pool = self.hs.get_pusherpool()
-
if ('pushkey' in content and 'app_id' in content
and 'kind' in content and
content['kind'] is None):
- yield pusher_pool.remove_pusher(
+ yield self.pusher_pool.remove_pusher(
content['app_id'], content['pushkey'], user_id=user.to_string()
)
defer.returnValue((200, {}))
@@ -109,14 +108,14 @@ class PushersSetRestServlet(ClientV1RestServlet):
append = content['append']
if not append:
- yield pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
+ yield self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
app_id=content['app_id'],
pushkey=content['pushkey'],
not_user_id=user.to_string()
)
try:
- yield pusher_pool.add_pusher(
+ yield self.pusher_pool.add_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content['kind'],
@@ -152,6 +151,7 @@ class PushersRemoveRestServlet(RestServlet):
self.hs = hs
self.notifier = hs.get_notifier()
self.auth = hs.get_v1auth()
+ self.pusher_pool = self.hs.get_pusherpool()
@defer.inlineCallbacks
def on_GET(self, request):
@@ -161,10 +161,8 @@ class PushersRemoveRestServlet(RestServlet):
app_id = parse_string(request, "app_id", required=True)
pushkey = parse_string(request, "pushkey", required=True)
- pusher_pool = self.hs.get_pusherpool()
-
try:
- yield pusher_pool.remove_pusher(
+ yield self.pusher_pool.remove_pusher(
app_id=app_id,
pushkey=pushkey,
user_id=user.to_string(),
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 83e209d18f..2939896f44 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet):
filter_id = parse_string(request, "filter", default=None)
full_state = parse_boolean(request, "full_state", default=False)
- logger.info(
+ logger.debug(
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, filter_id=%r, device_id=%r" % (
user, timeout, since, set_presence, filter_id, device_id
@@ -164,27 +164,34 @@ class SyncRestServlet(RestServlet):
)
time_now = self.clock.time_msec()
+ response_content = self.encode_response(
+ time_now, sync_result, requester.access_token_id, filter
+ )
+
+ defer.returnValue((200, response_content))
- joined = self.encode_joined(
- sync_result.joined, time_now, requester.access_token_id, filter.event_fields
+ @staticmethod
+ def encode_response(time_now, sync_result, access_token_id, filter):
+ joined = SyncRestServlet.encode_joined(
+ sync_result.joined, time_now, access_token_id, filter.event_fields
)
- invited = self.encode_invited(
- sync_result.invited, time_now, requester.access_token_id
+ invited = SyncRestServlet.encode_invited(
+ sync_result.invited, time_now, access_token_id,
)
- archived = self.encode_archived(
- sync_result.archived, time_now, requester.access_token_id,
+ archived = SyncRestServlet.encode_archived(
+ sync_result.archived, time_now, access_token_id,
filter.event_fields,
)
- response_content = {
+ return {
"account_data": {"events": sync_result.account_data},
"to_device": {"events": sync_result.to_device},
"device_lists": {
"changed": list(sync_result.device_lists),
},
- "presence": self.encode_presence(
+ "presence": SyncRestServlet.encode_presence(
sync_result.presence, time_now
),
"rooms": {
@@ -196,9 +203,8 @@ class SyncRestServlet(RestServlet):
"next_batch": sync_result.next_batch.to_string(),
}
- defer.returnValue((200, response_content))
-
- def encode_presence(self, events, time_now):
+ @staticmethod
+ def encode_presence(events, time_now):
return {
"events": [
{
@@ -212,7 +218,8 @@ class SyncRestServlet(RestServlet):
]
}
- def encode_joined(self, rooms, time_now, token_id, event_fields):
+ @staticmethod
+ def encode_joined(rooms, time_now, token_id, event_fields):
"""
Encode the joined rooms in a sync result
@@ -231,13 +238,14 @@ class SyncRestServlet(RestServlet):
"""
joined = {}
for room in rooms:
- joined[room.room_id] = self.encode_room(
+ joined[room.room_id] = SyncRestServlet.encode_room(
room, time_now, token_id, only_fields=event_fields
)
return joined
- def encode_invited(self, rooms, time_now, token_id):
+ @staticmethod
+ def encode_invited(rooms, time_now, token_id):
"""
Encode the invited rooms in a sync result
@@ -270,7 +278,8 @@ class SyncRestServlet(RestServlet):
return invited
- def encode_archived(self, rooms, time_now, token_id, event_fields):
+ @staticmethod
+ def encode_archived(rooms, time_now, token_id, event_fields):
"""
Encode the archived rooms in a sync result
@@ -289,7 +298,7 @@ class SyncRestServlet(RestServlet):
"""
joined = {}
for room in rooms:
- joined[room.room_id] = self.encode_room(
+ joined[room.room_id] = SyncRestServlet.encode_room(
room, time_now, token_id, joined=False, only_fields=event_fields
)
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index 6788375e85..6879249c8a 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -66,14 +66,19 @@ class DownloadResource(Resource):
@defer.inlineCallbacks
def _respond_local_file(self, request, media_id, name):
media_info = yield self.store.get_local_media(media_id)
- if not media_info:
+ if not media_info or media_info["quarantined_by"]:
respond_404(request)
return
media_type = media_info["media_type"]
media_length = media_info["media_length"]
upload_name = name if name else media_info["upload_name"]
- file_path = self.filepaths.local_media_filepath(media_id)
+ if media_info["url_cache"]:
+ # TODO: Check the file still exists, if it doesn't we can redownload
+ # it from the url `media_info["url_cache"]`
+ file_path = self.filepaths.url_cache_filepath(media_id)
+ else:
+ file_path = self.filepaths.local_media_filepath(media_id)
yield respond_with_file(
request, media_type, file_path, media_length,
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 0137458f71..d92b7ff337 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -71,3 +71,21 @@ class MediaFilePaths(object):
self.base_path, "remote_thumbnail", server_name,
file_id[0:2], file_id[2:4], file_id[4:],
)
+
+ def url_cache_filepath(self, media_id):
+ return os.path.join(
+ self.base_path, "url_cache",
+ media_id[0:2], media_id[2:4], media_id[4:]
+ )
+
+ def url_cache_thumbnail(self, media_id, width, height, content_type,
+ method):
+ top_level_type, sub_type = content_type.split("/")
+ file_name = "%i-%i-%s-%s-%s" % (
+ width, height, top_level_type, sub_type, method
+ )
+ return os.path.join(
+ self.base_path, "url_cache_thumbnails",
+ media_id[0:2], media_id[2:4], media_id[4:],
+ file_name
+ )
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index bae2b4c757..0ea1248ce6 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -135,6 +135,8 @@ class MediaRepository(object):
media_info = yield self._download_remote_file(
server_name, media_id
)
+ elif media_info["quarantined_by"]:
+ raise NotFoundError()
else:
self.recently_accessed_remotes.add((server_name, media_id))
yield self.store.update_cached_last_access_time(
@@ -324,13 +326,17 @@ class MediaRepository(object):
defer.returnValue(t_path)
@defer.inlineCallbacks
- def _generate_local_thumbnails(self, media_id, media_info):
+ def _generate_local_thumbnails(self, media_id, media_info, url_cache=False):
media_type = media_info["media_type"]
requirements = self._get_thumbnail_requirements(media_type)
if not requirements:
return
- input_path = self.filepaths.local_media_filepath(media_id)
+ if url_cache:
+ input_path = self.filepaths.url_cache_filepath(media_id)
+ else:
+ input_path = self.filepaths.local_media_filepath(media_id)
+
thumbnailer = Thumbnailer(input_path)
m_width = thumbnailer.width
m_height = thumbnailer.height
@@ -358,9 +364,14 @@ class MediaRepository(object):
for t_width, t_height, t_type in scales:
t_method = "scale"
- t_path = self.filepaths.local_media_thumbnail(
- media_id, t_width, t_height, t_type, t_method
- )
+ if url_cache:
+ t_path = self.filepaths.url_cache_thumbnail(
+ media_id, t_width, t_height, t_type, t_method
+ )
+ else:
+ t_path = self.filepaths.local_media_thumbnail(
+ media_id, t_width, t_height, t_type, t_method
+ )
self._makedirs(t_path)
t_len = thumbnailer.scale(t_path, t_width, t_height, t_type)
@@ -375,9 +386,14 @@ class MediaRepository(object):
# thumbnail.
continue
t_method = "crop"
- t_path = self.filepaths.local_media_thumbnail(
- media_id, t_width, t_height, t_type, t_method
- )
+ if url_cache:
+ t_path = self.filepaths.url_cache_thumbnail(
+ media_id, t_width, t_height, t_type, t_method
+ )
+ else:
+ t_path = self.filepaths.local_media_thumbnail(
+ media_id, t_width, t_height, t_type, t_method
+ )
self._makedirs(t_path)
t_len = thumbnailer.crop(t_path, t_width, t_height, t_type)
local_thumbnails.append((
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index c680fddab5..b81a336c5d 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -164,7 +164,7 @@ class PreviewUrlResource(Resource):
if _is_media(media_info['media_type']):
dims = yield self.media_repo._generate_local_thumbnails(
- media_info['filesystem_id'], media_info
+ media_info['filesystem_id'], media_info, url_cache=True,
)
og = {
@@ -210,7 +210,7 @@ class PreviewUrlResource(Resource):
if _is_media(image_info['media_type']):
# TODO: make sure we don't choke on white-on-transparent images
dims = yield self.media_repo._generate_local_thumbnails(
- image_info['filesystem_id'], image_info
+ image_info['filesystem_id'], image_info, url_cache=True,
)
if dims:
og["og:image:width"] = dims['width']
@@ -256,7 +256,7 @@ class PreviewUrlResource(Resource):
# XXX: horrible duplication with base_resource's _download_remote_file()
file_id = random_string(24)
- fname = self.filepaths.local_media_filepath(file_id)
+ fname = self.filepaths.url_cache_filepath(file_id)
self.media_repo._makedirs(fname)
try:
@@ -303,6 +303,7 @@ class PreviewUrlResource(Resource):
upload_name=download_name,
media_length=length,
user_id=user,
+ url_cache=url,
)
except Exception as e:
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index d8f54adc99..68d56b2b10 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -81,7 +81,7 @@ class ThumbnailResource(Resource):
method, m_type):
media_info = yield self.store.get_local_media(media_id)
- if not media_info:
+ if not media_info or media_info["quarantined_by"]:
respond_404(request)
return
@@ -101,9 +101,16 @@ class ThumbnailResource(Resource):
t_type = thumbnail_info["thumbnail_type"]
t_method = thumbnail_info["thumbnail_method"]
- file_path = self.filepaths.local_media_thumbnail(
- media_id, t_width, t_height, t_type, t_method,
- )
+ if media_info["url_cache"]:
+ # TODO: Check the file still exists, if it doesn't we can redownload
+ # it from the url `media_info["url_cache"]`
+ file_path = self.filepaths.url_cache_thumbnail(
+ media_id, t_width, t_height, t_type, t_method,
+ )
+ else:
+ file_path = self.filepaths.local_media_thumbnail(
+ media_id, t_width, t_height, t_type, t_method,
+ )
yield respond_with_file(request, t_type, file_path)
else:
@@ -117,7 +124,7 @@ class ThumbnailResource(Resource):
desired_type):
media_info = yield self.store.get_local_media(media_id)
- if not media_info:
+ if not media_info or media_info["quarantined_by"]:
respond_404(request)
return
@@ -134,9 +141,18 @@ class ThumbnailResource(Resource):
t_type = info["thumbnail_type"] == desired_type
if t_w and t_h and t_method and t_type:
- file_path = self.filepaths.local_media_thumbnail(
- media_id, desired_width, desired_height, desired_type, desired_method,
- )
+ if media_info["url_cache"]:
+ # TODO: Check the file still exists, if it doesn't we can redownload
+ # it from the url `media_info["url_cache"]`
+ file_path = self.filepaths.url_cache_thumbnail(
+ media_id, desired_width, desired_height, desired_type,
+ desired_method,
+ )
+ else:
+ file_path = self.filepaths.local_media_thumbnail(
+ media_id, desired_width, desired_height, desired_type,
+ desired_method,
+ )
yield respond_with_file(request, desired_type, file_path)
return
diff --git a/synapse/state.py b/synapse/state.py
index 5b386e3183..390799fbd5 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -24,13 +24,13 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.events.snapshot import EventContext
from synapse.util.async import Linearizer
+from synapse.util.caches import CACHE_SIZE_FACTOR
from collections import namedtuple
from frozendict import frozendict
import logging
import hashlib
-import os
logger = logging.getLogger(__name__)
@@ -38,9 +38,6 @@ logger = logging.getLogger(__name__)
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
SIZE_OF_CACHE = int(100000 * CACHE_SIZE_FACTOR)
EVICTION_TIMEOUT_SECONDS = 60 * 60
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f119c5a758..b92472df33 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -304,16 +304,6 @@ class DataStore(RoomMemberStore, RoomStore,
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
- def get_user_ip_and_agents(self, user):
- return self._simple_select_list(
- table="user_ips",
- keyvalues={"user_id": user.to_string()},
- retcols=[
- "access_token", "ip", "user_agent", "last_seen"
- ],
- desc="get_user_ip_and_agents",
- )
-
def get_users(self):
"""Function to reterive a list of users in users table.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 51730a88bf..6f54036d67 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,7 @@ import logging
from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
@@ -27,10 +28,6 @@ from twisted.internet import defer
import sys
import time
import threading
-import os
-
-
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index aa84ffc2b0..ff14e54c11 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore):
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
+
+ @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
+ def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
+ ignored_account_data = yield self.get_global_account_data_by_type_for_user(
+ "m.ignored_user_list", ignorer_user_id,
+ on_invalidate=cache_context.invalidate,
+ )
+ if not ignored_account_data:
+ defer.returnValue(False)
+
+ defer.returnValue(
+ ignored_user_id in ignored_account_data.get("ignored_users", {})
+ )
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 532df736a5..c63935cb07 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -27,6 +27,25 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
+def _make_exclusive_regex(services_cache):
+ # We precompie a regex constructed from all the regexes that the AS's
+ # have registered for exclusive users.
+ exclusive_user_regexes = [
+ regex.pattern
+ for service in services_cache
+ for regex in service.get_exlusive_user_regexes()
+ ]
+ if exclusive_user_regexes:
+ exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
+ exclusive_user_regex = re.compile(exclusive_user_regex)
+ else:
+ # We handle this case specially otherwise the constructed regex
+ # will always match
+ exclusive_user_regex = None
+
+ return exclusive_user_regex
+
+
class ApplicationServiceStore(SQLBaseStore):
def __init__(self, hs):
@@ -36,21 +55,7 @@ class ApplicationServiceStore(SQLBaseStore):
hs.hostname,
hs.config.app_service_config_files
)
-
- # We precompie a regex constructed from all the regexes that the AS's
- # have registered for exclusive users.
- exclusive_user_regexes = [
- regex.pattern
- for service in self.services_cache
- for regex in service.get_exlusive_user_regexes()
- ]
- if exclusive_user_regexes:
- exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
- self.exclusive_user_regex = re.compile(exclusive_user_regex)
- else:
- # We handle this case specially otherwise the constructed regex
- # will always match
- self.exclusive_user_regex = None
+ self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
def get_app_services(self):
return self.services_cache
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 014ab635b7..3c95e90eca 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,12 +15,13 @@
import logging
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from ._base import Cache
from . import background_updates
-import os
+from synapse.util.caches import CACHE_SIZE_FACTOR
+
logger = logging.getLogger(__name__)
@@ -30,9 +31,6 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 120 * 1000
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
class ClientIpStore(background_updates.BackgroundUpdateStore):
def __init__(self, hs):
self.client_ip_last_seen = Cache(
@@ -50,10 +48,19 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
columns=["user_id", "device_id", "last_seen"],
)
- @defer.inlineCallbacks
- def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
- now = int(self._clock.time_msec())
- key = (user.to_string(), access_token, ip)
+ # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
+ self._batch_row_update = {}
+
+ self._client_ip_looper = self._clock.looping_call(
+ self._update_client_ips_batch, 5 * 1000
+ )
+ reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+
+ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
+ now=None):
+ if not now:
+ now = int(self._clock.time_msec())
+ key = (user_id, access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
@@ -62,34 +69,48 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
- defer.returnValue(None)
+ return
self.client_ip_last_seen.prefill(key, now)
- # It's safe not to lock here: a) no unique constraint,
- # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
- yield self._simple_upsert(
- "user_ips",
- keyvalues={
- "user_id": user.to_string(),
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- "device_id": device_id,
- },
- values={
- "last_seen": now,
- },
- desc="insert_client_ip",
- lock=False,
+ self._batch_row_update[key] = (user_agent, device_id, now)
+
+ def _update_client_ips_batch(self):
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
+ def _update_client_ips_batch_txn(self, txn, to_update):
+ self.database_engine.lock_table(txn, "user_ips")
+
+ for entry in to_update.iteritems():
+ (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
+
+ self._simple_upsert_txn(
+ txn,
+ table="user_ips",
+ keyvalues={
+ "user_id": user_id,
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "device_id": device_id,
+ },
+ values={
+ "last_seen": last_seen,
+ },
+ lock=False,
+ )
+
@defer.inlineCallbacks
- def get_last_client_ip_by_device(self, devices):
+ def get_last_client_ip_by_device(self, user_id, device_id):
"""For each device_id listed, give the user_ip it was last seen on
Args:
- devices (iterable[(str, str)]): list of (user_id, device_id) pairs
+ user_id (str)
+ device_id (str): If None fetches all devices for the user
Returns:
defer.Deferred: resolves to a dict, where the keys
@@ -100,6 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
res = yield self.runInteraction(
"get_last_client_ip_by_device",
self._get_last_client_ip_by_device_txn,
+ user_id, device_id,
retcols=(
"user_id",
"access_token",
@@ -108,23 +130,34 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"device_id",
"last_seen",
),
- devices=devices
)
ret = {(d["user_id"], d["device_id"]): d for d in res}
+ for key in self._batch_row_update:
+ uid, access_token, ip = key
+ if uid == user_id:
+ user_agent, did, last_seen = self._batch_row_update[key]
+ if not device_id or did == device_id:
+ ret[(user_id, device_id)] = {
+ "user_id": user_id,
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "device_id": did,
+ "last_seen": last_seen,
+ }
defer.returnValue(ret)
@classmethod
- def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):
+ def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
where_clauses = []
bindings = []
- for (user_id, device_id) in devices:
- if device_id is None:
- where_clauses.append("(user_id = ? AND device_id IS NULL)")
- bindings.extend((user_id, ))
- else:
- where_clauses.append("(user_id = ? AND device_id = ?)")
- bindings.extend((user_id, device_id))
+ if device_id is None:
+ where_clauses.append("user_id = ?")
+ bindings.extend((user_id, ))
+ else:
+ where_clauses.append("(user_id = ? AND device_id = ?)")
+ bindings.extend((user_id, device_id))
if not where_clauses:
return []
@@ -152,3 +185,37 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
txn.execute(sql, bindings)
return cls.cursor_to_dict(txn)
+
+ @defer.inlineCallbacks
+ def get_user_ip_and_agents(self, user):
+ user_id = user.to_string()
+ results = {}
+
+ for key in self._batch_row_update:
+ uid, access_token, ip = key
+ if uid == user_id:
+ user_agent, _, last_seen = self._batch_row_update[key]
+ results[(access_token, ip)] = (user_agent, last_seen)
+
+ rows = yield self._simple_select_list(
+ table="user_ips",
+ keyvalues={"user_id": user_id},
+ retcols=[
+ "access_token", "ip", "user_agent", "last_seen"
+ ],
+ desc="get_user_ip_and_agents",
+ )
+
+ results.update(
+ ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
+ for row in rows
+ )
+ defer.returnValue(list(
+ {
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "last_seen": last_seen,
+ }
+ for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+ ))
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 9caaf81f2c..79e7c540ad 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -170,3 +170,17 @@ class DirectoryStore(SQLBaseStore):
"room_alias",
desc="get_aliases_for_room",
)
+
+ def update_aliases_for_room(self, old_room_id, new_room_id, creator):
+ def _update_aliases_for_room_txn(txn):
+ sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
+ txn.execute(sql, (new_room_id, creator, old_room_id,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_aliases_for_room, (old_room_id,)
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_aliases_for_room, (new_room_id,)
+ )
+ return self.runInteraction(
+ "_update_aliases_for_room_txn", _update_aliases_for_room_txn
+ )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2b7340c1d9..7002b3752e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -403,6 +403,11 @@ class EventsStore(SQLBaseStore):
(room_id, ), new_state
)
+ for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+ self.get_latest_event_ids_in_room.prefill(
+ (room_id,), list(latest_event_ids)
+ )
+
@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index a2ccc66ea7..78b1e30945 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError, Codes
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from canonicaljson import encode_canonical_json
import simplejson as json
@@ -46,12 +47,21 @@ class FilteringStore(SQLBaseStore):
defer.returnValue(json.loads(str(def_json).decode("utf-8")))
def add_user_filter(self, user_localpart, user_filter):
- def_json = json.dumps(user_filter).encode("utf-8")
+ def_json = encode_canonical_json(user_filter)
# Need an atomic transaction to SELECT the maximal ID so far then
# INSERT a new one
def _do_txn(txn):
sql = (
+ "SELECT filter_id FROM user_filters "
+ "WHERE user_id = ? AND filter_json = ?"
+ )
+ txn.execute(sql, (user_localpart, def_json))
+ filter_id_response = txn.fetchone()
+ if filter_id_response is not None:
+ return filter_id_response[0]
+
+ sql = (
"SELECT MAX(filter_id) FROM user_filters "
"WHERE user_id = ?"
)
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 4c0f82353d..82bb61b811 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -30,13 +30,16 @@ class MediaRepositoryStore(SQLBaseStore):
return self._simple_select_one(
"local_media_repository",
{"media_id": media_id},
- ("media_type", "media_length", "upload_name", "created_ts"),
+ (
+ "media_type", "media_length", "upload_name", "created_ts",
+ "quarantined_by", "url_cache",
+ ),
allow_none=True,
desc="get_local_media",
)
def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
- media_length, user_id):
+ media_length, user_id, url_cache=None):
return self._simple_insert(
"local_media_repository",
{
@@ -46,6 +49,7 @@ class MediaRepositoryStore(SQLBaseStore):
"upload_name": upload_name,
"media_length": media_length,
"user_id": user_id.to_string(),
+ "url_cache": url_cache,
},
desc="store_local_media",
)
@@ -138,7 +142,7 @@ class MediaRepositoryStore(SQLBaseStore):
{"media_origin": origin, "media_id": media_id},
(
"media_type", "media_length", "upload_name", "created_ts",
- "filesystem_id",
+ "filesystem_id", "quarantined_by",
),
allow_none=True,
desc="get_cached_remote_media",
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5d543652bb..23688430b7 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -24,6 +24,7 @@ from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
import ujson as json
+import re
logger = logging.getLogger(__name__)
@@ -507,3 +508,98 @@ class RoomStore(SQLBaseStore):
))
else:
defer.returnValue(None)
+
+ @cached(max_entries=10000)
+ def is_room_blocked(self, room_id):
+ return self._simple_select_one_onecol(
+ table="blocked_rooms",
+ keyvalues={
+ "room_id": room_id,
+ },
+ retcol="1",
+ allow_none=True,
+ desc="is_room_blocked",
+ )
+
+ @defer.inlineCallbacks
+ def block_room(self, room_id, user_id):
+ yield self._simple_insert(
+ table="blocked_rooms",
+ values={
+ "room_id": room_id,
+ "user_id": user_id,
+ },
+ desc="block_room",
+ )
+ self.is_room_blocked.invalidate((room_id,))
+
+ def quarantine_media_ids_in_room(self, room_id, quarantined_by):
+ """For a room loops through all events with media and quarantines
+ the associated media
+ """
+ def _get_media_ids_in_room(txn):
+ mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+
+ next_token = self.get_current_events_token() + 1
+
+ total_media_quarantined = 0
+
+ while next_token:
+ sql = """
+ SELECT stream_ordering, content FROM events
+ WHERE room_id = ?
+ AND stream_ordering < ?
+ AND contains_url = ? AND outlier = ?
+ ORDER BY stream_ordering DESC
+ LIMIT ?
+ """
+ txn.execute(sql, (room_id, next_token, True, False, 100))
+
+ next_token = None
+ local_media_mxcs = []
+ remote_media_mxcs = []
+ for stream_ordering, content_json in txn:
+ next_token = stream_ordering
+ content = json.loads(content_json)
+
+ content_url = content.get("url")
+ thumbnail_url = content.get("info", {}).get("thumbnail_url")
+
+ for url in (content_url, thumbnail_url):
+ if not url:
+ continue
+ matches = mxc_re.match(url)
+ if matches:
+ hostname = matches.group(1)
+ media_id = matches.group(2)
+ if hostname == self.hostname:
+ local_media_mxcs.append(media_id)
+ else:
+ remote_media_mxcs.append((hostname, media_id))
+
+ # Now update all the tables to set the quarantined_by flag
+
+ txn.executemany("""
+ UPDATE local_media_repository
+ SET quarantined_by = ?
+ WHERE media_id = ?
+ """, ((quarantined_by, media_id) for media_id in local_media_mxcs))
+
+ txn.executemany(
+ """
+ UPDATE remote_media_cache
+ SET quarantined_by = ?
+ WHERE media_origin AND media_id = ?
+ """,
+ (
+ (quarantined_by, origin, media_id)
+ for origin, media_id in remote_media_mxcs
+ )
+ )
+
+ total_media_quarantined += len(local_media_mxcs)
+ total_media_quarantined += len(remote_media_mxcs)
+
+ return total_media_quarantined
+
+ return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room)
diff --git a/synapse/storage/schema/delta/43/blocked_rooms.sql b/synapse/storage/schema/delta/43/blocked_rooms.sql
new file mode 100644
index 0000000000..0e3cd143ff
--- /dev/null
+++ b/synapse/storage/schema/delta/43/blocked_rooms.sql
@@ -0,0 +1,21 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE blocked_rooms (
+ room_id TEXT NOT NULL,
+ user_id TEXT NOT NULL -- Admin who blocked the room
+);
+
+CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id);
diff --git a/synapse/storage/schema/delta/43/quarantine_media.sql b/synapse/storage/schema/delta/43/quarantine_media.sql
new file mode 100644
index 0000000000..630907ec4f
--- /dev/null
+++ b/synapse/storage/schema/delta/43/quarantine_media.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN quarantined_by TEXT;
+ALTER TABLE remote_media_cache ADD COLUMN quarantined_by TEXT;
diff --git a/synapse/storage/schema/delta/43/url_cache.sql b/synapse/storage/schema/delta/43/url_cache.sql
new file mode 100644
index 0000000000..45ebe020da
--- /dev/null
+++ b/synapse/storage/schema/delta/43/url_cache.sql
@@ -0,0 +1,16 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN url_cache TEXT;
diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql
index f552b6eb7b..4501d90cbb 100644
--- a/synapse/storage/schema/delta/43/user_share.sql
+++ b/synapse/storage/schema/delta/43/user_share.sql
@@ -25,7 +25,8 @@ CREATE TABLE users_who_share_rooms (
CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id);
-CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id, user_id);
+CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id);
+CREATE INDEX users_who_share_rooms_o_idx ON users_who_share_rooms(other_user_id);
-- Make sure that we popualte the table initially
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d1e679719b..5673e4aa96 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -315,6 +315,12 @@ class StateStore(SQLBaseStore):
],
)
+ for event_id, state_group_id in state_groups.iteritems():
+ txn.call_after(
+ self._get_state_group_for_event.prefill,
+ (event_id,), state_group_id
+ )
+
def _count_state_group_hops_txn(self, txn, state_group):
"""Given a state group, count how many hops there are in the tree.
@@ -584,8 +590,8 @@ class StateStore(SQLBaseStore):
state_map = yield self.get_state_ids_for_events([event_id], types)
defer.returnValue(state_map[event_id])
- @cached(num_args=2, max_entries=50000)
- def _get_state_group_for_event(self, room_id, event_id):
+ @cached(max_entries=50000)
+ def _get_state_group_for_event(self, event_id):
return self._simple_select_one_onecol(
table="event_to_state_groups",
keyvalues={
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 52b184fe78..2a4db3f03c 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -439,6 +439,7 @@ class UserDirectoryStore(SQLBaseStore):
},
retcol="share_private",
allow_none=True,
+ desc="get_if_users_share_a_room",
)
@cachedInlineCallbacks(max_entries=500000, iterable=True)
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4a83c46d98..4adae96681 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -16,7 +16,7 @@
import synapse.metrics
import os
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index cbdff86596..af65bfe7b8 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -16,6 +16,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
+from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -25,7 +26,6 @@ from . import register_cache
from twisted.internet import defer
from collections import namedtuple
-import os
import functools
import inspect
import threading
@@ -37,9 +37,6 @@ logger = logging.getLogger(__name__)
_CacheSentinel = object()
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
class CacheEntry(object):
__slots__ = [
"deferred", "sequence", "callbacks", "invalidated"
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index cbdde34a57..6ad53a6390 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -94,6 +94,9 @@ class ExpiringCache(object):
return entry.value
+ def __contains__(self, key):
+ return key in self._cache
+
def get(self, key, default=None):
try:
return self[key]
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 609625b322..941d873ab8 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,20 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import register_cache
+from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
from blist import sorteddict
import logging
-import os
logger = logging.getLogger(__name__)
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
class StreamChangeCache(object):
"""Keeps track of the stream positions of the latest change in a set of entities.
diff --git a/synapse/visibility.py b/synapse/visibility.py
index c4dd9ae2c7..5590b866ed 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
@defer.inlineCallbacks
-def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context):
- user_ids = set(u[0] for u in user_tuples)
- event_id_to_state = {}
- for event_id, context in event_id_to_context.items():
- state = yield store.get_events([
- e_id
- for key, e_id in context.current_state_ids.iteritems()
- if key == (EventTypes.RoomHistoryVisibility, "")
- or (key[0] == EventTypes.Member and key[1] in user_ids)
- ])
- event_id_to_state[event_id] = state
-
- res = yield filter_events_for_clients(
- store, user_tuples, events, event_id_to_state
- )
- defer.returnValue(res)
-
-
-@defer.inlineCallbacks
def filter_events_for_client(store, user_id, events, is_peeking=False):
"""
Check which events a user is allowed to see
|