From 191c7bef6bbb80f66f66e95387940c3bb6b5a0cf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 24 Mar 2016 17:47:31 +0000 Subject: Deduplicate identical /sync requests --- synapse/util/caches/response_cache.py | 46 +++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 synapse/util/caches/response_cache.py (limited to 'synapse/util') diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py new file mode 100644 index 0000000000..1c2e344269 --- /dev/null +++ b/synapse/util/caches/response_cache.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.async import ObservableDeferred + + +class ResponseCache(object): + """ + This caches a deferred response. Until the deferred completes it will be + returned from the cache. This means that if the client retries the request + while the response is still being computed, that original response will be + used rather than trying to compute a new response. + """ + + def __init__(self): + self.pending_result_cache = {} # Request that haven't finished yet. + + def get(self, key): + result = self.pending_result_cache.get(key) + if result is not None: + return result.observe() + else: + return None + + def set(self, key, deferred): + result = ObservableDeferred(deferred) + self.pending_result_cache[key] = result + + def remove(r): + self.pending_result_cache.pop(key, None) + return r + + result.addBoth(remove) + return result.observe() -- cgit 1.5.1 From 77cba688edb9216f5c578c931e96142722641b70 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 24 Mar 2016 18:02:37 +0000 Subject: Fix typo --- synapse/util/caches/response_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 1c2e344269..be310ba320 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -25,7 +25,7 @@ class ResponseCache(object): """ def __init__(self): - self.pending_result_cache = {} # Request that haven't finished yet. + self.pending_result_cache = {} # Requests that haven't finished yet. def get(self, key): result = self.pending_result_cache.get(key) -- cgit 1.5.1 From 8d73cd502bd8ee6903c81f20f79fe5e1509692e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 14:06:00 +0100 Subject: Add concurrently_execute function --- synapse/handlers/message.py | 10 +---- synapse/handlers/room.py | 17 ++++---- synapse/handlers/sync.py | 98 +++++++++++++++++++-------------------------- synapse/util/async.py | 32 ++++++++++++++- 4 files changed, 82 insertions(+), 75 deletions(-) (limited to 'synapse/util') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5c50c611ba..0bb111d047 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.types import UserID, RoomStreamToken, StreamToken @@ -556,14 +557,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - # Only do N rooms at once - n = 5 - d_list = [handle_room(e) for e in room_list] - for i in range(0, len(d_list), n): - yield defer.gatherResults( - d_list[i:i + n], - consumeErrors=True - ).addErrback(unwrapFirstError) + yield concurrently_execute(handle_room, room_list, 10) account_data_events = [] for account_data_type, content in account_data.items(): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ee99ded214..3e1d9282d7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -23,7 +23,8 @@ from synapse.api.constants import ( EventTypes, JoinRules, RoomCreationPreset, ) from synapse.api.errors import AuthError, StoreError, SynapseError -from synapse.util import stringutils, unwrapFirstError +from synapse.util import stringutils +from synapse.util.async import concurrently_execute from synapse.util.logcontext import preserve_context_over_fn from synapse.util.caches.response_cache import ResponseCache @@ -368,6 +369,8 @@ class RoomListHandler(BaseHandler): def _get_public_room_list(self): room_ids = yield self.store.get_public_room_ids() + results = [] + @defer.inlineCallbacks def handle_room(room_id): aliases = yield self.store.get_aliases_for_room(room_id) @@ -428,18 +431,12 @@ class RoomListHandler(BaseHandler): joined_users = yield self.store.get_users_in_room(room_id) result["num_joined_members"] = len(joined_users) - defer.returnValue(result) + results.append(result) - result = [] - for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)): - chunk_result = yield defer.gatherResults([ - handle_room(room_id) - for room_id in chunk - ], consumeErrors=True).addErrback(unwrapFirstError) - result.extend(v for v in chunk_result if v) + yield concurrently_execute(handle_room, room_ids, 10) # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": result}) + defer.returnValue({"start": "START", "end": "END", "chunk": results}) class RoomContextHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 06098f899e..e38fe1ef9c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -17,8 +17,8 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes -from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.async import concurrently_execute +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user @@ -250,64 +250,50 @@ class SyncHandler(BaseHandler): joined = [] invited = [] archived = [] - deferreds = [] user_id = sync_config.user.to_string() - def _should_include_room(event): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if user_id == event.sender: - return False - return True - - room_list = filter(_should_include_room, room_list) - - room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)] - for room_list_chunk in room_list_chunks: - for event in room_list_chunk: - if event.membership == Membership.JOIN: - room_sync_deferred = preserve_fn( - self.full_state_sync_for_joined_room - )( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - room_sync_deferred.addCallback(joined.append) - deferreds.append(room_sync_deferred) - elif event.membership == Membership.INVITE: - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_sync_deferred = preserve_fn( - self.full_state_sync_for_archived_room - )( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - room_sync_deferred.addCallback(archived.append) - deferreds.append(room_sync_deferred) + @defer.inlineCallbacks + def _generate_room_entry(event): + if event.membership == Membership.JOIN: + room_result = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) + joined.append(room_result) + elif event.membership == Membership.INVITE: + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + return + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_result = yield self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) + archived.append(room_result) - yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + yield concurrently_execute(_generate_room_entry, room_list, 10) account_data_for_user = sync_config.filter_collection.filter_account_data( self.account_data_for_user(account_data) diff --git a/synapse/util/async.py b/synapse/util/async.py index 640fae3890..a75e1c71fb 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,8 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import PreserveLoggingContext, preserve_fn +from synapse.util import unwrapFirstError @defer.inlineCallbacks @@ -107,3 +108,32 @@ class ObservableDeferred(object): return "" % ( id(self), self._result, self._deferred, ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. + limit (int): Maximum number of conccurent executions. + + Returns: + deferred + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(it.next()) + except StopIteration: + pass + + return defer.gatherResults([ + preserve_fn(_concurrently_execute_inner)() + for _ in xrange(limit) + ], consumeErrors=True).addErrback(unwrapFirstError) -- cgit 1.5.1 From 3f4eb4c92402d80d0f41501bf71a60a1b94f2756 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 14:15:27 +0100 Subject: Comment --- synapse/util/async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index a75e1c71fb..cd4d90f3cf 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -121,7 +121,7 @@ def concurrently_execute(func, args, limit): limit (int): Maximum number of conccurent executions. Returns: - deferred + deferred: Resolved when all function invocations have finished. """ it = iter(args) -- cgit 1.5.1 From 87f2dec8d475f038beb138bc56e3ef76fcb83ec6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 6 Apr 2016 13:08:05 +0100 Subject: Make the cache objects be per instance rather than being global --- synapse/storage/receipts.py | 4 ++-- synapse/storage/registration.py | 2 +- synapse/storage/state.py | 4 ++-- synapse/util/caches/descriptors.py | 45 ++++++++++++++++++++------------------ 4 files changed, 29 insertions(+), 26 deletions(-) (limited to 'synapse/util') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 4befebc8e2..7fdd84bbdc 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -160,8 +160,8 @@ class ReceiptsStore(SQLBaseStore): "content": content, }]) - @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids", - num_args=3, inlineCallbacks=True) + @cachedList(cached_method_name="get_linearized_receipts_for_room", + list_name="room_ids", num_args=3, inlineCallbacks=True) def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): if not room_ids: defer.returnValue({}) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d46a963bb8..1f71773aaa 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -319,7 +319,7 @@ class RegistrationStore(SQLBaseStore): defer.returnValue(res if res else False) - @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1, + @cachedList(cached_method_name="is_guest", list_name="user_ids", num_args=1, inlineCallbacks=True) def are_guests(self, user_ids): sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % ( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e9f9406014..c5d2a3a6df 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -273,8 +273,8 @@ class StateStore(SQLBaseStore): desc="_get_state_group_for_event", ) - @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", - num_args=1, inlineCallbacks=True) + @cachedList(cached_method_name="_get_state_group_for_event", + list_name="event_ids", num_args=1, inlineCallbacks=True) def _get_state_group_for_events(self, event_ids): """Returns mapping event_id -> state_group """ diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 35544b19fd..758f5982b0 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -167,7 +167,8 @@ class CacheDescriptor(object): % (orig.__name__,) ) - self.cache = Cache( + def __get__(self, obj, objtype=None): + cache = Cache( name=self.orig.__name__, max_entries=self.max_entries, keylen=self.num_args, @@ -175,14 +176,12 @@ class CacheDescriptor(object): tree=self.tree, ) - def __get__(self, obj, objtype=None): - @functools.wraps(self.orig) def wrapped(*args, **kwargs): arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) try: - cached_result_d = self.cache.get(cache_key) + cached_result_d = cache.get(cache_key) observer = cached_result_d.observe() if DEBUG_CACHES: @@ -204,7 +203,7 @@ class CacheDescriptor(object): # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated # while the SELECT is executing (SYN-369) - sequence = self.cache.sequence + sequence = cache.sequence ret = defer.maybeDeferred( preserve_context_over_fn, @@ -213,20 +212,21 @@ class CacheDescriptor(object): ) def onErr(f): - self.cache.invalidate(cache_key) + cache.invalidate(cache_key) return f ret.addErrback(onErr) ret = ObservableDeferred(ret, consumeErrors=True) - self.cache.update(sequence, cache_key, ret) + cache.update(sequence, cache_key, ret) return preserve_context_over_deferred(ret.observe()) - wrapped.invalidate = self.cache.invalidate - wrapped.invalidate_all = self.cache.invalidate_all - wrapped.invalidate_many = self.cache.invalidate_many - wrapped.prefill = self.cache.prefill + wrapped.invalidate = cache.invalidate + wrapped.invalidate_all = cache.invalidate_all + wrapped.invalidate_many = cache.invalidate_many + wrapped.prefill = cache.prefill + wrapped.cache = cache obj.__dict__[self.orig.__name__] = wrapped @@ -240,11 +240,12 @@ class CacheListDescriptor(object): the list of missing keys to the wrapped fucntion. """ - def __init__(self, orig, cache, list_name, num_args=1, inlineCallbacks=False): + def __init__(self, orig, cached_method_name, list_name, num_args=1, + inlineCallbacks=False): """ Args: orig (function) - cache (Cache) + method_name (str); The name of the chached method. list_name (str): Name of the argument which is the bulk lookup list num_args (int) inlineCallbacks (bool): Whether orig is a generator that should @@ -263,7 +264,7 @@ class CacheListDescriptor(object): self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] self.list_pos = self.arg_names.index(self.list_name) - self.cache = cache + self.cached_method_name = cached_method_name self.sentinel = object() @@ -277,11 +278,13 @@ class CacheListDescriptor(object): if self.list_name not in self.arg_names: raise Exception( "Couldn't see arguments %r for %r." - % (self.list_name, cache.name,) + % (self.list_name, cached_method_name,) ) def __get__(self, obj, objtype=None): + cache = getattr(obj, self.cached_method_name).cache + @functools.wraps(self.orig) def wrapped(*args, **kwargs): arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) @@ -297,14 +300,14 @@ class CacheListDescriptor(object): key[self.list_pos] = arg try: - res = self.cache.get(tuple(key)).observe() + res = cache.get(tuple(key)).observe() res.addCallback(lambda r, arg: (arg, r), arg) cached[arg] = res except KeyError: missing.append(arg) if missing: - sequence = self.cache.sequence + sequence = cache.sequence args_to_call = dict(arg_dict) args_to_call[self.list_name] = missing @@ -327,10 +330,10 @@ class CacheListDescriptor(object): key = list(keyargs) key[self.list_pos] = arg - self.cache.update(sequence, tuple(key), observer) + cache.update(sequence, tuple(key), observer) def invalidate(f, key): - self.cache.invalidate(key) + cache.invalidate(key) return f observer.addErrback(invalidate, tuple(key)) @@ -370,7 +373,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False): ) -def cachedList(cache, list_name, num_args=1, inlineCallbacks=False): +def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False): """Creates a descriptor that wraps a function in a `CacheListDescriptor`. Used to do batch lookups for an already created cache. A single argument @@ -400,7 +403,7 @@ def cachedList(cache, list_name, num_args=1, inlineCallbacks=False): """ return lambda orig: CacheListDescriptor( orig, - cache=cache, + cached_method_name=cached_method_name, list_name=list_name, num_args=num_args, inlineCallbacks=inlineCallbacks, -- cgit 1.5.1 From af03ecf35223f93971596f38393c62f4694705fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Apr 2016 15:44:22 +0100 Subject: Deduplicate joins --- synapse/handlers/room_member.py | 31 ++++++++++++++++++++++++ synapse/util/async.py | 42 +++++++++++++++++++++++++++++++++ synapse/util/caches/response_cache.py | 2 +- tests/util/test_linearizer.py | 44 +++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 tests/util/test_linearizer.py (limited to 'synapse/util') diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index fe2315df8f..0fcc9445a8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -24,6 +24,7 @@ from synapse.api.constants import ( ) from synapse.api.errors import AuthError, SynapseError, Codes from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.async import Linearizer from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -60,6 +61,8 @@ class RoomMemberHandler(BaseHandler): def __init__(self, hs): super(RoomMemberHandler, self).__init__(hs) + self.member_linearizer = Linearizer() + self.clock = hs.get_clock() self.distributor = hs.get_distributor() @@ -182,6 +185,34 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts=None, third_party_signed=None, ratelimit=True, + ): + key = (target, room_id,) + + with (yield self.member_linearizer.queue(key)): + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _update_membership( + self, + requester, + target, + room_id, + action, + txn_id=None, + remote_room_hosts=None, + third_party_signed=None, + ratelimit=True, ): effective_membership_state = action if action in ["kick", "unban"]: diff --git a/synapse/util/async.py b/synapse/util/async.py index cd4d90f3cf..408c86be91 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -19,6 +19,8 @@ from twisted.internet import defer, reactor from .logcontext import PreserveLoggingContext, preserve_fn from synapse.util import unwrapFirstError +from contextlib import contextmanager + @defer.inlineCallbacks def sleep(seconds): @@ -137,3 +139,43 @@ def concurrently_execute(func, args, limit): preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) ], consumeErrors=True).addErrback(unwrapFirstError) + + +@contextmanager +def _trigger_defer_manager(d): + try: + yield + finally: + d.callback(None) + + +class Linearizer(object): + """Linearizes access to resources based on a key. Useful to ensure only one + thing is happening at a time on a given resource. + + Example: + + with (yield linearizer.queue("test_key")): + # do some work. + + """ + def __init__(self): + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + current_defer = self.key_to_defer.get(key) + + new_defer = defer.Deferred() + self.key_to_defer[key] = new_defer + + def remove_if_current(_): + d = self.key_to_defer.get(key) + if d is new_defer: + self.key_to_defer.pop(key, None) + + new_defer.addBoth(remove_if_current) + + yield current_defer + + defer.returnValue(_trigger_defer_manager(new_defer)) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index be310ba320..36686b479e 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -35,7 +35,7 @@ class ResponseCache(object): return None def set(self, key, deferred): - result = ObservableDeferred(deferred) + result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result def remove(r): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py new file mode 100644 index 0000000000..afcba482f9 --- /dev/null +++ b/tests/util/test_linearizer.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest + +from twisted.internet import defer + +from synapse.util.async import Linearizer + + +class LinearizerTestCase(unittest.TestCase): + + @defer.inlineCallbacks + def test_linearizer(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + with cm1: + self.assertFalse(d2.called) + + self.assertTrue(d2.called) + + with (yield d2): + pass -- cgit 1.5.1 From 639cd07d6d4e22e3413349bbd3bfb33db37a8d2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Apr 2016 14:24:12 +0100 Subject: Add comment --- synapse/util/async.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 408c86be91..14a3dfd43f 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -164,6 +164,14 @@ class Linearizer(object): @defer.inlineCallbacks def queue(self, key): + # If there is already a deferred in the queue, we pull it out so that + # we can wait on it later. + # Then we replace it with a deferred that we resolve *after* the + # context manager has exited. + # We only return the context manager after the previous deferred has + # resolved. + # This all has the net effect of creating a chain of deferreds that + # wait for the previous deferred before starting their work. current_defer = self.key_to_defer.get(key) new_defer = defer.Deferred() -- cgit 1.5.1 From ee5aef6c72575045fc441076b29b0c06eb46a28c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Apr 2016 15:29:34 +0100 Subject: Log contexts and squash things together --- synapse/util/async.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 14a3dfd43f..072b6362b5 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,9 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext, preserve_fn +from .logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util import unwrapFirstError from contextlib import contextmanager @@ -141,14 +143,6 @@ def concurrently_execute(func, args, limit): ], consumeErrors=True).addErrback(unwrapFirstError) -@contextmanager -def _trigger_defer_manager(d): - try: - yield - finally: - d.callback(None) - - class Linearizer(object): """Linearizes access to resources based on a key. Useful to ensure only one thing is happening at a time on a given resource. @@ -177,13 +171,17 @@ class Linearizer(object): new_defer = defer.Deferred() self.key_to_defer[key] = new_defer - def remove_if_current(_): - d = self.key_to_defer.get(key) - if d is new_defer: - self.key_to_defer.pop(key, None) - - new_defer.addBoth(remove_if_current) + if current_defer: + yield preserve_context_over_deferred(current_defer) - yield current_defer + @contextmanager + def _ctx_manager(d): + try: + yield + finally: + d.callback(None) + d = self.key_to_defer.get(key) + if d is new_defer: + self.key_to_defer.pop(key, None) - defer.returnValue(_trigger_defer_manager(new_defer)) + defer.returnValue(_ctx_manager(new_defer)) -- cgit 1.5.1 From 95ac3078da54908855721361b1305ed0c41215d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Apr 2016 16:07:16 +0100 Subject: Rename things --- synapse/util/async.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 072b6362b5..0d6f48e2d8 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -175,13 +175,13 @@ class Linearizer(object): yield preserve_context_over_deferred(current_defer) @contextmanager - def _ctx_manager(d): + def _ctx_manager(): try: yield finally: - d.callback(None) - d = self.key_to_defer.get(key) - if d is new_defer: + new_defer.callback(None) + current_d = self.key_to_defer.get(key) + if current_d is new_defer: self.key_to_defer.pop(key, None) - defer.returnValue(_ctx_manager(new_defer)) + defer.returnValue(_ctx_manager()) -- cgit 1.5.1 From b9ee5650b0027b664aa700a7ce451a546f404350 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 8 Apr 2016 11:01:38 +0100 Subject: Move all the wrapper functions for distributor.fire Move the functions inside the distributor and import them where needed. This reduces duplication and makes it possible for flake8 to detect when the functions aren't used in a given file. --- synapse/handlers/federation.py | 5 +---- synapse/handlers/register.py | 5 +---- synapse/handlers/room.py | 15 --------------- synapse/handlers/room_member.py | 16 +--------------- synapse/util/distributor.py | 22 +++++++++++++++++++++- 5 files changed, 24 insertions(+), 39 deletions(-) (limited to 'synapse/util') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eb02f0e000..c28226f840 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -40,6 +40,7 @@ from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination from synapse.push.action_generator import ActionGenerator +from synapse.util.distributor import user_joined_room from twisted.internet import defer @@ -49,10 +50,6 @@ import logging logger = logging.getLogger(__name__) -def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user, room_id) - - class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index f287ee247b..b0862067e1 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( from ._base import BaseHandler from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient +from synapse.util.distributor import registered_user import logging import urllib @@ -30,10 +31,6 @@ import urllib logger = logging.getLogger(__name__) -def registered_user(distributor, user): - return distributor.fire("registered_user", user) - - class RegistrationHandler(BaseHandler): def __init__(self, hs): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3e1d9282d7..ea306cd42a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,7 +25,6 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils from synapse.util.async import concurrently_execute -from synapse.util.logcontext import preserve_context_over_fn from synapse.util.caches.response_cache import ResponseCache from collections import OrderedDict @@ -39,20 +38,6 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" -def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) - - -def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) - - class RoomCreationHandler(BaseHandler): PRESETS_DICT = { diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index b6ef3c91af..753c75d9c1 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -23,8 +23,8 @@ from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes -from synapse.util.logcontext import preserve_context_over_fn from synapse.util.async import Linearizer +from synapse.util.distributor import user_left_room, user_joined_room from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -38,20 +38,6 @@ logger = logging.getLogger(__name__) id_server_scheme = "https://" -def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) - - -def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) - - class RoomMemberHandler(BaseHandler): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 8875813de4..d7cccc06b1 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,7 +15,9 @@ from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_context_over_fn +) from synapse.util import unwrapFirstError @@ -25,6 +27,24 @@ import logging logger = logging.getLogger(__name__) +def registered_user(distributor, user): + return distributor.fire("registered_user", user) + + +def user_left_room(distributor, user, room_id): + return preserve_context_over_fn( + distributor.fire, + "user_left_room", user=user, room_id=room_id + ) + + +def user_joined_room(distributor, user, room_id): + return preserve_context_over_fn( + distributor.fire, + "user_joined_room", user=user, room_id=room_id + ) + + class Distributor(object): """A central dispatch point for loosely-connected pieces of code to register, observe, and fire signals. -- cgit 1.5.1 From 7e2f971c08250cf432d43dd6244faefb2074ff8c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 8 Apr 2016 14:01:56 +0100 Subject: Remove some unused functions (#711) * Remove some unused functions * get_room_events_stream is only used in tests * is_exclusive_room might actually be something we want --- synapse/appservice/api.py | 5 - synapse/handlers/message.py | 29 ------ synapse/handlers/room_member.py | 13 --- synapse/storage/_base.py | 6 -- synapse/storage/prepare_database.py | 12 --- synapse/storage/presence.py | 10 -- synapse/storage/roommember.py | 33 ------- synapse/storage/stream.py | 90 ------------------ synapse/util/__init__.py | 3 - synapse/util/ratelimitutils.py | 14 --- synapse/util/stringutils.py | 4 - tests/storage/test_presence.py | 27 ------ tests/storage/test_redaction.py | 51 +--------- tests/storage/test_roommember.py | 7 -- tests/storage/test_stream.py | 185 ------------------------------------ 15 files changed, 4 insertions(+), 485 deletions(-) delete mode 100644 tests/storage/test_stream.py (limited to 'synapse/util') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index bc90605324..6da6a1b62e 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -100,11 +100,6 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("push_bulk to %s threw exception %s", uri, ex) defer.returnValue(False) - @defer.inlineCallbacks - def push(self, service, event, txn_id=None): - response = yield self.push_bulk(service, [event], txn_id) - defer.returnValue(response) - def _serialize(self, events): time_now = self.clock.time_msec() return [ diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fa78d4acec..f51feda2f4 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -44,35 +44,6 @@ class MessageHandler(BaseHandler): self.validator = EventValidator() self.snapshot_cache = SnapshotCache() - @defer.inlineCallbacks - def get_message(self, msg_id=None, room_id=None, sender_id=None, - user_id=None): - """ Retrieve a message. - - Args: - msg_id (str): The message ID to obtain. - room_id (str): The room where the message resides. - sender_id (str): The user ID of the user who sent the message. - user_id (str): The user ID of the user making this request. - Returns: - The message, or None if no message exists. - Raises: - SynapseError if something went wrong. - """ - yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the message from the db -# msg = yield self.store.get_message( -# room_id=room_id, -# msg_id=msg_id, -# user_id=sender_id -# ) - - # TODO (erikj): Once we work out the correct c-s api we need to think - # on how to do this. - - defer.returnValue(None) - @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True): diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 753c75d9c1..b69f36aefe 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -392,19 +392,6 @@ class RoomMemberHandler(BaseHandler): and guest_access.content["guest_access"] == "can_join" ) - def _should_do_dance(self, current_state, inviter, room_hosts=None): - # TODO: Shouldn't this be remote_room_host? - room_hosts = room_hosts or [] - - is_host_in_room = self.is_host_in_room(current_state) - if is_host_in_room: - return False, room_hosts - - if inviter and not self.hs.is_mine(inviter): - room_hosts.append(inviter.domain) - - return True, room_hosts - @defer.inlineCallbacks def lookup_room_alias(self, room_alias): """ diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 04d7fcf6d6..1e27c2c0ce 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -810,12 +810,6 @@ class SQLBaseStore(object): return txn.execute(sql, keyvalues.values()) - def get_next_stream_id(self): - with self._next_stream_id_lock: - i = self._next_stream_id - self._next_stream_id += 1 - return i - def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value): # Fetch a mapping of room_id -> max stream position for "recent" rooms. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 00833422af..57f14fd12b 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -30,18 +30,6 @@ SCHEMA_VERSION = 31 dir_path = os.path.abspath(os.path.dirname(__file__)) -def read_schema(path): - """ Read the named database schema. - - Args: - path: Path of the database schema. - Returns: - A string containing the database schema. - """ - with open(path) as schema_file: - return schema_file.read() - - class PrepareDatabaseException(Exception): pass diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 59b4ef5ce6..07f5fae8dd 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -176,16 +176,6 @@ class PresenceStore(SQLBaseStore): desc="disallow_presence_visible", ) - def is_presence_visible(self, observed_localpart, observer_userid): - return self._simple_select_one( - table="presence_allow_inbound", - keyvalues={"observed_user_id": observed_localpart, - "observer_user_id": observer_userid}, - retcols=["observed_user_id"], - allow_none=True, - desc="is_presence_visible", - ) - def add_presence_list_pending(self, observer_localpart, observed_userid): return self._simple_insert( table="presence_list", diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 66e7a40e3c..77518e893f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -121,26 +121,6 @@ class RoomMemberStore(SQLBaseStore): with self._stream_id_gen.get_next() as stream_ordering: yield self.runInteraction("locally_reject_invite", f, stream_ordering) - def get_room_member(self, user_id, room_id): - """Retrieve the current state of a room member. - - Args: - user_id (str): The member's user ID. - room_id (str): The room the member is in. - Returns: - Deferred: Results in a MembershipEvent or None. - """ - return self.runInteraction( - "get_room_member", - self._get_members_events_txn, - room_id, - user_id=user_id, - ).addCallback( - self._get_events - ).addCallback( - lambda events: events[0] if events else None - ) - @cached(max_entries=5000) def get_users_in_room(self, room_id): def f(txn): @@ -203,19 +183,6 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(invite) defer.returnValue(None) - def get_leave_and_ban_events_for_user(self, user_id): - """ Get all the leave events for a user - Args: - user_id (str): The user ID. - Returns: - A deferred list of event objects. - """ - return self.get_rooms_for_user_where_membership_is( - user_id, (Membership.LEAVE, Membership.BAN) - ).addCallback(lambda leaves: self._get_events([ - leave.event_id for leave in leaves - ])) - def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user matches one in the membership list. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 76bcd9cd00..95b12559a6 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -303,96 +303,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(ret) - def get_room_events_stream( - self, - user_id, - from_key, - to_key, - limit=0, - is_guest=False, - room_ids=None - ): - room_ids = room_ids or [] - room_ids = [r for r in room_ids] - if is_guest: - current_room_membership_sql = ( - "SELECT c.room_id FROM history_visibility AS h" - " INNER JOIN current_state_events AS c" - " ON h.event_id = c.event_id" - " WHERE c.room_id IN (%s)" - " AND h.history_visibility = 'world_readable'" % ( - ",".join(map(lambda _: "?", room_ids)) - ) - ) - current_room_membership_args = room_ids - else: - current_room_membership_sql = ( - "SELECT m.room_id FROM room_memberships as m " - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id AND c.state_key = m.user_id" - " WHERE m.user_id = ? AND m.membership = 'join'" - ) - current_room_membership_args = [user_id] - - # We also want to get any membership events about that user, e.g. - # invites or leave notifications. - membership_sql = ( - "SELECT m.event_id FROM room_memberships as m " - "INNER JOIN current_state_events as c ON m.event_id = c.event_id " - "WHERE m.user_id = ? " - ) - membership_args = [user_id] - - if limit: - limit = max(limit, MAX_STREAM_SIZE) - else: - limit = MAX_STREAM_SIZE - - # From and to keys should be integers from ordering. - from_id = RoomStreamToken.parse_stream_token(from_key) - to_id = RoomStreamToken.parse_stream_token(to_key) - - if from_key == to_key: - return defer.succeed(([], to_key)) - - sql = ( - "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " - "(e.outlier = ? AND (room_id IN (%(current)s)) OR " - "(event_id IN (%(invites)s))) " - "AND e.stream_ordering > ? AND e.stream_ordering <= ? " - "ORDER BY stream_ordering ASC LIMIT %(limit)d " - ) % { - "current": current_room_membership_sql, - "invites": membership_sql, - "limit": limit - } - - def f(txn): - args = ([False] + current_room_membership_args + membership_args + - [from_id.stream, to_id.stream]) - txn.execute(sql, args) - - rows = self.cursor_to_dict(txn) - - ret = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) - - self._set_before_and_after(ret, rows) - - if rows: - key = "s%d" % max(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = to_key - - return ret, key - - return self.runInteraction("get_room_events_stream", f) - @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 3b9da5b34a..b462495eb8 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -49,9 +49,6 @@ class Clock(object): l.start(msec / 1000.0, now=False) return l - def stop_looping_call(self, loop): - loop.stop() - def call_later(self, delay, callback, *args, **kwargs): """Call something later diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 4076eed269..1101881a2d 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -100,20 +100,6 @@ class _PerHostRatelimiter(object): self.current_processing = set() self.request_times = [] - def is_empty(self): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - return not ( - self.ready_request_queue - or self.sleeping_requests - or self.current_processing - or self.request_times - ) - @contextlib.contextmanager def ratelimit(self): # `contextlib.contextmanager` takes a generator and turns it into a diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index b490bb8725..a100f151d4 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -21,10 +21,6 @@ _string_with_symbols = ( ) -def origin_from_ucid(ucid): - return ucid.split("@", 1)[1] - - def random_string(length): return ''.join(random.choice(string.ascii_letters) for _ in xrange(length)) diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py index ec78f007ca..63203cea35 100644 --- a/tests/storage/test_presence.py +++ b/tests/storage/test_presence.py @@ -34,33 +34,6 @@ class PresenceStoreTestCase(unittest.TestCase): self.u_apple = UserID.from_string("@apple:test") self.u_banana = UserID.from_string("@banana:test") - @defer.inlineCallbacks - def test_visibility(self): - self.assertFalse((yield self.store.is_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ))) - - yield self.store.allow_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ) - - self.assertTrue((yield self.store.is_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ))) - - yield self.store.disallow_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ) - - self.assertFalse((yield self.store.is_presence_visible( - observed_localpart=self.u_apple.localpart, - observer_userid=self.u_banana.to_string(), - ))) - @defer.inlineCallbacks def test_presence_list(self): self.assertEquals( diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 5880409867..6afaca3a61 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -110,22 +110,10 @@ class RedactionTestCase(unittest.TestCase): self.room1, self.u_alice, Membership.JOIN ) - start = yield self.store.get_room_events_max_id() - msg_event = yield self.inject_message(self.room1, self.u_alice, u"t") - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - # Check event has not been redacted: - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertObjectHasAttributes( { @@ -144,17 +132,7 @@ class RedactionTestCase(unittest.TestCase): self.room1, msg_event.event_id, self.u_alice, reason ) - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - # Check redaction - - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertEqual(msg_event.event_id, event.event_id) @@ -184,25 +162,12 @@ class RedactionTestCase(unittest.TestCase): self.room1, self.u_alice, Membership.JOIN ) - start = yield self.store.get_room_events_max_id() - msg_event = yield self.inject_room_member( self.room1, self.u_bob, Membership.JOIN, extra_content={"blue": "red"}, ) - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - # Check event has not been redacted: - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertObjectHasAttributes( { @@ -221,17 +186,9 @@ class RedactionTestCase(unittest.TestCase): self.room1, msg_event.event_id, self.u_alice, reason ) - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - # Check redaction - event = results[0] + event = yield self.store.get_event(msg_event.event_id) self.assertTrue("redacted_because" in event.unsigned) diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py index b029ff0584..997090fe35 100644 --- a/tests/storage/test_roommember.py +++ b/tests/storage/test_roommember.py @@ -70,13 +70,6 @@ class RoomMemberStoreTestCase(unittest.TestCase): def test_one_member(self): yield self.inject_room_member(self.room, self.u_alice, Membership.JOIN) - self.assertEquals( - Membership.JOIN, - (yield self.store.get_room_member( - user_id=self.u_alice.to_string(), - room_id=self.room.to_string(), - )).membership - ) self.assertEquals( [self.u_alice.to_string()], [m.user_id for m in ( diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py deleted file mode 100644 index da322152c7..0000000000 --- a/tests/storage/test_stream.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from tests import unittest -from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership -from synapse.types import UserID, RoomID -from tests.storage.event_injector import EventInjector - -from tests.utils import setup_test_homeserver - -from mock import Mock - - -class StreamStoreTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - hs = yield setup_test_homeserver( - resource_for_federation=Mock(), - http_client=None, - ) - - self.store = hs.get_datastore() - self.event_builder_factory = hs.get_event_builder_factory() - self.event_injector = EventInjector(hs) - self.handlers = hs.get_handlers() - self.message_handler = self.handlers.message_handler - - self.u_alice = UserID.from_string("@alice:test") - self.u_bob = UserID.from_string("@bob:test") - - self.room1 = RoomID.from_string("!abc123:test") - self.room2 = RoomID.from_string("!xyx987:test") - - @defer.inlineCallbacks - def test_event_stream_get_other(self): - # Both bob and alice joins the room - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - # Initial stream key: - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_message(self.room1, self.u_alice, u"test") - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_bob.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - event = results[0] - - self.assertObjectHasAttributes( - { - "type": EventTypes.Message, - "user_id": self.u_alice.to_string(), - "content": {"body": "test", "msgtype": "message"}, - }, - event, - ) - - @defer.inlineCallbacks - def test_event_stream_get_own(self): - # Both bob and alice joins the room - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - # Initial stream key: - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_message(self.room1, self.u_alice, u"test") - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_alice.to_string(), - start, - end, - ) - - self.assertEqual(1, len(results)) - - event = results[0] - - self.assertObjectHasAttributes( - { - "type": EventTypes.Message, - "user_id": self.u_alice.to_string(), - "content": {"body": "test", "msgtype": "message"}, - }, - event, - ) - - @defer.inlineCallbacks - def test_event_stream_join_leave(self): - # Both bob and alice joins the room - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - # Then bob leaves again. - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.LEAVE - ) - - # Initial stream key: - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_message(self.room1, self.u_alice, u"test") - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_bob.to_string(), - start, - end, - ) - - # We should not get the message, as it happened *after* bob left. - self.assertEqual(0, len(results)) - - @defer.inlineCallbacks - def test_event_stream_prev_content(self): - yield self.event_injector.inject_room_member( - self.room1, self.u_bob, Membership.JOIN - ) - - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN - ) - - start = yield self.store.get_room_events_max_id() - - yield self.event_injector.inject_room_member( - self.room1, self.u_alice, Membership.JOIN, - ) - - end = yield self.store.get_room_events_max_id() - - results, _ = yield self.store.get_room_events_stream( - self.u_bob.to_string(), - start, - end, - ) - - # We should not get the message, as it happened *after* bob left. - self.assertEqual(1, len(results)) - - event = results[0] - - self.assertTrue( - "prev_content" in event.unsigned, - msg="No prev_content key" - ) -- cgit 1.5.1 From 82d7eea7e3127445d745b5a1e2f2c636a590067e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 11 Apr 2016 14:57:09 +0100 Subject: Move the versionstring code out of app.homeserver into util --- synapse/app/homeserver.py | 87 ++----------------------------------------- synapse/util/rlimit.py | 37 ++++++++++++++++++ synapse/util/versionstring.py | 84 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 83 deletions(-) create mode 100644 synapse/util/rlimit.py create mode 100644 synapse/util/versionstring.py (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2b4473b9ac..d2085a9405 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,8 +20,6 @@ import contextlib import logging import os import re -import resource -import subprocess import sys import time from synapse.config._base import ConfigError @@ -66,6 +64,9 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.federation.transport.server import TransportLayerServer +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + from synapse import events from daemonize import Daemonize @@ -269,86 +270,6 @@ def quit_with_error(error_string): sys.exit(1) -def get_version_string(): - try: - null = open(os.devnull, 'w') - cwd = os.path.dirname(os.path.abspath(__file__)) - try: - git_branch = subprocess.check_output( - ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], - stderr=null, - cwd=cwd, - ).strip() - git_branch = "b=" + git_branch - except subprocess.CalledProcessError: - git_branch = "" - - try: - git_tag = subprocess.check_output( - ['git', 'describe', '--exact-match'], - stderr=null, - cwd=cwd, - ).strip() - git_tag = "t=" + git_tag - except subprocess.CalledProcessError: - git_tag = "" - - try: - git_commit = subprocess.check_output( - ['git', 'rev-parse', '--short', 'HEAD'], - stderr=null, - cwd=cwd, - ).strip() - except subprocess.CalledProcessError: - git_commit = "" - - try: - dirty_string = "-this_is_a_dirty_checkout" - is_dirty = subprocess.check_output( - ['git', 'describe', '--dirty=' + dirty_string], - stderr=null, - cwd=cwd, - ).strip().endswith(dirty_string) - - git_dirty = "dirty" if is_dirty else "" - except subprocess.CalledProcessError: - git_dirty = "" - - if git_branch or git_tag or git_commit or git_dirty: - git_version = ",".join( - s for s in - (git_branch, git_tag, git_commit, git_dirty,) - if s - ) - - return ( - "Synapse/%s (%s)" % ( - synapse.__version__, git_version, - ) - ).encode("ascii") - except Exception as e: - logger.info("Failed to check for git repository: %s", e) - - return ("Synapse/%s" % (synapse.__version__,)).encode("ascii") - - -def change_resource_limit(soft_file_no): - try: - soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) - - if not soft_file_no: - soft_file_no = hard - - resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) - logger.info("Set file limit to: %d", soft_file_no) - - resource.setrlimit( - resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) - ) - except (ValueError, resource.error) as e: - logger.warn("Failed to set file or core limit: %s", e) - - def setup(config_options): """ Args: @@ -378,7 +299,7 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = get_version_string() + version_string = get_version_string("Synapse", synapse) logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", version_string) diff --git a/synapse/util/rlimit.py b/synapse/util/rlimit.py new file mode 100644 index 0000000000..f4a9abf83f --- /dev/null +++ b/synapse/util/rlimit.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import resource +import logging + + +logger = logging.getLogger("synapse.app.homeserver") + + +def change_resource_limit(soft_file_no): + try: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + + if not soft_file_no: + soft_file_no = hard + + resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) + logger.info("Set file limit to: %d", soft_file_no) + + resource.setrlimit( + resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) + ) + except (ValueError, resource.error) as e: + logger.warn("Failed to set file or core limit: %s", e) diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py new file mode 100644 index 0000000000..a4f156cb3b --- /dev/null +++ b/synapse/util/versionstring.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import os +import logging + +logger = logging.getLogger(__name__) + + +def get_version_string(name, module): + try: + null = open(os.devnull, 'w') + cwd = os.path.dirname(os.path.abspath(module.__file__)) + try: + git_branch = subprocess.check_output( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + stderr=null, + cwd=cwd, + ).strip() + git_branch = "b=" + git_branch + except subprocess.CalledProcessError: + git_branch = "" + + try: + git_tag = subprocess.check_output( + ['git', 'describe', '--exact-match'], + stderr=null, + cwd=cwd, + ).strip() + git_tag = "t=" + git_tag + except subprocess.CalledProcessError: + git_tag = "" + + try: + git_commit = subprocess.check_output( + ['git', 'rev-parse', '--short', 'HEAD'], + stderr=null, + cwd=cwd, + ).strip() + except subprocess.CalledProcessError: + git_commit = "" + + try: + dirty_string = "-this_is_a_dirty_checkout" + is_dirty = subprocess.check_output( + ['git', 'describe', '--dirty=' + dirty_string], + stderr=null, + cwd=cwd, + ).strip().endswith(dirty_string) + + git_dirty = "dirty" if is_dirty else "" + except subprocess.CalledProcessError: + git_dirty = "" + + if git_branch or git_tag or git_commit or git_dirty: + git_version = ",".join( + s for s in + (git_branch, git_tag, git_commit, git_dirty,) + if s + ) + + return ( + "%s/%s (%s)" % ( + name, module.__version__, git_version, + ) + ).encode("ascii") + except Exception as e: + logger.info("Failed to check for git repository: %s", e) + + return ("%s/%s" % (name, module.__version__,)).encode("ascii") -- cgit 1.5.1 From cb9c465707f265fb2b4606269495ddb53d84db05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Apr 2016 10:21:32 +0100 Subject: Use SynapseError 504 for Timeout errors --- synapse/util/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index b462495eb8..2b3f0bef3c 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.api.errors import SynapseError from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -80,7 +81,7 @@ class Clock(object): def timed_out_fn(): try: - ret_deferred.errback(RuntimeError("Timed out")) + ret_deferred.errback(SynapseError(504, "Timed out")) except: pass -- cgit 1.5.1 From eb8619e2562432cdd22a625bb612c907c63f723c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Apr 2016 16:08:32 +0100 Subject: Create log context in Measure if one doesn't exist --- synapse/util/metrics.py | 23 +++++++++++++++++------ tests/test_state.py | 4 ++-- 2 files changed, 19 insertions(+), 8 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index c51b641125..e1f374807e 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -50,7 +50,7 @@ block_db_txn_duration = metrics.register_distribution( class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", - "ru_stime", "db_txn_count", "db_txn_duration" + "ru_stime", "db_txn_count", "db_txn_duration", "created_context" ] def __init__(self, clock, name): @@ -58,14 +58,20 @@ class Measure(object): self.name = name self.start_context = None self.start = None + self.created_context = False def __enter__(self): self.start = self.clock.time_msec() self.start_context = LoggingContext.current_context() - if self.start_context: - self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() - self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration = self.start_context.db_txn_duration + if not self.start_context: + logger.warn("Entered Measure without log context: %s", self.name) + self.start_context = LoggingContext("Measure") + self.start_context.__enter__() + self.created_context = True + + self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() + self.db_txn_count = self.start_context.db_txn_count + self.db_txn_duration = self.start_context.db_txn_duration def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None or not self.start_context: @@ -91,7 +97,12 @@ class Measure(object): block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) - block_db_txn_count.inc_by(context.db_txn_count - self.db_txn_count, self.name) + block_db_txn_count.inc_by( + context.db_txn_count - self.db_txn_count, self.name + ) block_db_txn_duration.inc_by( context.db_txn_duration - self.db_txn_duration, self.name ) + + if self.created_context: + self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/tests/test_state.py b/tests/test_state.py index a1ea7ef672..1a11bbcee0 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -140,13 +140,13 @@ class StateTestCase(unittest.TestCase): "add_event_hashes", ] ) - hs = Mock(spec=[ + hs = Mock(spec_set=[ "get_datastore", "get_auth", "get_state_handler", "get_clock", ]) hs.get_datastore.return_value = self.store hs.get_state_handler.return_value = None - hs.get_auth.return_value = Auth(hs) hs.get_clock.return_value = MockClock() + hs.get_auth.return_value = Auth(hs) self.state = StateHandler(hs) self.event_id = 0 -- cgit 1.5.1 From c10ed26c303741fe0e43f11e2fbeeb148f466b17 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 21 Apr 2016 19:19:07 +0100 Subject: Flesh out email templating Mostly WIP porting the room name calculation logic from the web client so our room names in the email mirror the clients. --- synapse/push/emailpusher.py | 7 ++ synapse/push/mailer.py | 61 +++++++++++++++++- synapse/python_dependencies.py | 2 +- synapse/util/room_name.py | 142 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 208 insertions(+), 4 deletions(-) create mode 100644 synapse/util/room_name.py (limited to 'synapse/util') diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 4e21221fb7..7c810029fa 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -82,6 +82,13 @@ class EmailPusher(object): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) yield self._process() + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id): + # We could wake up and cancel the timer but there tend to be quite a + # lot of read receipts so it's probably less work to just let the + # timer fire + return defer.succeed(None) + @defer.inlineCallbacks def on_timer(self): self.timed_call = None diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 0f20d43f75..e68d701ffd 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -14,18 +14,23 @@ # limitations under the License. from twisted.internet import defer - from twisted.mail.smtp import sendmail + import email.utils import email.mime.multipart from email.mime.text import MIMEText +from synapse.util.async import concurrently_execute +from synapse.util.room_name import calculate_room_name + import jinja2 class Mailer(object): def __init__(self, hs): self.hs = hs + self.store = self.hs.get_datastore() + self.state_handler = self.hs.get_state_handler() loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) env = jinja2.Environment(loader=loader) self.notif_template = env.get_template(self.hs.config.email_notif_template_html) @@ -38,9 +43,41 @@ class Mailer(object): if raw_to == '': raise RuntimeError("Invalid 'to' address") - plainText = self.notif_template.render() + rooms_in_order = deduped_ordered_list( + [pa['room_id'] for pa in push_actions] + ) + + notifs_by_room = {} + for pa in push_actions: + notifs_by_room.setdefault(pa["room_id"], []).append(pa) + + # collect the current state for all the rooms in which we have + # notifications + state_by_room = {} + + @defer.inlineCallbacks + def _fetch_room_state(room_id): + room_state = yield self.state_handler.get_current_state(room_id) + state_by_room[room_id] = room_state + + # Run at most 3 of these at once: sync does 10 at a time but email + # notifs are much realtime than sync so we can afford to wait a bit. + yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) - text_part = MIMEText(plainText, "plain") + rooms = [ + self.get_room_vars( + r, user_id, notifs_by_room[r], state_by_room[r] + ) for r in rooms_in_order + ] + + template_vars = { + "unsubscribe_link": self.make_unsubscribe_link(), + "rooms": rooms, + } + + plainText = self.notif_template.render(**template_vars) + + text_part = MIMEText(plainText, "html") text_part['Subject'] = "New Matrix Notifications" text_part['From'] = self.hs.config.email_notif_from text_part['To'] = email_address @@ -50,3 +87,21 @@ class Mailer(object): raw_from, raw_to, text_part.as_string(), port=self.hs.config.email_smtp_port ) + + def get_room_vars(self, room_id, user_id, notifs, room_state): + room_vars = {} + room_vars['title'] = calculate_room_name(room_state, user_id) + return room_vars + + def make_unsubscribe_link(self): + return "https://vector.im/#/settings" # XXX: matrix.to + + +def deduped_ordered_list(l): + seen = set() + ret = [] + for item in l: + if item not in seen: + seen.add(item) + ret.append(item) + return ret \ No newline at end of file diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index a065c78b4d..16524dbdcd 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -46,7 +46,7 @@ CONDITIONAL_REQUIREMENTS = { "netaddr>=0.7.18": ["netaddr"], }, "email.enable_notifs": { - "Jinja2": ["Jinja2"], + "Jinja2>=2.8": ["Jinja2>=2.8"], }, } diff --git a/synapse/util/room_name.py b/synapse/util/room_name.py new file mode 100644 index 0000000000..7e49b92bb4 --- /dev/null +++ b/synapse/util/room_name.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +# intentionally looser than what aliases we allow to be registered since +# other HSes may allow aliases that we would not +ALIAS_RE = re.compile(r"^#.*:.+$") + +ALL_ALONE = "Empty Room" + + +def calculate_room_name(room_state, user_id): + # does it have a name? + if ("m.room.name", "") in room_state: + m_room_name = room_state[("m.room.name", "")] + if m_room_name.content and m_room_name.content["name"]: + return m_room_name.content["name"] + + # does it have a caononical alias? + if ("m.room.canonical_alias", "") in room_state: + canon_alias = room_state[("m.room.canonical_alias", "")] + if ( + canon_alias.content and canon_alias.content["alias"] and + looks_like_an_alias(canon_alias.content["alias"]) + ): + return canon_alias.content["alias"] + + # at this point we're going to need to search the state by all state keys + # for an event type, so rearrange the data structure + room_state_bytype = state_as_two_level_dict(room_state) + + # right then, any aliases at all? + if "m.room.aliases" in room_state_bytype: + m_room_aliases = room_state_bytype["m.room.aliases"] + if len(m_room_aliases.values()) > 0: + first_alias_event = m_room_aliases.values()[0] + if first_alias_event.content and first_alias_event.content["aliases"]: + the_aliases = first_alias_event.content["aliases"] + if len(the_aliases) > 0 and looks_like_an_alias(the_aliases[0]): + return the_aliases[0] + + my_member_event = None + if ("m.room.member", user_id) in room_state: + my_member_event = room_state[("m.room.member", user_id)] + + if ( + my_member_event is not None and + my_member_event.content['membership'] == "invite" + ): + if ("m.room.member", my_member_event.sender) in room_state: + inviter_member_event = room_state[("m.room.member", my_member_event.sender)] + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return "Room Invite" + + # we're going to have to generate a name based on who's in the room, + # so find out who is in the room that isn't the user. + if "m.room.member" in room_state_bytype: + all_members = [ + ev for ev in room_state_bytype["m.room.member"].values() + if ev.membership == "join" or ev.membership == "invite" + ] + other_members = [m for m in all_members if m.sender != user_id] + else: + other_members = [] + all_members = [] + + if len(other_members) == 0: + if len(all_members) == 1: + # self-chat, peeked room with 1 participant, + # or inbound invite, or outbound 3PID invite. + if all_members[0].sender == user_id: + if "m.room.third_party_invite" in room_state_bytype: + third_party_invites = room_state_bytype["m.room.third_party_invite"] + if len(third_party_invites) > 0: + # technically third party invite events are not member + # events, but they are close enough + return "Inviting %s" ( + descriptor_from_member_events(third_party_invites) + ) + else: + return ALL_ALONE + else: + return name_from_member_event(all_members[0]) + else: + return ALL_ALONE + else: + return descriptor_from_member_events(other_members) + + +def state_as_two_level_dict(state): + ret = {} + for k, v in state.items(): + ret.setdefault(k[0], {})[k[1]] = v + return ret + + +def looks_like_an_alias(string): + return ALIAS_RE.match(string) is not None + + +def descriptor_from_member_events(member_events): + # else if (otherMembers.length === 1) { + # return otherMembers[0].name; + # } + # else if (otherMembers.length === 2) { + # return ( + # otherMembers[0].name + " and " + otherMembers[1].name + # ); + # } + # else { + # return ( + # otherMembers[0].name + " and " + (otherMembers.length - 1) + " others" + # ); + # } + if len(member_events) == 0: + return "nobody" + elif len(member_events) == 1: + return name_from_member_event(member_events[0]) + return "all the people, so many people. They all go hand in hand, hand in hand in their park life." + + +def name_from_member_event(member_event): + if ( + member_event.content and "displayname" in member_event.content and + member_event.content["displayname"] + ): + return member_event.content["displayname"] + return member_event.sender \ No newline at end of file -- cgit 1.5.1 From 9e7aa98c229af4f657756f9089654d2eab7a96ce Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 15:40:51 +0100 Subject: Split out create_resource_tree to a separate file --- synapse/app/homeserver.py | 89 +++--------------------------------- synapse/util/httpresourcetree.py | 98 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 82 deletions(-) create mode 100644 synapse/util/httpresourcetree.py (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d2085a9405..fdadffeba7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -66,6 +66,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse.util.httpresourcetree import create_resource_tree from synapse import events @@ -174,7 +175,12 @@ class SynapseHomeServer(HomeServer): if name == "replication": resources[REPLICATION_PREFIX] = ReplicationResource(self) - root_resource = create_resource_tree(resources) + if WEB_CLIENT_PREFIX in resources: + root_resource = RootRedirect(WEB_CLIENT_PREFIX) + else: + root_resource = Resource() + + root_resource = create_resource_tree(resources, root_resource) if tls: reactor.listenSSL( port, @@ -494,87 +500,6 @@ class SynapseSite(Site): pass -def create_resource_tree(desired_tree, redirect_root_to_web_client=True): - """Create the resource tree for this Home Server. - - This in unduly complicated because Twisted does not support putting - child resources more than 1 level deep at a time. - - Args: - web_client (bool): True to enable the web client. - redirect_root_to_web_client (bool): True to redirect '/' to the - location of the web client. This does nothing if web_client is not - True. - """ - if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree: - root_resource = RootRedirect(WEB_CLIENT_PREFIX) - else: - root_resource = Resource() - - # ideally we'd just use getChild and putChild but getChild doesn't work - # unless you give it a Request object IN ADDITION to the name :/ So - # instead, we'll store a copy of this mapping so we can actually add - # extra resources to existing nodes. See self._resource_id for the key. - resource_mappings = {} - for full_path, res in desired_tree.items(): - logger.info("Attaching %s to path %s", res, full_path) - last_resource = root_resource - for path_seg in full_path.split('/')[1:-1]: - if path_seg not in last_resource.listNames(): - # resource doesn't exist, so make a "dummy resource" - child_resource = Resource() - last_resource.putChild(path_seg, child_resource) - res_id = _resource_id(last_resource, path_seg) - resource_mappings[res_id] = child_resource - last_resource = child_resource - else: - # we have an existing Resource, use that instead. - res_id = _resource_id(last_resource, path_seg) - last_resource = resource_mappings[res_id] - - # =========================== - # now attach the actual desired resource - last_path_seg = full_path.split('/')[-1] - - # if there is already a resource here, thieve its children and - # replace it - res_id = _resource_id(last_resource, last_path_seg) - if res_id in resource_mappings: - # there is a dummy resource at this path already, which needs - # to be replaced with the desired resource. - existing_dummy_resource = resource_mappings[res_id] - for child_name in existing_dummy_resource.listNames(): - child_res_id = _resource_id( - existing_dummy_resource, child_name - ) - child_resource = resource_mappings[child_res_id] - # steal the children - res.putChild(child_name, child_resource) - - # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, res) - res_id = _resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = res - - return root_resource - - -def _resource_id(resource, path_seg): - """Construct an arbitrary resource ID so you can retrieve the mapping - later. - - If you want to represent resource A putChild resource B with path C, - the mapping should looks like _resource_id(A,C) = B. - - Args: - resource (Resource): The *parent* Resourceb - path_seg (str): The name of the child Resource to be attached. - Returns: - str: A unique string which can be a key to the child Resource. - """ - return "%s-%s" % (resource, path_seg) - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py new file mode 100644 index 0000000000..45be47159a --- /dev/null +++ b/synapse/util/httpresourcetree.py @@ -0,0 +1,98 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.web.resource import Resource + +import logging + +logger = logging.getLogger(__name__) + + +def create_resource_tree(desired_tree, root_resource): + """Create the resource tree for this Home Server. + + This in unduly complicated because Twisted does not support putting + child resources more than 1 level deep at a time. + + Args: + web_client (bool): True to enable the web client. + root_resource (twisted.web.resource.Resource): The root + resource to add the tree to. + Returns: + twisted.web.resource.Resource: the ``root_resource`` with a tree of + child resources added to it. + """ + + # ideally we'd just use getChild and putChild but getChild doesn't work + # unless you give it a Request object IN ADDITION to the name :/ So + # instead, we'll store a copy of this mapping so we can actually add + # extra resources to existing nodes. See self._resource_id for the key. + resource_mappings = {} + for full_path, res in desired_tree.items(): + logger.info("Attaching %s to path %s", res, full_path) + last_resource = root_resource + for path_seg in full_path.split('/')[1:-1]: + if path_seg not in last_resource.listNames(): + # resource doesn't exist, so make a "dummy resource" + child_resource = Resource() + last_resource.putChild(path_seg, child_resource) + res_id = _resource_id(last_resource, path_seg) + resource_mappings[res_id] = child_resource + last_resource = child_resource + else: + # we have an existing Resource, use that instead. + res_id = _resource_id(last_resource, path_seg) + last_resource = resource_mappings[res_id] + + # =========================== + # now attach the actual desired resource + last_path_seg = full_path.split('/')[-1] + + # if there is already a resource here, thieve its children and + # replace it + res_id = _resource_id(last_resource, last_path_seg) + if res_id in resource_mappings: + # there is a dummy resource at this path already, which needs + # to be replaced with the desired resource. + existing_dummy_resource = resource_mappings[res_id] + for child_name in existing_dummy_resource.listNames(): + child_res_id = _resource_id( + existing_dummy_resource, child_name + ) + child_resource = resource_mappings[child_res_id] + # steal the children + res.putChild(child_name, child_resource) + + # finally, insert the desired resource in the right place + last_resource.putChild(last_path_seg, res) + res_id = _resource_id(last_resource, last_path_seg) + resource_mappings[res_id] = res + + return root_resource + + +def _resource_id(resource, path_seg): + """Construct an arbitrary resource ID so you can retrieve the mapping + later. + + If you want to represent resource A putChild resource B with path C, + the mapping should looks like _resource_id(A,C) = B. + + Args: + resource (Resource): The *parent* Resourceb + path_seg (str): The name of the child Resource to be attached. + Returns: + str: A unique string which can be a key to the child Resource. + """ + return "%s-%s" % (resource, path_seg) -- cgit 1.5.1 From 5905f36f0557f2b496e5b2759db295a3b2807574 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 17:08:02 +0100 Subject: Split out setting up the manhole to a separate file --- synapse/app/homeserver.py | 33 +++++++------------------------ synapse/util/manhole.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 26 deletions(-) create mode 100644 synapse/util/manhole.py (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1fa93be93e..b033073ef7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,13 +32,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d from synapse.server import HomeServer - -from twisted.conch.manhole import ColoredManhole -from twisted.conch.insults import insults -from twisted.conch import manhole_ssh -from twisted.cred import checkers, portal - - from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper @@ -64,6 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.manhole import listen_manhole from synapse.http.site import SynapseSite @@ -209,25 +203,12 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( - matrix="rabbithole" - ) - - rlm = manhole_ssh.TerminalRealm() - rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( - ColoredManhole, - { - "__name__": "__console__", - "hs": self, - } - ) - - f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - - reactor.listenTCP( - listener["port"], - f, - interface=listener.get("bind_address", '127.0.0.1') + listen_manhole( + bind_address=listener.get("bind_address", '127.0.0.1'), + bind_port=listener["port"], + username="matrix", + password="rabbithole", + globals={"hs": self}, ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py new file mode 100644 index 0000000000..e12583209f --- /dev/null +++ b/synapse/util/manhole.py @@ -0,0 +1,50 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.conch.manhole import ColoredManhole +from twisted.conch.insults import insults +from twisted.conch import manhole_ssh +from twisted.cred import checkers, portal + + +from twisted.internet import reactor + + +def listen_manhole(bind_address, bind_port, username, password, globals): + """Starts a ssh listener with password authentication using + the given username and password. Clients connecting to the ssh + listener will find themselves in a colored python shell with + the supplied globals. + + Args: + bind_address(str): IP address to listen on. + bind_port(int): TCP port to listen on. + username(str): The username ssh clients should auth with. + password(str): The password ssh clients should auth with. + globals(dict): The variables to expose in the shell. + """ + + checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( + **{username: password} + ) + + rlm = manhole_ssh.TerminalRealm() + rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( + ColoredManhole, + dict(globals, __name__="__console__") + ) + + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + + reactor.listenTCP(bind_port, factory, interface=bind_address) -- cgit 1.5.1 From e8701e64b9ce52a377dba7091017e5d2e116ecdf Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Apr 2016 17:28:42 +0100 Subject: Implement group-of-people names --- synapse/util/room_name.py | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/room_name.py b/synapse/util/room_name.py index 7e49b92bb4..30b7291369 100644 --- a/synapse/util/room_name.py +++ b/synapse/util/room_name.py @@ -71,9 +71,9 @@ def calculate_room_name(room_state, user_id): if "m.room.member" in room_state_bytype: all_members = [ ev for ev in room_state_bytype["m.room.member"].values() - if ev.membership == "join" or ev.membership == "invite" + if ev.content['membership'] == "join" or ev.content['membership'] == "invite" ] - other_members = [m for m in all_members if m.sender != user_id] + other_members = [m for m in all_members if m.state_key != user_id] else: other_members = [] all_members = [] @@ -113,30 +113,27 @@ def looks_like_an_alias(string): def descriptor_from_member_events(member_events): - # else if (otherMembers.length === 1) { - # return otherMembers[0].name; - # } - # else if (otherMembers.length === 2) { - # return ( - # otherMembers[0].name + " and " + otherMembers[1].name - # ); - # } - # else { - # return ( - # otherMembers[0].name + " and " + (otherMembers.length - 1) + " others" - # ); - # } if len(member_events) == 0: return "nobody" elif len(member_events) == 1: return name_from_member_event(member_events[0]) - return "all the people, so many people. They all go hand in hand, hand in hand in their park life." + elif len(member_events) == 2: + return "%s and %s" % ( + name_from_member_event(member_events[0]), + name_from_member_event(member_events[1]), + ) + else: + return "%s and %d others" % ( + name_from_member_event(member_events[0]), + len(member_events) - 1, + ) def name_from_member_event(member_event): + # XXX: Need to look in invite state for invite display names. if ( member_event.content and "displayname" in member_event.content and member_event.content["displayname"] ): return member_event.content["displayname"] - return member_event.sender \ No newline at end of file + return member_event.state_key \ No newline at end of file -- cgit 1.5.1 From c5b3c6e1010ce55eda27b35f008819867c21bc51 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Apr 2016 18:33:36 +0100 Subject: Sort member events So names of people in a room are given in order --- synapse/util/room_name.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/room_name.py b/synapse/util/room_name.py index 30b7291369..30ef77b9f8 100644 --- a/synapse/util/room_name.py +++ b/synapse/util/room_name.py @@ -73,6 +73,10 @@ def calculate_room_name(room_state, user_id): ev for ev in room_state_bytype["m.room.member"].values() if ev.content['membership'] == "join" or ev.content['membership'] == "invite" ] + # Sort the member events oldest-first so the we name people in the + # order the joined (it should at least be deterministic rather than + # dictionary iteration order) + all_members.sort(key=lambda e: e.origin_server_ts) other_members = [m for m in all_members if m.state_key != user_id] else: other_members = [] -- cgit 1.5.1 From 05e49ffbdf83ec3f910e1f8dbbe23aa4da297986 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Apr 2016 18:44:17 +0100 Subject: No we don't: it's just the display name --- synapse/util/room_name.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/room_name.py b/synapse/util/room_name.py index 30ef77b9f8..d85ccaea55 100644 --- a/synapse/util/room_name.py +++ b/synapse/util/room_name.py @@ -134,7 +134,6 @@ def descriptor_from_member_events(member_events): def name_from_member_event(member_event): - # XXX: Need to look in invite state for invite display names. if ( member_event.content and "displayname" in member_event.content and member_event.content["displayname"] -- cgit 1.5.1 From 290f125a13c3c9f5cb772909248d4e482dcfb871 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 25 Apr 2016 14:42:59 +0100 Subject: Typo --- synapse/util/room_name.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/room_name.py b/synapse/util/room_name.py index d85ccaea55..f55ef293b6 100644 --- a/synapse/util/room_name.py +++ b/synapse/util/room_name.py @@ -29,7 +29,7 @@ def calculate_room_name(room_state, user_id): if m_room_name.content and m_room_name.content["name"]: return m_room_name.content["name"] - # does it have a caononical alias? + # does it have a canonical alias? if ("m.room.canonical_alias", "") in room_state: canon_alias = room_state[("m.room.canonical_alias", "")] if ( -- cgit 1.5.1 From f22f46f4f902e071fe322854a228f8fe53677cdc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 14:59:21 +0100 Subject: Move the listenTCP call outside the manhole function --- synapse/app/homeserver.py | 16 +++++++++------- synapse/util/manhole.py | 14 +++++--------- 2 files changed, 14 insertions(+), 16 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b033073ef7..df675c0ed4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -57,7 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import listen_manhole +from synapse.util.manhole import manhole from synapse.http.site import SynapseSite @@ -203,12 +203,14 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - listen_manhole( - bind_address=listener.get("bind_address", '127.0.0.1'), - bind_port=listener["port"], - username="matrix", - password="rabbithole", - globals={"hs": self}, + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index e12583209f..9b106cdf47 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -18,21 +18,19 @@ from twisted.conch import manhole_ssh from twisted.cred import checkers, portal -from twisted.internet import reactor - - -def listen_manhole(bind_address, bind_port, username, password, globals): +def manhole(username, password, globals): """Starts a ssh listener with password authentication using the given username and password. Clients connecting to the ssh listener will find themselves in a colored python shell with the supplied globals. Args: - bind_address(str): IP address to listen on. - bind_port(int): TCP port to listen on. username(str): The username ssh clients should auth with. password(str): The password ssh clients should auth with. globals(dict): The variables to expose in the shell. + + Returns: + twisted.internet.protocol.Factory: A factory to pass to ``listenTCP`` """ checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( @@ -45,6 +43,4 @@ def listen_manhole(bind_address, bind_port, username, password, globals): dict(globals, __name__="__console__") ) - factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - - reactor.listenTCP(bind_port, factory, interface=bind_address) + return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) -- cgit 1.5.1 From 72e2fafa207c28581c62bcce2f1a6ede410fee5a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:34:25 +0100 Subject: Add a metrics listener and a ssh listener to the pusher --- synapse/app/pusher.py | 69 +++++++++++++++++++++++++++++++++++++++++++++++-- synapse/util/manhole.py | 26 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8922573db7..abb9f1fe8e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -17,19 +17,25 @@ import synapse from synapse.server import HomeServer -from synapse.util.versionstring import get_version_string from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep -from synapse.util.logcontext import (LoggingContext, preserve_fn) +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer +from twisted.web.resource import Resource import sys import logging @@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): ) self.user_agent_suffix = None self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") def default_config(self, **kwargs): return """\ ## Slave ## + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + listeners: [] + # Uncomment to enable a ssh manhole listener on the pusher. + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Uncomment to enable a metric listener on the pusher. + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"], + # compress: False + report_stats: False """ @@ -100,6 +122,46 @@ class PusherServer(HomeServer): }] }) + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() @@ -191,6 +253,9 @@ def setup(config_options): ) ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) def start(): ps.replicate() diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 9b106cdf47..97e0f00b67 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -16,6 +16,26 @@ from twisted.conch.manhole import ColoredManhole from twisted.conch.insults import insults from twisted.conch import manhole_ssh from twisted.cred import checkers, portal +from twisted.conch.ssh.keys import Key + +PUBLIC_KEY = ( + "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" + "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS" + "kbh/C+BR3utDS555mV" +) + +PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW +4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw +vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb +Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1 +xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8 +PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2 +gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu +DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML +pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP +EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg== +-----END RSA PRIVATE KEY-----""" def manhole(username, password, globals): @@ -43,4 +63,8 @@ def manhole(username, password, globals): dict(globals, __name__="__console__") ) - return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + + return factory -- cgit 1.5.1 From 7b4715bad704231b51c6d0462cfd19ed32df5e0b Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 25 Apr 2016 18:27:04 +0100 Subject: More variable calculation for email notifs Include name of the person we're sending to and add summary text at the top giving an overview of what's happened. --- res/templates/notif.html | 3 +- synapse/push/mailer.py | 57 ++++++++++++++- synapse/util/presentable_names.py | 145 ++++++++++++++++++++++++++++++++++++++ synapse/util/room_name.py | 142 ------------------------------------- 4 files changed, 202 insertions(+), 145 deletions(-) create mode 100644 synapse/util/presentable_names.py delete mode 100644 synapse/util/room_name.py (limited to 'synapse/util') diff --git a/res/templates/notif.html b/res/templates/notif.html index 648ff034b3..aee52ec8c9 100644 --- a/res/templates/notif.html +++ b/res/templates/notif.html @@ -1,7 +1,8 @@ -

