diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 81510bc5c1..e8218d01ad 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -23,14 +23,27 @@ import signal
import subprocess
import sys
import yaml
+import errno
+import time
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
GREEN = "\x1b[1;32m"
+YELLOW = "\x1b[1;33m"
RED = "\x1b[1;31m"
NORMAL = "\x1b[m"
+def pid_running(pid):
+ try:
+ os.kill(pid, 0)
+ return True
+ except OSError, err:
+ if err.errno == errno.EPERM:
+ return True
+ return False
+
+
def write(message, colour=NORMAL, stream=sys.stdout):
if colour == NORMAL:
stream.write(message + "\n")
@@ -38,6 +51,11 @@ def write(message, colour=NORMAL, stream=sys.stdout):
stream.write(colour + message + NORMAL + "\n")
+def abort(message, colour=RED, stream=sys.stderr):
+ write(message, colour, stream)
+ sys.exit(1)
+
+
def start(configfile):
write("Starting ...")
args = SYNAPSE
@@ -45,7 +63,8 @@ def start(configfile):
try:
subprocess.check_call(args)
- write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
+ write("started synapse.app.homeserver(%r)" %
+ (configfile,), colour=GREEN)
except subprocess.CalledProcessError as e:
write(
"error starting (exit code: %d); see above for logs" % e.returncode,
@@ -76,8 +95,16 @@ def start_worker(app, configfile, worker_configfile):
def stop(pidfile, app):
if os.path.exists(pidfile):
pid = int(open(pidfile).read())
- os.kill(pid, signal.SIGTERM)
- write("stopped %s" % (app,), colour=GREEN)
+ try:
+ os.kill(pid, signal.SIGTERM)
+ write("stopped %s" % (app,), colour=GREEN)
+ except OSError, err:
+ if err.errno == errno.ESRCH:
+ write("%s not running" % (app,), colour=YELLOW)
+ elif err.errno == errno.EPERM:
+ abort("Cannot stop %s: Operation not permitted" % (app,))
+ else:
+ abort("Cannot stop %s: Unknown error" % (app,))
Worker = collections.namedtuple("Worker", [
@@ -191,7 +218,19 @@ def main():
if start_stop_synapse:
stop(pidfile, "synapse.app.homeserver")
- # TODO: Wait for synapse to actually shutdown before starting it again
+ # Wait for synapse to actually shutdown before starting it again
+ if action == "restart":
+ running_pids = []
+ if start_stop_synapse and os.path.exists(pidfile):
+ running_pids.append(int(open(pidfile).read()))
+ for worker in workers:
+ if os.path.exists(worker.pidfile):
+ running_pids.append(int(open(worker.pidfile).read()))
+ if len(running_pids) > 0:
+ write("Waiting for process to exit before restarting...")
+ for running_pid in running_pids:
+ while pid_running(running_pid):
+ time.sleep(0.2)
if action == "start" or action == "restart":
if start_stop_synapse:
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index b0106a3597..7346206bb1 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes
+from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer
@@ -124,29 +125,23 @@ class ApplicationService(object):
raise ValueError(
"Expected bool for 'exclusive' in ns '%s'" % ns
)
- if not isinstance(regex_obj.get("regex"), basestring):
+ regex = regex_obj.get("regex")
+ if isinstance(regex, basestring):
+ regex_obj["regex"] = re.compile(regex) # Pre-compile regex
+ else:
raise ValueError(
"Expected string for 'regex' in ns '%s'" % ns
)
return namespaces
- def _matches_regex(self, test_string, namespace_key, return_obj=False):
- if not isinstance(test_string, basestring):
- logger.error(
- "Expected a string to test regex against, but got %s",
- test_string
- )
- return False
-
+ def _matches_regex(self, test_string, namespace_key):
for regex_obj in self.namespaces[namespace_key]:
- if re.match(regex_obj["regex"], test_string):
- if return_obj:
- return regex_obj
- return True
- return False
+ if regex_obj["regex"].match(test_string):
+ return regex_obj
+ return None
def _is_exclusive(self, ns_key, test_string):
- regex_obj = self._matches_regex(test_string, ns_key, return_obj=True)
+ regex_obj = self._matches_regex(test_string, ns_key)
if regex_obj:
return regex_obj["exclusive"]
return False
@@ -166,7 +161,14 @@ class ApplicationService(object):
if not store:
defer.returnValue(False)
- member_list = yield store.get_users_in_room(event.room_id)
+ does_match = yield self._matches_user_in_member_list(event.room_id, store)
+ defer.returnValue(does_match)
+
+ @cachedInlineCallbacks(num_args=1, cache_context=True)
+ def _matches_user_in_member_list(self, room_id, store, cache_context):
+ member_list = yield store.get_users_in_room(
+ room_id, on_invalidate=cache_context.invalidate
+ )
# check joined member events
for user_id in member_list:
@@ -219,10 +221,10 @@ class ApplicationService(object):
)
def is_interested_in_alias(self, alias):
- return self._matches_regex(alias, ApplicationService.NS_ALIASES)
+ return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
def is_interested_in_room(self, room_id):
- return self._matches_regex(room_id, ApplicationService.NS_ROOMS)
+ return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
def is_exclusive_user(self, user_id):
return (
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index eeb693027b..3a4e16fa96 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -23,6 +23,7 @@ class VoipConfig(Config):
self.turn_username = config.get("turn_username")
self.turn_password = config.get("turn_password")
self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
+ self.turn_allow_guests = config.get("turn_allow_guests", True)
def default_config(self, **kwargs):
return """\
@@ -41,4 +42,11 @@ class VoipConfig(Config):
# How long generated TURN credentials last
turn_user_lifetime: "1h"
+
+ # Whether guests should be allowed to use the TURN server.
+ # This defaults to True, otherwise VoIP will be unreliable for guests.
+ # However, it does introduce a slight security risk as it allows users to
+ # connect to arbitrary endpoints without having first signed up for a
+ # valid account (e.g. by passing a CAPTCHA).
+ turn_allow_guests: True
"""
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4db76f18bd..4d88046579 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -17,6 +17,7 @@ import logging
import re
from synapse.types import UserID
+from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -125,6 +126,11 @@ class PushRuleEvaluatorForEvent(object):
return self._value_cache.get(dotted_key, None)
+# Caches (glob, word_boundary) -> regex for push. See _glob_matches
+regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
+register_cache("regex_push_cache", regex_cache)
+
+
def _glob_matches(glob, value, word_boundary=False):
"""Tests if value matches glob.
@@ -137,46 +143,63 @@ def _glob_matches(glob, value, word_boundary=False):
Returns:
bool
"""
- try:
- if IS_GLOB.search(glob):
- r = re.escape(glob)
-
- r = r.replace(r'\*', '.*?')
- r = r.replace(r'\?', '.')
-
- # handle [abc], [a-z] and [!a-z] style ranges.
- r = GLOB_REGEX.sub(
- lambda x: (
- '[%s%s]' % (
- x.group(1) and '^' or '',
- x.group(2).replace(r'\\\-', '-')
- )
- ),
- r,
- )
- if word_boundary:
- r = r"\b%s\b" % (r,)
- r = _compile_regex(r)
-
- return r.search(value)
- else:
- r = r + "$"
- r = _compile_regex(r)
-
- return r.match(value)
- elif word_boundary:
- r = re.escape(glob)
- r = r"\b%s\b" % (r,)
- r = _compile_regex(r)
- return r.search(value)
- else:
- return value.lower() == glob.lower()
+ try:
+ r = regex_cache.get((glob, word_boundary), None)
+ if not r:
+ r = _glob_to_re(glob, word_boundary)
+ regex_cache[(glob, word_boundary)] = r
+ return r.search(value)
except re.error:
logger.warn("Failed to parse glob to regex: %r", glob)
return False
+def _glob_to_re(glob, word_boundary):
+ """Generates regex for a given glob.
+
+ Args:
+ glob (string)
+ word_boundary (bool): Whether to match against word boundaries or entire
+ string. Defaults to False.
+
+ Returns:
+ regex object
+ """
+ if IS_GLOB.search(glob):
+ r = re.escape(glob)
+
+ r = r.replace(r'\*', '.*?')
+ r = r.replace(r'\?', '.')
+
+ # handle [abc], [a-z] and [!a-z] style ranges.
+ r = GLOB_REGEX.sub(
+ lambda x: (
+ '[%s%s]' % (
+ x.group(1) and '^' or '',
+ x.group(2).replace(r'\\\-', '-')
+ )
+ ),
+ r,
+ )
+ if word_boundary:
+ r = r"\b%s\b" % (r,)
+
+ return re.compile(r, flags=re.IGNORECASE)
+ else:
+ r = "^" + r + "$"
+
+ return re.compile(r, flags=re.IGNORECASE)
+ elif word_boundary:
+ r = re.escape(glob)
+ r = r"\b%s\b" % (r,)
+
+ return re.compile(r, flags=re.IGNORECASE)
+ else:
+ r = "^" + re.escape(glob) + "$"
+ return re.compile(r, flags=re.IGNORECASE)
+
+
def _flatten_dict(d, prefix=[], result={}):
for key, value in d.items():
if isinstance(value, basestring):
@@ -185,16 +208,3 @@ def _flatten_dict(d, prefix=[], result={}):
_flatten_dict(value, prefix=(prefix + [key]), result=result)
return result
-
-
-regex_cache = LruCache(5000)
-
-
-def _compile_regex(regex_str):
- r = regex_cache.get(regex_str, None)
- if r:
- return r
-
- r = re.compile(regex_str, flags=re.IGNORECASE)
- regex_cache[regex_str] = r
- return r
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 287df94b4f..6835f54e97 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,15 +17,12 @@ from twisted.internet import defer
from synapse.push.presentable_names import (
calculate_room_name, name_from_member_event
)
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@defer.inlineCallbacks
def get_badge_count(store, user_id):
- invites, joins = yield preserve_context_over_deferred(defer.gatherResults([
- preserve_fn(store.get_invited_rooms_for_user)(user_id),
- preserve_fn(store.get_rooms_for_user)(user_id),
- ], consumeErrors=True))
+ invites = yield store.get_invited_rooms_for_user(user_id)
+ joins = yield store.get_rooms_for_user(user_id)
my_receipts_by_room = yield store.get_receipts_for_user(
user_id, "m.read",
diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py
index 03141c623c..c43b30b73a 100644
--- a/synapse/rest/client/v1/voip.py
+++ b/synapse/rest/client/v1/voip.py
@@ -28,7 +28,10 @@ class VoipRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(
+ request,
+ self.hs.config.turn_allow_guests
+ )
turnUris = self.hs.config.turn_uris
turnSecret = self.hs.config.turn_shared_secret
diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py
index 31f94bc6e9..6fceb23e26 100644
--- a/synapse/rest/client/v2_alpha/thirdparty.py
+++ b/synapse/rest/client/v2_alpha/thirdparty.py
@@ -36,7 +36,7 @@ class ThirdPartyProtocolsServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
- yield self.auth.get_user_by_req(request)
+ yield self.auth.get_user_by_req(request, allow_guest=True)
protocols = yield self.appservice_handler.get_3pe_protocols()
defer.returnValue((200, protocols))
@@ -54,7 +54,7 @@ class ThirdPartyProtocolServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, protocol):
- yield self.auth.get_user_by_req(request)
+ yield self.auth.get_user_by_req(request, allow_guest=True)
protocols = yield self.appservice_handler.get_3pe_protocols(
only_protocol=protocol,
@@ -77,7 +77,7 @@ class ThirdPartyUserServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, protocol):
- yield self.auth.get_user_by_req(request)
+ yield self.auth.get_user_by_req(request, allow_guest=True)
fields = request.args
fields.pop("access_token", None)
@@ -101,7 +101,7 @@ class ThirdPartyLocationServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, protocol):
- yield self.auth.get_user_by_req(request)
+ yield self.auth.get_user_by_req(request, allow_guest=True)
fields = request.args
fields.pop("access_token", None)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 43b5b49986..519059c306 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -152,7 +152,7 @@ class EventFederationStore(SQLBaseStore):
txn.execute(sql, (room_id, ))
results = []
- for event_id, depth in txn:
+ for event_id, depth in txn.fetchall():
hashes = self._get_event_reference_hashes_txn(txn, event_id)
prev_hashes = {
k: encode_base64(v) for k, v in hashes.items()
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 314216f039..fb23f6f462 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -496,7 +496,7 @@ class StateStore(SQLBaseStore):
state_map = yield self.get_state_ids_for_events([event_id], types)
defer.returnValue(state_map[event_id])
- @cached(num_args=2, max_entries=10000)
+ @cached(num_args=2, max_entries=100000)
def _get_state_group_for_event(self, room_id, event_id):
return self._simple_select_one_onecol(
table="event_to_state_groups",
diff --git a/synapse/types.py b/synapse/types.py
index 9666f9d73f..c87ed813b9 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -216,9 +216,7 @@ class StreamToken(
return self
def copy_and_replace(self, key, new_value):
- d = self._asdict()
- d[key] = new_value
- return StreamToken(**d)
+ return self._replace(**{key: new_value})
StreamToken.START = StreamToken(
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 35380bf8ed..1453faf0ef 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -89,6 +89,11 @@ class ObservableDeferred(object):
deferred.addCallbacks(callback, errback)
def observe(self):
+ """Observe the underlying deferred.
+
+ Can return either a deferred if the underlying deferred is still pending
+ (or has failed), or the actual value. Callers may need to use maybeDeferred.
+ """
if not self._result:
d = defer.Deferred()
@@ -101,7 +106,7 @@ class ObservableDeferred(object):
return d
else:
success, res = self._result
- return defer.succeed(res) if success else defer.fail(res)
+ return res if success else defer.fail(res)
def observers(self):
return self._observers
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 19595df422..9d0d0be1f9 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -15,12 +15,9 @@
import logging
from synapse.util.async import ObservableDeferred
-from synapse.util import unwrapFirstError
+from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
-from synapse.util.logcontext import (
- PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
-)
from . import DEBUG_CACHES, register_cache
@@ -227,8 +224,20 @@ class _CacheDescriptorBase(object):
)
self.num_args = num_args
+
+ # list of the names of the args used as the cache key
self.arg_names = all_args[1:num_args + 1]
+ # self.arg_defaults is a map of arg name to its default value for each
+ # argument that has a default value
+ if arg_spec.defaults:
+ self.arg_defaults = dict(zip(
+ all_args[-len(arg_spec.defaults):],
+ arg_spec.defaults
+ ))
+ else:
+ self.arg_defaults = {}
+
if "cache_context" in self.arg_names:
raise Exception(
"cache_context arg cannot be included among the cache keys"
@@ -292,18 +301,31 @@ class CacheDescriptor(_CacheDescriptorBase):
iterable=self.iterable,
)
+ def get_cache_key(args, kwargs):
+ """Given some args/kwargs return a generator that resolves into
+ the cache_key.
+
+ We loop through each arg name, looking up if its in the `kwargs`,
+ otherwise using the next argument in `args`. If there are no more
+ args then we try looking the arg name up in the defaults
+ """
+ pos = 0
+ for nm in self.arg_names:
+ if nm in kwargs:
+ yield kwargs[nm]
+ elif pos < len(args):
+ yield args[pos]
+ pos += 1
+ else:
+ yield self.arg_defaults[nm]
+
@functools.wraps(self.orig)
def wrapped(*args, **kwargs):
# If we're passed a cache_context then we'll want to call its invalidate()
# whenever we are invalidated
invalidate_callback = kwargs.pop("on_invalidate", None)
- # Add temp cache_context so inspect.getcallargs doesn't explode
- if self.add_cache_context:
- kwargs["cache_context"] = None
-
- arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
- cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names)
+ cache_key = tuple(get_cache_key(args, kwargs))
# Add our own `cache_context` to argument list if the wrapped function
# has asked for one
@@ -328,11 +350,9 @@ class CacheDescriptor(_CacheDescriptorBase):
defer.returnValue(cached_result)
observer.addCallback(check_result)
- return preserve_context_over_deferred(observer)
except KeyError:
ret = defer.maybeDeferred(
- preserve_context_over_fn,
- self.function_to_call,
+ logcontext.preserve_fn(self.function_to_call),
obj, *args, **kwargs
)
@@ -342,10 +362,14 @@ class CacheDescriptor(_CacheDescriptorBase):
ret.addErrback(onErr)
- ret = ObservableDeferred(ret, consumeErrors=True)
- cache.set(cache_key, ret, callback=invalidate_callback)
+ result_d = ObservableDeferred(ret, consumeErrors=True)
+ cache.set(cache_key, result_d, callback=invalidate_callback)
+ observer = result_d.observe()
- return preserve_context_over_deferred(ret.observe())
+ if isinstance(observer, defer.Deferred):
+ return logcontext.make_deferred_yieldable(observer)
+ else:
+ return observer
wrapped.invalidate = cache.invalidate
wrapped.invalidate_all = cache.invalidate_all
@@ -362,7 +386,11 @@ class CacheListDescriptor(_CacheDescriptorBase):
"""Wraps an existing cache to support bulk fetching of keys.
Given a list of keys it looks in the cache to find any hits, then passes
- the list of missing keys to the wrapped fucntion.
+ the list of missing keys to the wrapped function.
+
+ Once wrapped, the function returns either a Deferred which resolves to
+ the list of results, or (if all results were cached), just the list of
+ results.
"""
def __init__(self, orig, cached_method_name, list_name, num_args=None,
@@ -433,8 +461,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
args_to_call[self.list_name] = missing
ret_d = defer.maybeDeferred(
- preserve_context_over_fn,
- self.function_to_call,
+ logcontext.preserve_fn(self.function_to_call),
**args_to_call
)
@@ -443,8 +470,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
# We need to create deferreds for each arg in the list so that
# we can insert the new deferred into the cache.
for arg in missing:
- with PreserveLoggingContext():
- observer = ret_d.observe()
+ observer = ret_d.observe()
observer.addCallback(lambda r, arg: r.get(arg, None), arg)
observer = ObservableDeferred(observer)
@@ -471,7 +497,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
results.update(res)
return results
- return preserve_context_over_deferred(defer.gatherResults(
+ return logcontext.make_deferred_yieldable(defer.gatherResults(
cached_defers.values(),
consumeErrors=True,
).addCallback(update_results_dict).addErrback(
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index ff67b1d794..857afee7cb 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -310,6 +310,10 @@ def preserve_context_over_fn(fn, *args, **kwargs):
def preserve_context_over_deferred(deferred, context=None):
"""Given a deferred wrap it such that any callbacks added later to it will
be invoked with the current context.
+
+ Deprecated: this almost certainly doesn't do want you want, ie make
+ the deferred follow the synapse logcontext rules: try
+ ``make_deferred_yieldable`` instead.
"""
if context is None:
context = LoggingContext.current_context()
@@ -359,6 +363,25 @@ def preserve_fn(f):
return g
+@defer.inlineCallbacks
+def make_deferred_yieldable(deferred):
+ """Given a deferred, make it follow the Synapse logcontext rules:
+
+ If the deferred has completed (or is not actually a Deferred), essentially
+ does nothing (just returns another completed deferred with the
+ result/failure).
+
+ If the deferred has not yet completed, resets the logcontext before
+ returning a deferred. Then, when the deferred completes, restores the
+ current logcontext before running callbacks/errbacks.
+
+ (This is more-or-less the opposite operation to preserve_fn.)
+ """
+ with PreserveLoggingContext():
+ r = yield deferred
+ defer.returnValue(r)
+
+
# modules to ignore in `logcontext_tracer`
_to_ignore = [
"synapse.util.logcontext",
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 31659156ae..c4dd9ae2c7 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -56,7 +56,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
events ([synapse.events.EventBase]): list of events to filter
"""
forgotten = yield preserve_context_over_deferred(defer.gatherResults([
- preserve_fn(store.who_forgot_in_room)(
+ defer.maybeDeferred(
+ preserve_fn(store.who_forgot_in_room),
room_id,
)
for room_id in frozenset(e.room_id for e in events)
|