{{ summaryText }}

+
Hi {{ user_display_name }},
+
{{ summary_text }}
{% for room in rooms %} {% include 'room.html' with context %} diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 9212d36b84..9e2297a03b 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -21,11 +21,19 @@ import email.mime.multipart from email.mime.text import MIMEText from synapse.util.async import concurrently_execute -from synapse.util.room_name import calculate_room_name +from synapse.util.presentable_names import calculate_room_name, name_from_member_event +from synapse.types import UserID +from synapse.api.errors import StoreError import jinja2 +MESSAGE_FROM_PERSON_IN_ROOM = "You have a message from %s in the %s room" +MESSAGE_FROM_PERSON = "You have a message from %s" +MESSAGES_IN_ROOM = "There are some messages for you in the %s room" +MESSAGES_IN_ROOMS = "Here are some messages you may have missed" + + class Mailer(object): def __init__(self, hs): self.hs = hs @@ -55,6 +63,13 @@ class Mailer(object): # notifications state_by_room = {} + try: + user_display_name = yield self.store.get_profile_displayname( + UserID.from_string(user_id).localpart + ) + except StoreError: + user_display_name = user_id + @defer.inlineCallbacks def _fetch_room_state(room_id): room_state = yield self.state_handler.get_current_state(room_id) @@ -70,8 +85,14 @@ class Mailer(object): ) for r in rooms_in_order ] + summary_text = yield self.make_summary_text( + notifs_by_room, state_by_room, user_id + ) + template_vars = { + "user_display_name": user_display_name, "unsubscribe_link": self.make_unsubscribe_link(), + "summary_text": summary_text, "rooms": rooms, } @@ -93,6 +114,38 @@ class Mailer(object): room_vars['title'] = calculate_room_name(room_state, user_id) return room_vars + @defer.inlineCallbacks + def make_summary_text(self, notifs_by_room, state_by_room, user_id): + if len(notifs_by_room) == 1: + room_id = notifs_by_room.keys()[0] + sender_name = None + if len(notifs_by_room[room_id]) == 1: + # If the room has some kind of name, use it, but we don't + # want the generated-from-names one here otherwise we'll + # end up with, "new message from Bob in the Bob room" + room_name = calculate_room_name( + state_by_room[room_id], user_id, fallback_to_members=False + ) + event = yield self.store.get_event( + notifs_by_room[room_id][0]["event_id"] + ) + if ("m.room.member", event.sender) in state_by_room[room_id]: + state_event = state_by_room[room_id][("m.room.member", event.sender)] + sender_name = name_from_member_event(state_event) + if sender_name is not None and room_name is not None: + defer.returnValue( + MESSAGE_FROM_PERSON_IN_ROOM % (sender_name, room_name) + ) + elif sender_name is not None: + defer.returnValue(MESSAGE_FROM_PERSON % (sender_name,)) + else: + room_name = calculate_room_name(state_by_room[room_id], user_id) + defer.returnValue(MESSAGES_IN_ROOM % (room_name,)) + else: + defer.returnValue(MESSAGES_IN_ROOMS) + + defer.returnValue("Some thing have occurred in some rooms") + def make_unsubscribe_link(self): return "https://vector.im/#/settings" # XXX: matrix.to @@ -104,4 +157,4 @@ def deduped_ordered_list(l): if item not in seen: seen.add(item) ret.append(item) - return ret \ No newline at end of file + return ret diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py new file mode 100644 index 0000000000..2ae01e453d --- /dev/null +++ b/synapse/util/presentable_names.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +# intentionally looser than what aliases we allow to be registered since +# other HSes may allow aliases that we would not +ALIAS_RE = re.compile(r"^#.*:.+$") + +ALL_ALONE = "Empty Room" + + +def calculate_room_name(room_state, user_id, fallback_to_members=True): + # does it have a name? + if ("m.room.name", "") in room_state: + m_room_name = room_state[("m.room.name", "")] + if m_room_name.content and m_room_name.content["name"]: + return m_room_name.content["name"] + + # does it have a canonical alias? + if ("m.room.canonical_alias", "") in room_state: + canon_alias = room_state[("m.room.canonical_alias", "")] + if ( + canon_alias.content and canon_alias.content["alias"] and + _looks_like_an_alias(canon_alias.content["alias"]) + ): + return canon_alias.content["alias"] + + # at this point we're going to need to search the state by all state keys + # for an event type, so rearrange the data structure + room_state_bytype = _state_as_two_level_dict(room_state) + + # right then, any aliases at all? + if "m.room.aliases" in room_state_bytype: + m_room_aliases = room_state_bytype["m.room.aliases"] + if len(m_room_aliases.values()) > 0: + first_alias_event = m_room_aliases.values()[0] + if first_alias_event.content and first_alias_event.content["aliases"]: + the_aliases = first_alias_event.content["aliases"] + if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]): + return the_aliases[0] + + my_member_event = None + if ("m.room.member", user_id) in room_state: + my_member_event = room_state[("m.room.member", user_id)] + + if ( + my_member_event is not None and + my_member_event.content['membership'] == "invite" + ): + if ("m.room.member", my_member_event.sender) in room_state: + inviter_member_event = room_state[("m.room.member", my_member_event.sender)] + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return "Room Invite" + + if not fallback_to_members: + return None + + # we're going to have to generate a name based on who's in the room, + # so find out who is in the room that isn't the user. + if "m.room.member" in room_state_bytype: + all_members = [ + ev for ev in room_state_bytype["m.room.member"].values() + if ev.content['membership'] == "join" or ev.content['membership'] == "invite" + ] + # Sort the member events oldest-first so the we name people in the + # order the joined (it should at least be deterministic rather than + # dictionary iteration order) + all_members.sort(key=lambda e: e.origin_server_ts) + other_members = [m for m in all_members if m.state_key != user_id] + else: + other_members = [] + all_members = [] + + if len(other_members) == 0: + if len(all_members) == 1: + # self-chat, peeked room with 1 participant, + # or inbound invite, or outbound 3PID invite. + if all_members[0].sender == user_id: + if "m.room.third_party_invite" in room_state_bytype: + third_party_invites = room_state_bytype["m.room.third_party_invite"] + if len(third_party_invites) > 0: + # technically third party invite events are not member + # events, but they are close enough + return "Inviting %s" ( + descriptor_from_member_events(third_party_invites) + ) + else: + return ALL_ALONE + else: + return name_from_member_event(all_members[0]) + else: + return ALL_ALONE + else: + return descriptor_from_member_events(other_members) + + +def descriptor_from_member_events(member_events): + if len(member_events) == 0: + return "nobody" + elif len(member_events) == 1: + return name_from_member_event(member_events[0]) + elif len(member_events) == 2: + return "%s and %s" % ( + name_from_member_event(member_events[0]), + name_from_member_event(member_events[1]), + ) + else: + return "%s and %d others" % ( + name_from_member_event(member_events[0]), + len(member_events) - 1, + ) + + +def name_from_member_event(member_event): + if ( + member_event.content and "displayname" in member_event.content and + member_event.content["displayname"] + ): + return member_event.content["displayname"] + return member_event.state_key + + +def _state_as_two_level_dict(state): + ret = {} + for k, v in state.items(): + ret.setdefault(k[0], {})[k[1]] = v + return ret + + +def _looks_like_an_alias(string): + return ALIAS_RE.match(string) is not None \ No newline at end of file diff --git a/synapse/util/room_name.py b/synapse/util/room_name.py deleted file mode 100644 index f55ef293b6..0000000000 --- a/synapse/util/room_name.py +++ /dev/null @@ -1,142 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import re - -# intentionally looser than what aliases we allow to be registered since -# other HSes may allow aliases that we would not -ALIAS_RE = re.compile(r"^#.*:.+$") - -ALL_ALONE = "Empty Room" - - -def calculate_room_name(room_state, user_id): - # does it have a name? - if ("m.room.name", "") in room_state: - m_room_name = room_state[("m.room.name", "")] - if m_room_name.content and m_room_name.content["name"]: - return m_room_name.content["name"] - - # does it have a canonical alias? - if ("m.room.canonical_alias", "") in room_state: - canon_alias = room_state[("m.room.canonical_alias", "")] - if ( - canon_alias.content and canon_alias.content["alias"] and - looks_like_an_alias(canon_alias.content["alias"]) - ): - return canon_alias.content["alias"] - - # at this point we're going to need to search the state by all state keys - # for an event type, so rearrange the data structure - room_state_bytype = state_as_two_level_dict(room_state) - - # right then, any aliases at all? - if "m.room.aliases" in room_state_bytype: - m_room_aliases = room_state_bytype["m.room.aliases"] - if len(m_room_aliases.values()) > 0: - first_alias_event = m_room_aliases.values()[0] - if first_alias_event.content and first_alias_event.content["aliases"]: - the_aliases = first_alias_event.content["aliases"] - if len(the_aliases) > 0 and looks_like_an_alias(the_aliases[0]): - return the_aliases[0] - - my_member_event = None - if ("m.room.member", user_id) in room_state: - my_member_event = room_state[("m.room.member", user_id)] - - if ( - my_member_event is not None and - my_member_event.content['membership'] == "invite" - ): - if ("m.room.member", my_member_event.sender) in room_state: - inviter_member_event = room_state[("m.room.member", my_member_event.sender)] - return "Invite from %s" % (name_from_member_event(inviter_member_event),) - else: - return "Room Invite" - - # we're going to have to generate a name based on who's in the room, - # so find out who is in the room that isn't the user. - if "m.room.member" in room_state_bytype: - all_members = [ - ev for ev in room_state_bytype["m.room.member"].values() - if ev.content['membership'] == "join" or ev.content['membership'] == "invite" - ] - # Sort the member events oldest-first so the we name people in the - # order the joined (it should at least be deterministic rather than - # dictionary iteration order) - all_members.sort(key=lambda e: e.origin_server_ts) - other_members = [m for m in all_members if m.state_key != user_id] - else: - other_members = [] - all_members = [] - - if len(other_members) == 0: - if len(all_members) == 1: - # self-chat, peeked room with 1 participant, - # or inbound invite, or outbound 3PID invite. - if all_members[0].sender == user_id: - if "m.room.third_party_invite" in room_state_bytype: - third_party_invites = room_state_bytype["m.room.third_party_invite"] - if len(third_party_invites) > 0: - # technically third party invite events are not member - # events, but they are close enough - return "Inviting %s" ( - descriptor_from_member_events(third_party_invites) - ) - else: - return ALL_ALONE - else: - return name_from_member_event(all_members[0]) - else: - return ALL_ALONE - else: - return descriptor_from_member_events(other_members) - - -def state_as_two_level_dict(state): - ret = {} - for k, v in state.items(): - ret.setdefault(k[0], {})[k[1]] = v - return ret - - -def looks_like_an_alias(string): - return ALIAS_RE.match(string) is not None - - -def descriptor_from_member_events(member_events): - if len(member_events) == 0: - return "nobody" - elif len(member_events) == 1: - return name_from_member_event(member_events[0]) - elif len(member_events) == 2: - return "%s and %s" % ( - name_from_member_event(member_events[0]), - name_from_member_event(member_events[1]), - ) - else: - return "%s and %d others" % ( - name_from_member_event(member_events[0]), - len(member_events) - 1, - ) - - -def name_from_member_event(member_event): - if ( - member_event.content and "displayname" in member_event.content and - member_event.content["displayname"] - ): - return member_event.content["displayname"] - return member_event.state_key \ No newline at end of file -- cgit 1.5.1 From ebbabc4986371c83d1d2659d10b27caad9b47951 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Apr 2016 11:49:36 +0100 Subject: Handle room invites in email notifs --- res/templates/room.html | 14 +++++++++----- synapse/push/mailer.py | 35 ++++++++++++++++++++++++++++++----- synapse/util/presentable_names.py | 6 +++--- 3 files changed, 42 insertions(+), 13 deletions(-) (limited to 'synapse/util') diff --git a/res/templates/room.html b/res/templates/room.html index f369575b98..6c68ee1fdc 100644 --- a/res/templates/room.html +++ b/res/templates/room.html @@ -13,9 +13,13 @@ {% endif %} {% endif %}
-
- {% for notif in room.notifs %} - {% include 'notif.html' with context %} - {% endfor %} -
+ {% if room.invite %} + Join the conversation. + {% else %} +
+ {% for notif in room.notifs %} + {% include 'notif.html' with context %} + {% endfor %} +
+ {% endif %} diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 7ef64f8f6d..d2cf24765a 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -39,6 +39,8 @@ MESSAGE_FROM_PERSON = "You have a message from %s" MESSAGES_FROM_PERSON = "You have messages from %s" MESSAGES_IN_ROOM = "There are some messages for you in the %s room" MESSAGES_IN_ROOMS = "Here are some messages you may have missed" +INVITE_FROM_PERSON_TO_ROOM = "%s has invited you to join the %s room" +INVITE_FROM_PERSON = "%s has invited you to chat" CONTEXT_BEFORE = 1 @@ -148,17 +150,24 @@ class Mailer(object): @defer.inlineCallbacks def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state): + my_member_event = room_state[("m.room.member", user_id)] + is_invite = my_member_event.content["membership"] == "invite" + room_vars = { "title": calculate_room_name(room_state, user_id), "hash": string_ordinal_total(room_id), # See sender avatar hash "notifs": [], + "invite": is_invite } - for n in notifs: - vars = yield self.get_notif_vars( - n, user_id, notif_events[n['event_id']], room_state - ) - room_vars['notifs'].append(vars) + if is_invite: + room_vars["link"] = self.make_room_link(room_id) + else: + for n in notifs: + vars = yield self.get_notif_vars( + n, user_id, notif_events[n['event_id']], room_state + ) + room_vars['notifs'].append(vars) defer.returnValue(room_vars) @@ -235,6 +244,18 @@ class Mailer(object): state_by_room[room_id], user_id, fallback_to_members=False ) + my_member_event = state_by_room[room_id][("m.room.member", user_id)] + if my_member_event.content["membership"] == "invite": + inviter_member_event = state_by_room[room_id][ + ("m.room.member", my_member_event.sender) + ] + inviter_name = name_from_member_event(inviter_member_event) + + if room_name is None: + return INVITE_FROM_PERSON % (inviter_name,) + else: + return INVITE_FROM_PERSON_TO_ROOM % (inviter_name, room_name) + sender_name = None if len(notifs_by_room[room_id]) == 1: # There is just the one notification, so give some detail @@ -242,6 +263,7 @@ class Mailer(object): if ("m.room.member", event.sender) in state_by_room[room_id]: state_event = state_by_room[room_id][("m.room.member", event.sender)] sender_name = name_from_member_event(state_event) + if sender_name is not None and room_name is not None: return MESSAGE_FROM_PERSON_IN_ROOM % (sender_name, room_name) elif sender_name is not None: @@ -268,6 +290,9 @@ class Mailer(object): # Stuff's happened in multiple different rooms return MESSAGES_IN_ROOMS + def make_room_link(self, room_id): + return "https://matrix.to/%s" % (room_id,) + def make_notif_link(self, notif): return "https://matrix.to/%s/%s" % ( notif['room_id'], notif['event_id'] diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 2ae01e453d..f80a7fe58e 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -52,6 +52,9 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True): if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]): return the_aliases[0] + if not fallback_to_members: + return None + my_member_event = None if ("m.room.member", user_id) in room_state: my_member_event = room_state[("m.room.member", user_id)] @@ -66,9 +69,6 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True): else: return "Room Invite" - if not fallback_to_members: - return None - # we're going to have to generate a name based on who's in the room, # so find out who is in the room that isn't the user. if "m.room.member" in room_state_bytype: -- cgit 1.5.1 From ec9cbe847d0d489c602309312fb947daa0e6a129 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 10:07:30 +0100 Subject: pep8 newline --- synapse/util/presentable_names.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index f80a7fe58e..13def91776 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -142,4 +142,4 @@ def _state_as_two_level_dict(state): def _looks_like_an_alias(string): - return ALIAS_RE.match(string) is not None \ No newline at end of file + return ALIAS_RE.match(string) is not None -- cgit 1.5.1 From c7c75e87dc6b619d07d224bd4c6464313cc96840 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 19:47:35 +0100 Subject: Docstring --- synapse/util/presentable_names.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 13def91776..3efa8a8206 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -23,6 +23,20 @@ ALL_ALONE = "Empty Room" def calculate_room_name(room_state, user_id, fallback_to_members=True): + """ + Works out a user-facing name for the given room as per Matrix + spec recommendations. + Does not yet support internationalisation. + Args: + room_state: Dictionary of the room's state + user_id: The ID of the user to whom the room name is being presented + fallback_to_members: If False, return None instead of generating a name + based on the room's members if the room has no + title or aliases. + + Returns: + (string or None) A human readable name for the room. + """ # does it have a name? if ("m.room.name", "") in room_state: m_room_name = room_state[("m.room.name", "")] -- cgit 1.5.1 From 680f1d9387f48ced9902f4e7d54758ab6a0aa9b0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 May 2016 22:55:11 +0100 Subject: catch thinko in presentable names --- synapse/util/presentable_names.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 3efa8a8206..a6866f6117 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -14,6 +14,9 @@ # limitations under the License. import re +import logging + +logger = logging.getLogger(__name__) # intentionally looser than what aliases we allow to be registered since # other HSes may allow aliases that we would not @@ -105,13 +108,21 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True): # or inbound invite, or outbound 3PID invite. if all_members[0].sender == user_id: if "m.room.third_party_invite" in room_state_bytype: - third_party_invites = room_state_bytype["m.room.third_party_invite"] + third_party_invites = ( + room_state_bytype["m.room.third_party_invite"].values() + ) + if len(third_party_invites) > 0: # technically third party invite events are not member # events, but they are close enough - return "Inviting %s" ( - descriptor_from_member_events(third_party_invites) - ) + + # FIXME: no they're not - they look nothing like a member; + # they have a great big encrypted thing as their name to + # prevent leaking the 3PID name... + # return "Inviting %s" % ( + # descriptor_from_member_events(third_party_invites) + # ) + return "Inviting email address" else: return ALL_ALONE else: -- cgit 1.5.1 From 597013caa5e22c7134b6ca6e398659ba76047b15 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jun 2016 18:01:22 +0100 Subject: Make cachedList go a bit faster --- synapse/metrics/metric.py | 22 ++++++++++--------- synapse/util/caches/descriptors.py | 44 +++++++++++++++++++++++++++++--------- 2 files changed, 46 insertions(+), 20 deletions(-) (limited to 'synapse/util') diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 368fc24984..6f82b360bc 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -15,6 +15,7 @@ from itertools import chain +from collections import Counter # TODO(paul): I can't believe Python doesn't have one of these @@ -55,30 +56,29 @@ class CounterMetric(BaseMetric): """The simplest kind of metric; one that stores a monotonically-increasing integer that counts events.""" + __slots__ = ("counts") + def __init__(self, *args, **kwargs): super(CounterMetric, self).__init__(*args, **kwargs) - self.counts = {} + self.counts = Counter() # Scalar metrics are never empty if self.is_scalar(): self.counts[()] = 0 def inc_by(self, incr, *values): - if len(values) != self.dimension(): - raise ValueError( - "Expected as many values to inc() as labels (%d)" % (self.dimension()) - ) + # if len(values) != self.dimension(): + # raise ValueError( + # "Expected as many values to inc() as labels (%d)" % (self.dimension()) + # ) # TODO: should assert that the tag values are all strings - if values not in self.counts: - self.counts[values] = incr - else: - self.counts[values] += incr + self.counts[values] += incr def inc(self, *values): - self.inc_by(1, *values) + self.counts[values] += 1 def render_item(self, k): return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] @@ -132,6 +132,8 @@ class CacheMetric(object): This metric generates standard metric name pairs, so that monitoring rules can easily be applied to measure hit ratio.""" + __slots__ = ("name", "hits", "total", "size") + def __init__(self, name, size_callback, labels=[]): self.name = name diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 758f5982b0..4bbb16ed3c 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -32,6 +32,7 @@ import os import functools import inspect import threading +import itertools logger = logging.getLogger(__name__) @@ -43,6 +44,14 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) class Cache(object): + __slots__ = ( + "cache", + "max_entries", + "name", + "keylen", + "sequence", + "thread", + ) def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): if lru: @@ -293,16 +302,21 @@ class CacheListDescriptor(object): # cached is a dict arg -> deferred, where deferred results in a # 2-tuple (`arg`, `result`) - cached = {} + results = {} + cached_defers = {} missing = [] for arg in list_args: key = list(keyargs) key[self.list_pos] = arg try: - res = cache.get(tuple(key)).observe() - res.addCallback(lambda r, arg: (arg, r), arg) - cached[arg] = res + res = cache.get(tuple(key)) + if not res.called: + res = res.observe() + res.addCallback(lambda r, arg: (arg, r), arg) + cached_defers[arg] = res + else: + results[arg] = res.result except KeyError: missing.append(arg) @@ -340,12 +354,22 @@ class CacheListDescriptor(object): res = observer.observe() res.addCallback(lambda r, arg: (arg, r), arg) - cached[arg] = res - - return preserve_context_over_deferred(defer.gatherResults( - cached.values(), - consumeErrors=True, - ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) + cached_defers[arg] = res + + if cached_defers: + return preserve_context_over_deferred(defer.gatherResults( + cached_defers.values(), + consumeErrors=True, + ).addCallback( + lambda res: { + k: v + for k, v in itertools.chain(results.items(), res) + } + )).addErrback( + unwrapFirstError + ) + else: + return results obj.__dict__[self.orig.__name__] = wrapped -- cgit 1.5.1 From e043ede4a2f18a47b67bf19368600183554824f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jun 2016 11:52:32 +0100 Subject: Small optimisation to CacheListDescriptor --- synapse/metrics/metric.py | 22 ++++++++++------------ synapse/util/async.py | 9 +++++++++ synapse/util/caches/descriptors.py | 4 ++-- 3 files changed, 21 insertions(+), 14 deletions(-) (limited to 'synapse/util') diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 6f82b360bc..368fc24984 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -15,7 +15,6 @@ from itertools import chain -from collections import Counter # TODO(paul): I can't believe Python doesn't have one of these @@ -56,29 +55,30 @@ class CounterMetric(BaseMetric): """The simplest kind of metric; one that stores a monotonically-increasing integer that counts events.""" - __slots__ = ("counts") - def __init__(self, *args, **kwargs): super(CounterMetric, self).__init__(*args, **kwargs) - self.counts = Counter() + self.counts = {} # Scalar metrics are never empty if self.is_scalar(): self.counts[()] = 0 def inc_by(self, incr, *values): - # if len(values) != self.dimension(): - # raise ValueError( - # "Expected as many values to inc() as labels (%d)" % (self.dimension()) - # ) + if len(values) != self.dimension(): + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) + ) # TODO: should assert that the tag values are all strings - self.counts[values] += incr + if values not in self.counts: + self.counts[values] = incr + else: + self.counts[values] += incr def inc(self, *values): - self.counts[values] += 1 + self.inc_by(1, *values) def render_item(self, k): return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] @@ -132,8 +132,6 @@ class CacheMetric(object): This metric generates standard metric name pairs, so that monitoring rules can easily be applied to measure hit ratio.""" - __slots__ = ("name", "hits", "total", "size") - def __init__(self, name, size_callback, labels=[]): self.name = name diff --git a/synapse/util/async.py b/synapse/util/async.py index 0d6f48e2d8..40be7fe7e3 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -102,6 +102,15 @@ class ObservableDeferred(object): def observers(self): return self._observers + def has_called(self): + return self._result is not None + + def has_succeeded(self): + return self._result is not None and self._result[0] is True + + def get_result(self): + return self._result[1] + def __getattr__(self, name): return getattr(self._deferred, name) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 4bbb16ed3c..5be4097279 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -311,12 +311,12 @@ class CacheListDescriptor(object): try: res = cache.get(tuple(key)) - if not res.called: + if not res.has_succeeded(): res = res.observe() res.addCallback(lambda r, arg: (arg, r), arg) cached_defers[arg] = res else: - results[arg] = res.result + results[arg] = res.get_result() except KeyError: missing.append(arg) -- cgit 1.5.1 From 73c711243382a48b9b67fddf5ed9df2d1ee1be43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jun 2016 11:29:44 +0100 Subject: Change CacheMetrics to be quicker We change it so that each cache has an individual CacheMetric, instead of having one global CacheMetric. This means that when a cache tries to increment a counter it does not need to go through so many indirections. --- synapse/metrics/__init__.py | 16 ++++------- synapse/metrics/metric.py | 44 +++++++++++++++--------------- synapse/util/caches/__init__.py | 20 ++++++++++---- synapse/util/caches/descriptors.py | 17 +++++++++--- synapse/util/caches/dictionary_cache.py | 8 +++--- synapse/util/caches/expiringcache.py | 8 +++--- synapse/util/caches/stream_change_cache.py | 16 +++++------ tests/metrics/test_metric.py | 23 +++++++--------- 8 files changed, 82 insertions(+), 70 deletions(-) (limited to 'synapse/util') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 5664d5a381..c38f24485a 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -33,11 +33,7 @@ from .metric import ( logger = logging.getLogger(__name__) -# We'll keep all the available metrics in a single toplevel dict, one shared -# for the entire process. We don't currently support per-HomeServer instances -# of metrics, because in practice any one python VM will host only one -# HomeServer anyway. This makes a lot of implementation neater -all_metrics = {} +all_metrics = [] class Metrics(object): @@ -53,7 +49,7 @@ class Metrics(object): metric = metric_class(full_name, *args, **kwargs) - all_metrics[full_name] = metric + all_metrics.append(metric) return metric def register_counter(self, *args, **kwargs): @@ -84,12 +80,12 @@ def render_all(): # TODO(paul): Internal hack update_resource_metrics() - for name in sorted(all_metrics.keys()): + for metric in all_metrics: try: - strs += all_metrics[name].render() + strs += metric.render() except Exception: - strs += ["# FAILED to render %s" % name] - logger.exception("Failed to render %s metric", name) + strs += ["# FAILED to render"] + logger.exception("Failed to render metric") strs.append("") # to generate a final CRLF diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 368fc24984..341043952a 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -47,9 +47,6 @@ class BaseMetric(object): for k, v in zip(self.labels, values)]) ) - def render(self): - return map_concat(self.render_item, sorted(self.counts.keys())) - class CounterMetric(BaseMetric): """The simplest kind of metric; one that stores a monotonically-increasing @@ -83,6 +80,9 @@ class CounterMetric(BaseMetric): def render_item(self, k): return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] + def render(self): + return map_concat(self.render_item, sorted(self.counts.keys())) + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever @@ -126,30 +126,30 @@ class DistributionMetric(object): class CacheMetric(object): - """A combination of two CounterMetrics, one to count cache hits and one to - count a total, and a callback metric to yield the current size. + __slots__ = ("name", "cache_name", "hits", "misses", "size_callback") - This metric generates standard metric name pairs, so that monitoring rules - can easily be applied to measure hit ratio.""" - - def __init__(self, name, size_callback, labels=[]): + def __init__(self, name, size_callback, cache_name): self.name = name + self.cache_name = cache_name - self.hits = CounterMetric(name + ":hits", labels=labels) - self.total = CounterMetric(name + ":total", labels=labels) + self.hits = 0 + self.misses = 0 - self.size = CallbackMetric( - name + ":size", - callback=size_callback, - labels=labels, - ) + self.size_callback = size_callback - def inc_hits(self, *values): - self.hits.inc(*values) - self.total.inc(*values) + def inc_hits(self): + self.hits += 1 - def inc_misses(self, *values): - self.total.inc(*values) + def inc_misses(self): + self.misses += 1 def render(self): - return self.hits.render() + self.total.render() + self.size.render() + size = self.size_callback() + hits = self.hits + total = self.misses + self.hits + + return [ + """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits), + """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total), + """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size), + ] diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index d53569ca49..ebd715c5dc 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -24,11 +24,21 @@ DEBUG_CACHES = False metrics = synapse.metrics.get_metrics_for("synapse.util.caches") caches_by_name = {} -cache_counter = metrics.register_cache( - "cache", - lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, - labels=["name"], -) +# cache_counter = metrics.register_cache( +# "cache", +# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, +# labels=["name"], +# ) + + +def register_cache(name, cache): + caches_by_name[name] = cache + return metrics.register_cache( + "cache", + lambda: len(cache), + name, + ) + _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 758f5982b0..5d25c9e762 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -22,7 +22,7 @@ from synapse.util.logcontext import ( PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn ) -from . import caches_by_name, DEBUG_CACHES, cache_counter +from . import DEBUG_CACHES, register_cache from twisted.internet import defer @@ -43,6 +43,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) class Cache(object): + __slots__ = ( + "cache", + "max_entries", + "name", + "keylen", + "sequence", + "thread", + "metrics", + ) def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): if lru: @@ -59,7 +68,7 @@ class Cache(object): self.keylen = keylen self.sequence = 0 self.thread = None - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -74,10 +83,10 @@ class Cache(object): def get(self, key, default=_CacheSentinel): val = self.cache.get(key, _CacheSentinel) if val is not _CacheSentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return val - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() if default is _CacheSentinel: raise KeyError() diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index f92d80542b..b0ca1bb79d 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -15,7 +15,7 @@ from synapse.util.caches.lrucache import LruCache from collections import namedtuple -from . import caches_by_name, cache_counter +from . import register_cache import threading import logging @@ -43,7 +43,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -58,7 +58,7 @@ class DictionaryCache(object): def get(self, key, dict_keys=None): entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() if dict_keys is None: return DictionaryEntry(entry.full, dict(entry.value)) @@ -69,7 +69,7 @@ class DictionaryCache(object): if k in entry.value }) - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return DictionaryEntry(False, {}) def invalidate(self, key): diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 2b68c1ac93..080388958f 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache import logging @@ -49,7 +49,7 @@ class ExpiringCache(object): self._cache = {} - caches_by_name[cache_name] = self._cache + self.metrics = register_cache(cache_name, self._cache) def start(self): if not self._expiry_ms: @@ -78,9 +78,9 @@ class ExpiringCache(object): def __getitem__(self, key): try: entry = self._cache[key] - cache_counter.inc_hits(self._cache_name) + self.metrics.inc_hits() except KeyError: - cache_counter.inc_misses(self._cache_name) + self.metrics.inc_misses() raise if self._reset_expiry_on_get: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index ea8a74ca69..3c051dabc4 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache from blist import sorteddict @@ -42,7 +42,7 @@ class StreamChangeCache(object): self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name - caches_by_name[self.name] = self._cache + self.metrics = register_cache(self.name, self._cache) for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) @@ -53,19 +53,19 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos < self._earliest_known_stream_pos: - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return True latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return False if stream_pos < latest_entity_change_pos: - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return True - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return False def get_entities_changed(self, entities, stream_pos): @@ -82,10 +82,10 @@ class StreamChangeCache(object): self._cache[k] for k in keys[i:] ).intersection(entities) - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() else: result = entities - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return result diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py index f3c1927ce1..f85455a5af 100644 --- a/tests/metrics/test_metric.py +++ b/tests/metrics/test_metric.py @@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase): 'vector{method="PUT"} 1', ]) - # Check that passing too few values errors - self.assertRaises(ValueError, counter.inc) - class CallbackMetricTestCase(unittest.TestCase): @@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase): def test_cache(self): d = dict() - metric = CacheMetric("cache", lambda: len(d)) + metric = CacheMetric("cache", lambda: len(d), "cache_name") self.assertEquals(metric.render(), [ - 'cache:hits 0', - 'cache:total 0', - 'cache:size 0', + 'cache:hits{name="cache_name"} 0', + 'cache:total{name="cache_name"} 0', + 'cache:size{name="cache_name"} 0', ]) metric.inc_misses() d["key"] = "value" self.assertEquals(metric.render(), [ - 'cache:hits 0', - 'cache:total 1', - 'cache:size 1', + 'cache:hits{name="cache_name"} 0', + 'cache:total{name="cache_name"} 1', + 'cache:size{name="cache_name"} 1', ]) metric.inc_hits() self.assertEquals(metric.render(), [ - 'cache:hits 1', - 'cache:total 2', - 'cache:size 1', + 'cache:hits{name="cache_name"} 1', + 'cache:total{name="cache_name"} 2', + 'cache:size{name="cache_name"} 1', ]) -- cgit 1.5.1 From 58a224a6515dceacebc729f1e6fbb87a22f3a35a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 11:47:07 +0100 Subject: Pull out update_results_dict --- synapse/util/caches/descriptors.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 5be4097279..799fd2a9c6 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -32,7 +32,7 @@ import os import functools import inspect import threading -import itertools + logger = logging.getLogger(__name__) @@ -357,17 +357,16 @@ class CacheListDescriptor(object): cached_defers[arg] = res if cached_defers: + def update_results_dict(res): + results.update(res) + return results + return preserve_context_over_deferred(defer.gatherResults( cached_defers.values(), consumeErrors=True, - ).addCallback( - lambda res: { - k: v - for k, v in itertools.chain(results.items(), res) - } - )).addErrback( + ).addCallback(update_results_dict).addErrback( unwrapFirstError - ) + )) else: return results -- cgit 1.5.1