From f683b5de47ae57a4fb6e9b80ad2f83c34c913486 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 7 May 2015 21:27:53 +0100 Subject: Store presence cachemap in an ordered dict, so that the newer serials will be at the end --- synapse/handlers/presence.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e15610401..6547e0434e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -26,6 +26,7 @@ import synapse.metrics from ._base import BaseHandler import logging +from collections import OrderedDict logger = logging.getLogger(__name__) @@ -143,7 +144,7 @@ class PresenceHandler(BaseHandler): self._remote_offline_serials = [] # map any user to a UserPresenceCache - self._user_cachemap = {} + self._user_cachemap = OrderedDict() # keep them sorted by serial self._user_cachemap_latest_serial = 0 metrics.register_callback( @@ -165,6 +166,14 @@ class PresenceHandler(BaseHandler): else: return UserPresenceCache() + def _bump_serial(self, user=None): + self._user_cachemap_latest_serial += 1 + + if user: + # Move to end + cache = self._user_cachemap.pop(user) + self._user_cachemap[user] = cache + def registered_user(self, user): return self.store.create_presence(user.localpart) @@ -301,7 +310,7 @@ class PresenceHandler(BaseHandler): def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 + self._bump_serial(user=user) statuscache.update(state, serial=self._user_cachemap_latest_serial) return self.push_presence(user, statuscache=statuscache) @@ -323,7 +332,7 @@ class PresenceHandler(BaseHandler): # No actual update but we need to bump the serial anyway for the # event source - self._user_cachemap_latest_serial += 1 + self._bump_serial() statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( @@ -706,7 +715,7 @@ class PresenceHandler(BaseHandler): statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 + self._bump_serial(user=user) statuscache.update(state, serial=self._user_cachemap_latest_serial) if not observers and not room_ids: -- cgit 1.4.1 From 45543028bbeb8395e8bbc5768680f6bf074d366f Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 7 May 2015 22:40:10 +0100 Subject: Use the presence cachemap ordering to early-abort the iteration loop --- synapse/handlers/presence.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6547e0434e..601a4c6dba 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -875,10 +875,15 @@ class PresenceEventSource(object): updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in reversed(cachemap.keys()): cached = cachemap[observed_user] - if cached.serial <= from_key or cached.serial > max_serial: + # Since this is ordered in descending order of serial, we can just + # stop once we've seen enough + if cached.serial <= from_key: + break + + if cached.serial > max_serial: continue if not (yield self.is_visible(observer_user, observed_user)): -- cgit 1.4.1 From 476899295f5fd6cff64799bcbc84cd4bf9005e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:32:18 +0100 Subject: Change the way we do logging contexts so that they survive divergences --- demo/start.sh | 1 + synapse/crypto/keyclient.py | 17 +++++++----- synapse/federation/federation_server.py | 46 ++++++++++++++++----------------- synapse/handlers/events.py | 8 +++--- synapse/handlers/presence.py | 34 +++++++++++------------- synapse/handlers/profile.py | 15 +++++------ synapse/http/client.py | 6 ++++- synapse/http/matrixfederationclient.py | 32 +++++++++++------------ synapse/notifier.py | 16 +++++------- synapse/storage/_base.py | 11 ++++---- synapse/util/__init__.py | 8 +++--- synapse/util/async.py | 6 ++--- synapse/util/logcontext.py | 31 ++++++++++++++++++++++ 13 files changed, 131 insertions(+), 100 deletions(-) (limited to 'synapse/handlers') diff --git a/demo/start.sh b/demo/start.sh index 5b3daef57f..b9cc14b9d2 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -31,6 +31,7 @@ for port in 8080 8081 8082; do #rm $DIR/etc/$port.config python -m synapse.app.homeserver \ --generate-config \ + --enable_registration \ -H "localhost:$https_port" \ --config-path "$DIR/etc/$port.config" \ diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 4911f0896b..24f15f3154 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -18,7 +18,9 @@ from twisted.web.http import HTTPClient from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_context_over_fn, preserve_context_over_deferred +) import simplejson as json import logging @@ -40,11 +42,14 @@ def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1): for i in range(5): try: - with PreserveLoggingContext(): - protocol = yield endpoint.connect(factory) - server_response, server_certificate = yield protocol.remote_key - defer.returnValue((server_response, server_certificate)) - return + protocol = yield preserve_context_over_fn( + endpoint.connect, factory + ) + server_response, server_certificate = yield preserve_context_over_deferred( + protocol.remote_key + ) + defer.returnValue((server_response, server_certificate)) + return except SynapseKeyClientError as e: logger.exception("Error getting key for %r" % (server_name,)) if e.status.startswith("4"): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2b46188c91..cd79e23f4b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -20,7 +20,6 @@ from .federation_base import FederationBase from .units import Transaction, Edu from synapse.util.logutils import log_function -from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent import synapse.metrics @@ -123,29 +122,28 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - with PreserveLoggingContext(): - results = [] - - for pdu in pdu_list: - d = self._handle_new_pdu(transaction.origin, pdu) - - try: - yield d - results.append({}) - except FederationError as e: - self.send_failure(e, transaction.origin) - results.append({"error": str(e)}) - except Exception as e: - results.append({"error": str(e)}) - logger.exception("Failed to handle PDU") - - if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( - transaction.origin, - edu.edu_type, - edu.content - ) + results = [] + + for pdu in pdu_list: + d = self._handle_new_pdu(transaction.origin, pdu) + + try: + yield d + results.append({}) + except FederationError as e: + self.send_failure(e, transaction.origin) + results.append({"error": str(e)}) + except Exception as e: + results.append({"error": str(e)}) + logger.exception("Failed to handle PDU") + + if hasattr(transaction, "edus"): + for edu in [Edu(**x) for x in transaction.edus]: + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f9f855213b..993d33ba47 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,7 +15,6 @@ from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event @@ -81,10 +80,9 @@ class EventStreamHandler(BaseHandler): # thundering herds on restart. timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) - with PreserveLoggingContext(): - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e15610401..6ae39a1d37 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -19,7 +19,6 @@ from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState from synapse.util.logutils import log_function -from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import synapse.metrics @@ -278,15 +277,14 @@ class PresenceHandler(BaseHandler): now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap - with PreserveLoggingContext(): - if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) - elif not now_online and was_polling: - self.stop_polling_presence(target_user) + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) - # TODO(paul): perform a presence push as part of start/stop poll so - # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -408,10 +406,10 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.start_polling_presence( - observer_user, target_user=observed_user - ) + + self.start_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): @@ -430,10 +428,9 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - with PreserveLoggingContext(): - self.stop_polling_presence( - observer_user, target_user=observed_user - ) + self.stop_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -766,8 +763,7 @@ class PresenceHandler(BaseHandler): if not self._remote_sendmap[user]: del self._remote_sendmap[user] - with PreserveLoggingContext(): - yield defer.DeferredList(deferreds, consumeErrors=True) + yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ee2732b848..a7de7a80f8 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -154,14 +154,13 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): defer.returnValue(None) - with PreserveLoggingContext(): - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ) + (displayname, avatar_url) = yield defer.gatherResults( + [ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ], + consumeErrors=True + ) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/http/client.py b/synapse/http/client.py index e8a5dedab4..5b3cefb2dc 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api.errors import CodeMessageException +from synapse.util.logcontext import preserve_context_over_fn from syutil.jsonutil import encode_canonical_json import synapse.metrics @@ -61,7 +62,10 @@ class SimpleHttpClient(object): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.inc(method) - d = self.agent.request(method, *args, **kwargs) + d = preserve_context_over_fn( + self.agent.request, + method, *args, **kwargs + ) def _cb(response): incoming_responses_counter.inc(method, response.code) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7fa295cad5..c99d237c73 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import preserve_context_over_fn import synapse.metrics from syutil.jsonutil import encode_canonical_json @@ -144,22 +144,22 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, url_bytes, headers_dict) try: - with PreserveLoggingContext(): - request_deferred = self.agent.request( - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + request_deferred = preserve_context_over_fn( + self.agent.request, + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=60, - ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=60, + ) logger.debug("Got response to %s", method) break diff --git a/synapse/notifier.py b/synapse/notifier.py index 78eb28e4b2..fbbccb38e6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import preserve_context_over_deferred from synapse.types import StreamToken import synapse.metrics @@ -223,11 +223,10 @@ class Notifier(object): def eb(failure): logger.exception("Failed to notify listener", failure) - with PreserveLoggingContext(): - yield defer.DeferredList( + yield defer.DeferredList( [notify(l).addErrback(eb) for l in listeners], consumeErrors=True, - ) + ) @defer.inlineCallbacks @log_function @@ -298,11 +297,10 @@ class Notifier(object): failure.getTracebackObject()) ) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + yield defer.DeferredList( + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, + ) @defer.inlineCallbacks def wait_for_events(self, user, rooms, filter, timeout, callback): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ee5587c721..b0020f51db 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,7 +18,7 @@ from synapse.api.errors import StoreError from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logutils import log_function -from synapse.util.logcontext import PreserveLoggingContext, LoggingContext +from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache import synapse.metrics @@ -419,10 +419,11 @@ class SQLBaseStore(object): self._txn_perf_counters.update(desc, start, end) sql_txn_timer.inc_by(duration, desc) - with PreserveLoggingContext(): - result = yield self._db_pool.runWithConnection( - inner_func, *args, **kwargs - ) + result = yield preserve_context_over_fn( + self._db_pool.runWithConnection, + inner_func, *args, **kwargs + ) + for after_callback, after_args in after_callbacks: after_callback(*after_args) defer.returnValue(result) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 79109d0b19..364b927851 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -50,8 +50,10 @@ class Clock(object): current_context = LoggingContext.current_context() def wrapped_callback(): - LoggingContext.thread_local.current_context = current_context - callback() + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + callback() + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): diff --git a/synapse/util/async.py b/synapse/util/async.py index d8febdb90c..f78395a431 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,15 +16,13 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import preserve_context_over_deferred -@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) - with PreserveLoggingContext(): - yield d + return preserve_context_over_deferred(d) def run_on_reactor(): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index da7872e95d..192e3f49f0 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + import threading import logging @@ -129,3 +131,32 @@ class PreserveLoggingContext(object): def __exit__(self, type, value, traceback): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + + +def preserve_context_over_fn(fn, *args, **kwargs): + with PreserveLoggingContext(): + deferred = fn(*args, **kwargs) + + return preserve_context_over_deferred(deferred) + + +def preserve_context_over_deferred(deferred): + d = defer.Deferred() + + current_context = LoggingContext.current_context() + + def cb(res): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + res = d.callback(res) + return res + + def eb(failure): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + res = d.errback(failure) + return res + + deferred.addCallbacks(cb, eb) + + return d -- cgit 1.4.1 From 4ac194159200e9cfb0020003409592e5324cbd18 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:32:56 +0100 Subject: PEP8 --- synapse/handlers/profile.py | 1 - synapse/notifier.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index a7de7a80f8..ffb449d457 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership -from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID from ._base import BaseHandler diff --git a/synapse/notifier.py b/synapse/notifier.py index fbbccb38e6..7282dfd7f3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,7 +16,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.logcontext import preserve_context_over_deferred from synapse.types import StreamToken import synapse.metrics @@ -224,8 +223,8 @@ class Notifier(object): logger.exception("Failed to notify listener", failure) yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) @defer.inlineCallbacks -- cgit 1.4.1 From 2236ef6c92b7964665f5c43b941754d70aa506d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 19:53:34 +0100 Subject: Fix up leak. Add warnings. --- synapse/handlers/_base.py | 11 +++++++---- synapse/handlers/federation.py | 29 ++++++++++++++++----------- synapse/handlers/presence.py | 10 ++++++---- synapse/handlers/typing.py | 4 +++- synapse/http/server.py | 6 ++++-- synapse/util/__init__.py | 3 ++- synapse/util/distributor.py | 45 ++++++++++++++++++++---------------------- synapse/util/logcontext.py | 11 ++++++++++- 8 files changed, 70 insertions(+), 49 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4b3f4eadab..ddc5c21e7d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID +from synapse.util.logcontext import PreserveLoggingContext + import logging @@ -137,10 +139,11 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + notify_d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..77c315c47c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -197,9 +198,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -431,9 +433,10 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + new_event, extra_users=[joinee] + ) def log_failure(f): logger.warn( @@ -512,9 +515,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -594,9 +598,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - d = self.notifier.on_new_room_event( - event, extra_users=[target_user], - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=[target_user], + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6ae39a1d37..1edab05492 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics @@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, observed_user, users_to_push=[], room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, - ) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + users_to_push, + room_ids, + ) class PresenceEventSource(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c0b2bd7db0..64fe51aa3e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import logging @@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler): self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial - self.notifier.on_new_user_event(rooms=[room_id]) + with PreserveLoggingContext(): + self.notifier.on_new_user_event(rooms=[room_id]) class TypingNotificationEventSource(object): diff --git a/synapse/http/server.py b/synapse/http/server.py index 93ecbd7589..73efbff4f2 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -17,7 +17,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError ) -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext import synapse.metrics from syutil.jsonutil import ( @@ -85,7 +85,9 @@ def request_handler(request_handler): "Received request: %s %s", request.method, request.path ) - yield request_handler(self, request) + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d code = request.code except CodeMessageException as e: code = e.code diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 364b927851..fd3eb1f574 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -54,7 +54,8 @@ class Clock(object): LoggingContext.thread_local.current_context = current_context callback() - return reactor.callLater(delay, wrapped_callback) + with PreserveLoggingContext(): + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 9d9c350397..5b150cb0e5 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext - from twisted.internet import defer import logging @@ -93,7 +91,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -101,24 +98,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - with PreserveLoggingContext(): - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) - - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - failure.raiseException() - deferreds.append(d.addErrback(eb)) - results = [] - for deferred in deferreds: - result = yield deferred - results.append(result) - defer.returnValue(results) + + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + failure.raiseException() + + deferreds = [ + defer.maybeDeferred(observer, *args, **kwargs) + for observer in self.observers + ] + + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addErrback(eb) + + return d diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 3dce8d2bf3..a92d518b43 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -132,6 +132,13 @@ class PreserveLoggingContext(object): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + if self.current_context is not LoggingContext.sentinel: + if self.current_context.parent_context is None: + logger.warn( + "Restoring dead context: %s", + self.current_context, + ) + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes @@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred): res = d.errback(failure) return res - deferred.addCallbacks(cb, eb) + if deferred.called: + return deferred + deferred.addCallbacks(cb, eb) return d -- cgit 1.4.1 From 84e6b4001f22b0e8c2f806053189fcdb1e85205b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 May 2015 18:01:31 +0100 Subject: Initial hack at wiring together pagination and backfill --- synapse/handlers/federation.py | 108 +++++++++++++++++++++++++++++++++++- synapse/handlers/message.py | 10 +++- synapse/storage/event_federation.py | 28 +++++++++- 3 files changed, 141 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..4d39cd4b30 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -218,10 +218,11 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): + def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` """ - extremities = yield self.store.get_oldest_events_in_room(room_id) + if not extremities: + extremities = yield self.store.get_oldest_events_in_room(room_id) pdus = yield self.replication_layer.backfill( dest, @@ -248,6 +249,109 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def maybe_backfill(self, room_id, current_depth): + """Checks the database to see if we should backfill before paginating + """ + extremities = yield self.store.get_oldest_events_with_depth_in_room( + room_id + ) + + logger.debug("Got extremeties: %r", extremities) + + if not extremities: + return + + # Check if we reached a point where we should start backfilling. + sorted_extremeties_tuple = sorted( + extremities.items(), + key=lambda e: -int(e[1]) + ) + max_depth = sorted_extremeties_tuple[0][1] + + logger.debug("max_depth: %r", max_depth) + if current_depth > max_depth: + return + + # Now we need to decide which hosts to hit first. + + # First we try hosts that are already in the room, that were around + # at the time. TODO: HEURISTIC ALERT. + + curr_state = yield self.state_handler.get_current_state(room_id) + + def get_domains_from_state(state): + joined_users = [ + (state_key, int(event.depth)) + for (e_type, state_key), event in state.items() + if e_type == EventTypes.Member + and event.membership == Membership.JOIN + ] + + joined_domains = {} + for u, d in joined_users: + try: + dom = UserID.from_string(u).domain + old_d = joined_domains.get(dom) + if old_d: + joined_domains[dom] = min(d, old_d) + else: + joined_domains[dom] = d + except: + pass + + return sorted(joined_domains.items(), key=lambda d: d[1]) + + curr_domains = get_domains_from_state(curr_state) + + logger.debug("curr_domains: %r", curr_domains) + + likely_domains = [ + domain for domain, depth in curr_domains + ] + + @defer.inlineCallbacks + def try_backfill(domains): + # TODO: Should we try multiple of these at a time? + for dom in domains: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + + if events: + defer.returnValue(True) + defer.returnValue(False) + + success = yield try_backfill(likely_domains) + if success: + defer.returnValue(True) + + # Huh, well *those* domains didn't work out. Lets try some domains + # from the time. + + tried_domains = set(likely_domains) + + states = yield defer.gatherResults({ + e: self.state_handler.resolve_state_groups([e])[1] + for e in extremities.keys() + }) + + for e_id, _ in sorted_extremeties_tuple: + likely_domains = get_domains_from_state(states[e_id])[0] + + success = yield try_backfill([ + dom for dom in likely_domains + if dom not in tried_domains + ]) + if success: + defer.returnValue(True) + + tried_domains.update(likely_domains) + + defer.returnValue(False) + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..38e375f86a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -21,7 +21,7 @@ from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util.logcontext import PreserveLoggingContext -from synapse.types import UserID +from synapse.types import UserID, RoomStreamToken from ._base import BaseHandler @@ -92,6 +92,14 @@ class MessageHandler(BaseHandler): yield self.hs.get_event_sources().get_current_token() ) + room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + if room_token.topological is None: + raise SynapseError(400, "Invalid token") + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, room_token.topological + ) + user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 74b4e23590..2b5424ced4 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -79,6 +79,28 @@ class EventFederationStore(SQLBaseStore): room_id, ) + def get_oldest_events_with_depth_in_room(self, room_id): + return self.runInteraction( + "get_oldest_events_with_depth_in_room", + self.get_oldest_events_with_depth_in_room_txn, + room_id, + ) + + def get_oldest_events_with_depth_in_room_txn(self, txn, room_id): + sql = ( + "SELECT b.event_id, MAX(e.depth) FROM events as e" + " INNER JOIN event_edges as g" + " ON g.event_id = e.event_id AND g.room_id = e.room_id" + " INNER JOIN event_backward_extremities as b" + " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id" + " WHERE b.room_id = ? AND g.is_state is ?" + " GROUP BY b.event_id" + ) + + txn.execute(sql, (room_id, False,)) + + return dict(txn.fetchall()) + def _get_oldest_events_in_room_txn(self, txn, room_id): return self._simple_select_onecol_txn( txn, @@ -247,11 +269,13 @@ class EventFederationStore(SQLBaseStore): do_insert = depth < min_depth if min_depth else True if do_insert: - self._simple_insert_txn( + self._simple_upsert_txn( txn, table="room_depth", - values={ + keyvalues={ "room_id": room_id, + }, + values={ "min_depth": depth, }, ) -- cgit 1.4.1 From 4df11b503957a74a948150950da49574c21887bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 10:28:10 +0100 Subject: Make get_current_token accept a direction parameter, which tells whether the source whether we want a token for going 'forwards' or 'backwards' --- synapse/handlers/message.py | 4 +++- synapse/handlers/room.py | 4 ++-- synapse/storage/stream.py | 20 ++++++++++++++++++-- synapse/streams/events.py | 6 +++--- 4 files changed, 26 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 38e375f86a..1809a44a99 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -89,7 +89,9 @@ class MessageHandler(BaseHandler): if not pagin_config.from_token: pagin_config.from_token = ( - yield self.hs.get_event_sources().get_current_token() + yield self.hs.get_event_sources().get_current_token( + direction='b' + ) ) room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cfa2e38ed2..29b6d52757 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -577,8 +577,8 @@ class RoomEventSource(object): defer.returnValue((events, end_key)) - def get_current_key(self): - return self.store.get_room_events_max_id() + def get_current_key(self, direction='f'): + return self.store.get_room_events_max_id(direction) @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b03fc67f71..8045e17fd7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -364,9 +364,25 @@ class StreamStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_room_events_max_id(self): + def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token(self) - defer.returnValue("s%d" % (token,)) + if direction != 'b': + defer.returnValue("s%d" % (token,)) + else: + topo = yield self.runInteraction( + "_get_max_topological_txn", self._get_max_topological_txn + ) + defer.returnValue("t%d-%d" % (topo, token)) + + def _get_max_topological_txn(self, txn): + txn.execute( + "SELECT MAX(topological_ordering) FROM events" + " WHERE outlier = ?", + (False,) + ) + + rows = txn.fetchall() + return rows[0][0] if rows else 0 @defer.inlineCallbacks def _get_min_token(self): diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 5c8e54b78b..dff7970bea 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -31,7 +31,7 @@ class NullSource(object): def get_new_events_for_user(self, user, from_key, limit): return defer.succeed(([], from_key)) - def get_current_key(self): + def get_current_key(self, direction='f'): return defer.succeed(0) def get_pagination_rows(self, user, pagination_config, key): @@ -52,10 +52,10 @@ class EventSources(object): } @defer.inlineCallbacks - def get_current_token(self): + def get_current_token(self, direction='f'): token = StreamToken( room_key=( - yield self.sources["room"].get_current_key() + yield self.sources["room"].get_current_key(direction) ), presence_key=( yield self.sources["presence"].get_current_key() -- cgit 1.4.1 From 367382b575a61f780f3e70a62cc01a790dcc9375 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 10:35:45 +0100 Subject: Handle the case where the other side is unreachable when backfilling --- synapse/handlers/federation.py | 56 +++++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 14 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4d39cd4b30..8b5ac5d6c4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.api.errors import ( - AuthError, FederationError, StoreError, + AuthError, FederationError, StoreError, CodeMessageException, SynapseError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function @@ -29,6 +29,8 @@ from synapse.crypto.event_signing import ( ) from synapse.types import UserID +from synapse.util.retryutils import NotRetryingDestination + from twisted.internet import defer import itertools @@ -251,15 +253,15 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def maybe_backfill(self, room_id, current_depth): - """Checks the database to see if we should backfill before paginating + """Checks the database to see if we should backfill before paginating, + and if so do. """ extremities = yield self.store.get_oldest_events_with_depth_in_room( room_id ) - logger.debug("Got extremeties: %r", extremities) - if not extremities: + logger.debug("Not backfilling as no extremeties found.") return # Check if we reached a point where we should start backfilling. @@ -269,14 +271,17 @@ class FederationHandler(BaseHandler): ) max_depth = sorted_extremeties_tuple[0][1] - logger.debug("max_depth: %r", max_depth) if current_depth > max_depth: + logger.debug( + "Not backfilling as we don't need to. %d < %d", + current_depth, max_depth, + ) return # Now we need to decide which hosts to hit first. - # First we try hosts that are already in the room, that were around - # at the time. TODO: HEURISTIC ALERT. + # First we try hosts that are already in the room + # TODO: HEURISTIC ALERT. curr_state = yield self.state_handler.get_current_state(room_id) @@ -304,8 +309,6 @@ class FederationHandler(BaseHandler): curr_domains = get_domains_from_state(curr_state) - logger.debug("curr_domains: %r", curr_domains) - likely_domains = [ domain for domain, depth in curr_domains ] @@ -314,11 +317,36 @@ class FederationHandler(BaseHandler): def try_backfill(domains): # TODO: Should we try multiple of these at a time? for dom in domains: - events = yield self.backfill( - dom, room_id, - limit=100, - extremities=[e for e in extremities.keys()] - ) + try: + events = yield self.backfill( + dom, room_id, + limit=100, + extremities=[e for e in extremities.keys()] + ) + except SynapseError: + logger.info( + "Failed to backfil from %s because %s", + dom, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to backfil from %s because %s", + dom, e, + ) + continue + except NotRetryingDestination as e: + logger.info(e.message) + continue + except Exception as e: + logger.info( + "Failed to backfil from %s because %s", + dom, e, + ) + continue if events: defer.returnValue(True) -- cgit 1.4.1 From 95dedb866f04ee4ae034c35130f2a8dc86243fbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:14:29 +0100 Subject: Unwrap defer.gatherResults failures --- synapse/federation/federation_base.py | 4 +++- synapse/handlers/federation.py | 3 ++- synapse/handlers/message.py | 5 +++-- synapse/handlers/profile.py | 3 ++- synapse/handlers/room.py | 4 ++-- synapse/util/__init__.py | 6 ++++++ 6 files changed, 18 insertions(+), 7 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 21a763214b..5217d91aab 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -24,6 +24,8 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError +from synapse.util import unwrapFirstError + import logging @@ -94,7 +96,7 @@ class FederationBase(object): yield defer.gatherResults( [do(pdu) for pdu in pdus], consumeErrors=True - ) + ).addErrback(unwrapFirstError) defer.returnValue(signed_pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 77c315c47c..cd85001578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor @@ -926,7 +927,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..b7d52647d7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,6 +20,7 @@ from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID @@ -303,7 +304,7 @@ class MessageHandler(BaseHandler): event.room_id ), ] - ) + ).addErrback(unwrapFirstError) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -328,7 +329,7 @@ class MessageHandler(BaseHandler): yield defer.gatherResults( [handle_room(e) for e in room_list], consumeErrors=True - ) + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ffb449d457..71ff78ab23 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership from synapse.types import UserID +from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -159,7 +160,7 @@ class ProfileHandler(BaseHandler): self.store.get_profile_avatar_url(user.localpart), ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cfa2e38ed2..ea5abba6ab 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,7 +21,7 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import StoreError, SynapseError -from synapse.util import stringutils +from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event @@ -537,7 +537,7 @@ class RoomListHandler(BaseHandler): for room in chunk ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) for i, room in enumerate(chunk): room["num_joined_members"] = len(results[i]) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fd3eb1f574..c1a16b639a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -23,6 +23,12 @@ import logging logger = logging.getLogger(__name__) +def unwrapFirstError(failure): + # defer.gatherResults and DeferredLists wrap failures. + failure.trap(defer.FirstError) + return failure.value.subFailure + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. -- cgit 1.4.1 From 6e5ac4a28fe79162e62b68cc62aa4e37badcc8b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:58:14 +0100 Subject: Err, gatherResults doesn't take a dict... --- synapse/handlers/federation.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8b5ac5d6c4..31c09365e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -361,10 +361,13 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) - states = yield defer.gatherResults({ - e: self.state_handler.resolve_state_groups([e])[1] - for e in extremities.keys() - }) + event_ids = list(extremities.keys()) + + states = yield defer.gatherResults([ + self.state_handler.resolve_state_groups([e])[1] + for e in event_ids + ]) + states = dict(zip(event_ids, states)) for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id])[0] -- cgit 1.4.1 From a0dfffb33cf8ca721526be0c6a1e05199f2b6258 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 14:00:31 +0100 Subject: And another typo. --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 31c09365e3..6f97127aec 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -274,7 +274,7 @@ class FederationHandler(BaseHandler): if current_depth > max_depth: logger.debug( "Not backfilling as we don't need to. %d < %d", - current_depth, max_depth, + max_depth, current_depth, ) return @@ -364,10 +364,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups([e])[1] + self.state_handler.resolve_state_groups([e]) for e in event_ids ]) - states = dict(zip(event_ids, states)) + states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id])[0] -- cgit 1.4.1 From 0d31ad5101546380308e7735d4543102b7e60bca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 14:02:01 +0100 Subject: Typos everywhere --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6f97127aec..7b7b998f05 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -370,7 +370,7 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: - likely_domains = get_domains_from_state(states[e_id])[0] + likely_domains = get_domains_from_state(states[e_id]) success = yield try_backfill([ dom for dom in likely_domains -- cgit 1.4.1 From 07a12231569189be1699f50d71b38414ba822bdc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 14:09:54 +0100 Subject: s/backfil/backfill/ --- synapse/handlers/federation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7b7b998f05..1093112587 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -325,7 +325,7 @@ class FederationHandler(BaseHandler): ) except SynapseError: logger.info( - "Failed to backfil from %s because %s", + "Failed to backfill from %s because %s", dom, e, ) continue @@ -334,7 +334,7 @@ class FederationHandler(BaseHandler): raise logger.info( - "Failed to backfil from %s because %s", + "Failed to backfill from %s because %s", dom, e, ) continue @@ -342,8 +342,8 @@ class FederationHandler(BaseHandler): logger.info(e.message) continue except Exception as e: - logger.info( - "Failed to backfil from %s because %s", + logger.warn( + "Failed to backfill from %s because %s", dom, e, ) continue -- cgit 1.4.1 From d7b3ac46f8ac32048559e06770e4fc2d57caeaf7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 12 May 2015 15:44:21 +0100 Subject: Revert "Improvement to performance of presence event stream handling" --- synapse/handlers/presence.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28688d532d..1edab05492 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -26,7 +26,6 @@ import synapse.metrics from ._base import BaseHandler import logging -from collections import OrderedDict logger = logging.getLogger(__name__) @@ -144,7 +143,7 @@ class PresenceHandler(BaseHandler): self._remote_offline_serials = [] # map any user to a UserPresenceCache - self._user_cachemap = OrderedDict() # keep them sorted by serial + self._user_cachemap = {} self._user_cachemap_latest_serial = 0 metrics.register_callback( @@ -166,14 +165,6 @@ class PresenceHandler(BaseHandler): else: return UserPresenceCache() - def _bump_serial(self, user=None): - self._user_cachemap_latest_serial += 1 - - if user: - # Move to end - cache = self._user_cachemap.pop(user) - self._user_cachemap[user] = cache - def registered_user(self, user): return self.store.create_presence(user.localpart) @@ -309,7 +300,7 @@ class PresenceHandler(BaseHandler): def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) - self._bump_serial(user=user) + self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) return self.push_presence(user, statuscache=statuscache) @@ -331,7 +322,7 @@ class PresenceHandler(BaseHandler): # No actual update but we need to bump the serial anyway for the # event source - self._bump_serial() + self._user_cachemap_latest_serial += 1 statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( @@ -713,7 +704,7 @@ class PresenceHandler(BaseHandler): statuscache = self._get_or_make_usercache(user) - self._bump_serial(user=user) + self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) if not observers and not room_ids: @@ -873,15 +864,10 @@ class PresenceEventSource(object): updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in reversed(cachemap.keys()): + for observed_user in cachemap.keys(): cached = cachemap[observed_user] - # Since this is ordered in descending order of serial, we can just - # stop once we've seen enough - if cached.serial <= from_key: - break - - if cached.serial > max_serial: + if cached.serial <= from_key or cached.serial > max_serial: continue if not (yield self.is_visible(observer_user, observed_user)): -- cgit 1.4.1 From 80fd2b574c9e56637e67b996289607185c590109 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 16:19:42 +0100 Subject: Don't talk to yourself when backfilling --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7d9906039e..880cbd77e7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -314,6 +314,7 @@ class FederationHandler(BaseHandler): likely_domains = [ domain for domain, depth in curr_domains + if domain is not self.server_name ] @defer.inlineCallbacks @@ -363,6 +364,7 @@ class FederationHandler(BaseHandler): # from the time. tried_domains = set(likely_domains) + tried_domains.add(self.server_name) event_ids = list(extremities.keys()) -- cgit 1.4.1 From 63878c03794d33a8767425e114845159e5c1cb9a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 13:42:21 +0100 Subject: Don't bother checking for updates if the stream token hasn't advanced for a user --- synapse/handlers/_base.py | 7 +++- synapse/handlers/federation.py | 25 +++++++----- synapse/handlers/presence.py | 4 ++ synapse/handlers/typing.py | 4 +- synapse/notifier.py | 75 ++++++++++++++++++++++++----------- synapse/storage/events.py | 3 ++ synapse/types.py | 19 ++++++++- tests/handlers/test_federation.py | 4 +- tests/handlers/test_room.py | 8 ++-- tests/handlers/test_typing.py | 12 +++--- tests/rest/client/v1/test_presence.py | 15 ++++--- tests/utils.py | 2 +- 12 files changed, 123 insertions(+), 55 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ddc5c21e7d..833ff41377 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -105,7 +105,9 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) - yield self.store.persist_event(event, context=context) + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) federation_handler = self.hs.get_handlers().federation_handler @@ -142,7 +144,8 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7d9906039e..bc0f7b0ee7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -160,7 +160,7 @@ class FederationHandler(BaseHandler): ) try: - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event, state=state, @@ -203,7 +203,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, + extra_users=extra_users ) def log_failure(f): @@ -561,7 +562,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } - yield self._handle_new_event( + _, event_stream_id, max_stream_id = yield self._handle_new_event( origin, new_event, state=state, @@ -571,7 +572,8 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] + new_event, event_stream_id, max_stream_id, + extra_users=[joinee] ) def log_failure(f): @@ -637,7 +639,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, event + ) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -653,7 +657,7 @@ class FederationHandler(BaseHandler): with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=extra_users + event, event_stream_id, max_stream_id, extra_users=extra_users ) def log_failure(f): @@ -727,7 +731,7 @@ class FederationHandler(BaseHandler): context = yield self.state_handler.compute_event_context(event) - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=False, @@ -736,7 +740,8 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): d = self.notifier.on_new_room_event( - event, extra_users=[target_user], + event, event_stream_id, max_stream_id, + extra_users=[target_user], ) def log_failure(f): @@ -914,7 +919,7 @@ class FederationHandler(BaseHandler): ) raise - yield self.store.persist_event( + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, backfilled=backfilled, @@ -922,7 +927,7 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - defer.returnValue(context) + defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28688d532d..7db4b062d2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -345,6 +345,8 @@ class PresenceHandler(BaseHandler): curr_users = yield rm_handler.get_room_members(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: + statuscache = self._get_or_offline_usercache(local_user) + statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], @@ -820,6 +822,8 @@ class PresenceHandler(BaseHandler): room_ids=[], statuscache=None): with PreserveLoggingContext(): self.notifier.on_new_user_event( + "presence_key", + self._user_cachemap_latest_serial, users_to_push, room_ids, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 64fe51aa3e..a9895292c2 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event(rooms=[room_id]) + self.notifier.on_new_user_event( + "typing_key", self._latest_room_serial, rooms=[room_id] + ) class TypingNotificationEventSource(object): diff --git a/synapse/notifier.py b/synapse/notifier.py index 214a2b28ca..4d10c05038 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -52,12 +52,11 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self): + def notify(self, token): """ Inform whoever is listening about the new events. """ - try: - self.deferred.callback(None) + self.deferred.callback(token) except defer.AlreadyCalledError: pass @@ -73,15 +72,18 @@ class _NotifierUserStream(object): """ def __init__(self, user, rooms, current_token, appservice=None): - self.user = user + self.user = str(user) self.appservice = appservice self.listeners = set() - self.rooms = rooms + self.rooms = set(rooms) self.current_token = current_token - def notify(self, new_token): + def notify(self, stream_key, stream_id): + self.current_token = self.current_token.copy_and_replace( + stream_key, stream_id + ) for listener in self.listeners: - listener.notify(new_token) + listener.notify(self.current_token) self.listeners.clear() def remove(self, notifier): @@ -117,6 +119,7 @@ class Notifier(object): self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() + self.pending_new_room_events = [] self.clock = hs.get_clock() @@ -153,9 +156,21 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) + def notify_pending_new_room_events(self, max_room_stream_id): + pending = sorted(self.pending_new_room_events) + self.pending_new_room_events = [] + for event, room_stream_id, extra_users in pending: + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, new_token, extra_users=[]): + def on_new_room_event(self, event, room_stream_id, max_room_stream_id, + extra_users=[]): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -163,8 +178,18 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() + + self.notify_pending_new_room_events(max_room_stream_id) + + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) + + def _on_new_room_event(self, event, room_stream_id, extra_users=[]): # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -197,33 +222,32 @@ class Notifier(object): for user_stream in user_streams: try: - user_stream.notify(new_token) + user_stream.notify("room_key", "s%d" % (room_stream_id,)) except: logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function - def on_new_user_event(self, new_token, users=[], rooms=[]): + def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. Will wake up all listeners for the given users and rooms. """ - assert isinstance(new_token, StreamToken) yield run_on_reactor() user_streams = set() for user in users: user_stream = self.user_to_user_stream.get(user) - if user_stream: - user_stream.add(user_stream) + if user_stream is not None: + user_streams.add(user_stream) for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) for user_stream in user_streams: try: - user_streams.notify(new_token) + user_stream.notify(stream_key, new_token) except: logger.exception("Failed to notify listener") @@ -236,12 +260,12 @@ class Notifier(object): deferred = defer.Deferred() - user_stream = self.user_to_user_streams.get(user) + user = str(user) + user_stream = self.user_to_user_stream.get(user) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id( - user.to_string() - ) + appservice = yield self.store.get_app_service_by_user_id(user) current_token = yield self.event_sources.get_current_token() + rooms = yield self.store.get_rooms_for_user(user) user_stream = _NotifierUserStream( user=user, rooms=rooms, @@ -252,8 +276,9 @@ class Notifier(object): else: current_token = user_stream.current_token + listener = [_NotificationListener(deferred)] + if timeout and not current_token.is_after(from_token): - listener = [_NotificationListener(deferred)] user_stream.listeners.add(listener[0]) if current_token.is_after(from_token): @@ -334,7 +359,7 @@ class Notifier(object): self.user_to_user_stream[user_stream.user] = user_stream for room in user_stream.rooms: - s = self.room_to_user_stream.setdefault(room, set()) + s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) if user_stream.appservice: @@ -343,10 +368,12 @@ class Notifier(object): ).add(user_stream) def _user_joined_room(self, user, room_id): + user = str(user) new_user_stream = self.user_to_user_stream.get(user) - room_streams = self.room_to_user_streams.setdefault(room_id, set()) - room_streams.add(new_user_stream) - new_user_stream.rooms.add(room_id) + if new_user_stream is not None: + room_streams = self.room_to_user_streams.setdefault(room_id, set()) + room_streams.add(new_user_stream) + new_user_stream.rooms.add(room_id) def _discard_if_notified(listener_set): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5a6869079..7d6df5f4c6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -64,6 +64,9 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_ordering, max_persisted_id)) + @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, diff --git a/synapse/types.py b/synapse/types.py index 0f16867d75..d89a04f7c3 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -70,6 +70,8 @@ class DomainSpecificString( """Return a string encoding the fields of the structure object.""" return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain) + __str__ = to_string + @classmethod def create(cls, localpart, domain,): return cls(localpart=localpart, domain=domain) @@ -107,7 +109,6 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) - return cls(*keys) except: raise SynapseError(400, "Invalid Token") @@ -115,6 +116,22 @@ class StreamToken( def to_string(self): return self._SEPARATOR.join([str(k) for k in self]) + @property + def room_stream_id(self): + # TODO(markjh): Awful hack to work around hacks in the presence tests + if type(self.room_key) is int: + return self.room_key + else: + return int(self.room_key[1:].split("-")[-1]) + + def is_after(self, other_token): + """Does this token contain events that the other doesn't?""" + return ( + (other_token.room_stream_id < self.room_stream_id) + or (int(other_token.presence_key) < int(self.presence_key)) + or (int(other_token.typing_key) < int(self.typing_key)) + ) + def copy_and_replace(self, key, new_value): d = self._asdict() d[key] = new_value diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 08d2404b6c..f3821242bc 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -83,7 +83,7 @@ class FederationTestCase(unittest.TestCase): "hashes": {"sha256":"AcLrgtUIqqwaGoHhrEvYG1YLDIsVPYJdSRGhkp3jJp8"}, }) - self.datastore.persist_event.return_value = defer.succeed(None) + self.datastore.persist_event.return_value = defer.succeed((1,1)) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) @@ -126,5 +126,5 @@ class FederationTestCase(unittest.TestCase): self.auth.check.assert_called_once_with(ANY, auth_events={}) self.notifier.on_new_room_event.assert_called_once_with( - ANY, extra_users=[] + ANY, 1, 1, extra_users=[] ) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 6417f73309..a2d7635995 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -87,6 +87,8 @@ class RoomMemberHandlerTestCase(unittest.TestCase): self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.send_message.return_value = (True, 0) + self.datastore.persist_event.return_value = (1,1) + @defer.inlineCallbacks def test_invite(self): room_id = "!foo:red" @@ -160,7 +162,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context, ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[UserID.from_string(target_user_id)] + event, 1, 1, extra_users=[UserID.from_string(target_user_id)] ) self.assertFalse(self.datastore.get_room.called) self.assertFalse(self.datastore.store_room.called) @@ -226,7 +228,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) join_signal_observer.assert_called_with( @@ -304,7 +306,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): event, context=context ) self.notifier.on_new_room_event.assert_called_once_with( - event, extra_users=[user] + event, 1, 1, extra_users=[user] ) leave_signal_observer.assert_called_with( diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index b318d4944a..7ccbe2ea9c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -183,7 +183,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -246,7 +246,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 1) @@ -300,7 +300,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) yield put_json.await_calls() @@ -332,7 +332,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 1, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() @@ -352,7 +352,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.clock.advance_time(11) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 2, rooms=[self.room_id]), ]) self.assertEquals(self.event_source.get_current_key(), 2) @@ -378,7 +378,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) self.on_new_user_event.assert_has_calls([ - call(rooms=[self.room_id]), + call('typing_key', 3, rooms=[self.room_id]), ]) self.on_new_user_event.reset_mock() diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 8e0c5fa630..c0c52796ad 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import events from synapse.types import UserID +from synapse.util.async import run_on_reactor OFFLINE = PresenceState.OFFLINE @@ -264,6 +265,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): datastore=Mock(spec=[ "set_presence_state", "get_presence_list", + "get_rooms_for_user", ]), clock=Mock(spec=[ "call_later", @@ -298,6 +300,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.get_app_service_by_user_id = Mock( return_value=defer.succeed(None) ) + self.mock_datastore.get_rooms_for_user = ( + lambda u: get_rooms_for_user(UserID.from_string(u)) + ) def get_profile_displayname(user_id): return defer.succeed("Frank") @@ -350,19 +355,19 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.set_presence_state.return_value = defer.succeed( {"state": ONLINE} ) - self.mock_datastore.get_presence_list.return_value = defer.succeed( - [] - ) + self.mock_datastore.get_presence_list.return_value = defer.succeed([]) yield self.presence.set_state(self.u_banana, self.u_banana, state={"presence": ONLINE} ) + yield run_on_reactor() + (code, response) = yield self.mock_resource.trigger("GET", - "/events?from=0_1_0&timeout=0", None) + "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "0_1_0", "end": "0_2_0", "chunk": [ + self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", diff --git a/tests/utils.py b/tests/utils.py index cc038fecf1..a67530bd63 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -355,7 +355,7 @@ class MemoryDataStore(object): return [] def get_room_events_max_id(self): - return 0 # TODO (erikj) + return "s0" # TODO (erikj) def get_send_event_level(self, room_id): return defer.succeed(0) -- cgit 1.4.1 From 3edd2d5c93ccbec46f101e65c6c7874a90bf0018 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 11:25:30 +0100 Subject: Fix v2 sync, update the last_notified_ms only if there was an active listener --- synapse/handlers/sync.py | 2 +- synapse/notifier.py | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 35a62fda47..bd8c603681 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -92,7 +92,7 @@ class SyncHandler(BaseHandler): result = yield self.current_sync_for_user(sync_config, since_token) defer.returnValue(result) else: - def current_sync_callback(): + def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) rm_handler = self.hs.get_handlers().room_member_handler diff --git a/synapse/notifier.py b/synapse/notifier.py index 1f7f0a143f..2de7dca8a5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -81,14 +81,15 @@ class _NotifierUserStream(object): self.last_notified_ms = time_now_ms def notify(self, stream_key, stream_id, time_now_ms): - self.last_notified_ms = time_now_ms self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) - listeners = self.listeners - self.listeners = set() - for listener in listeners: - listener.notify(self.current_token) + if self.listeners: + self.last_notified_ms = time_now_ms + listeners = self.listeners + self.listeners = set() + for listener in listeners: + listener.notify(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier -- cgit 1.4.1 From 0c894e1ebdb204cf0a0dce16ed819b9e5d9f3fc0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 13:11:28 +0100 Subject: Throw error when creating room if alias contains whitespace #SYN-335 --- synapse/handlers/room.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dac683616a..401cc677d1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event import logging +import string logger = logging.getLogger(__name__) @@ -50,6 +51,10 @@ class RoomCreationHandler(BaseHandler): self.ratelimit(user_id) if "room_alias_name" in config: + for wchar in string.whitespace: + if wchar in config["room_alias_name"]: + raise SynapseError(400, "Invalid characters in room alias") + room_alias = RoomAlias.create( config["room_alias_name"], self.hs.hostname, -- cgit 1.4.1 From 92e1c8983dcbc1b9e75ff71b06928fd51627f61a Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 13:21:55 +0100 Subject: Disallow whitespace in aliases here too --- synapse/handlers/directory.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f76febee8f..e41a688836 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes from synapse.types import RoomAlias import logging +import string logger = logging.getLogger(__name__) @@ -40,6 +41,10 @@ class DirectoryHandler(BaseHandler): def _create_association(self, room_alias, room_id, servers=None): # general association creation for both human users and app services + for wchar in string.whitespace: + if wchar in room_alias.localpart: + raise SynapseError(400, "Invalid characters in room alias") + if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") # TODO(erikj): Change this. -- cgit 1.4.1 From 67800f7626d07202b19da9090432aab4b0bc1aef Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 14:19:10 +0100 Subject: Treat setting your display name to the empty string as removing it (SYN-186). --- synapse/handlers/profile.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 71ff78ab23..799faffe53 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -88,6 +88,9 @@ class ProfileHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") + if new_displayname == '': + new_displayname = None + yield self.store.set_profile_displayname( target_user.localpart, new_displayname ) -- cgit 1.4.1 From c5d1b4986bbb5983054b64fdc3dd3c32e80e3c17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 14:59:31 +0100 Subject: Remove unused arguments and doc PresenceHandler.push_update_to_clients --- synapse/handlers/presence.py | 20 ++++++++----------- tests/handlers/test_presence.py | 22 +++++---------------- tests/handlers/test_presencelike.py | 39 ++++++++----------------------------- 3 files changed, 21 insertions(+), 60 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1edab05492..0c246958ac 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -496,9 +496,7 @@ class PresenceHandler(BaseHandler): # We want to tell the person that just came online # presence state of people they are interested in? self.push_update_to_clients( - observed_user=target_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(target_user), ) deferreds = [] @@ -712,10 +710,7 @@ class PresenceHandler(BaseHandler): continue self.push_update_to_clients( - observed_user=user, - users_to_push=observers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=observers, room_ids=room_ids ) user_id = user.to_string() @@ -779,10 +774,7 @@ class PresenceHandler(BaseHandler): localusers = set(localusers) self.push_update_to_clients( - observed_user=observed_user, - users_to_push=localusers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=localusers, room_ids=room_ids ) remote_domains = set(remote_domains) @@ -807,8 +799,12 @@ class PresenceHandler(BaseHandler): defer.returnValue((localusers, remote_domains)) - def push_update_to_clients(self, observed_user, users_to_push=[], - room_ids=[], statuscache=None): + def push_update_to_clients(self, users_to_push=[], room_ids=[]): + """Notify clients of a new presence event. + Args: + users_to_push(list): List of users to notify. + room_ids(list): List of room_ids to notify. + """ with PreserveLoggingContext(): self.notifier.on_new_user_event( users_to_push, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 70147b017e..ee773797e7 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -1097,12 +1097,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # apple should see both banana and clementine currently offline self.mock_update_client.assert_has_calls([ - call(users_to_push=[self.u_apple], - observed_user=self.u_banana, - statuscache=ANY), - call(users_to_push=[self.u_apple], - observed_user=self.u_clementine, - statuscache=ANY), + call(users_to_push=[self.u_apple]), + call(users_to_push=[self.u_apple]), ], any_order=True) # Gut-wrenching tests @@ -1121,13 +1117,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # apple and banana should now both see each other online self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple]), - observed_user=self.u_banana, - room_ids=[], - statuscache=ANY), - call(users_to_push=[self.u_banana], - observed_user=self.u_apple, - statuscache=ANY), + call(users_to_push=set([self.u_apple]), room_ids=[]), + call(users_to_push=[self.u_banana]), ], any_order=True) self.assertTrue("apple" in self.handler._local_pushmap) @@ -1143,10 +1134,7 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # banana should now be told apple is offline self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_banana, self.u_apple]), - observed_user=self.u_apple, - room_ids=[], - statuscache=ANY), + call(users_to_push=set([self.u_banana, self.u_apple]), room_ids=[]), ], any_order=True) self.assertFalse("banana" in self.handler._local_pushmap) diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 977e832da7..1f2e66ac11 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -209,20 +209,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): ], presence) self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]), - room_ids=[], - observed_user=self.u_apple, - statuscache=ANY), # self-reflection + call( + users_to_push={self.u_apple, self.u_banana, self.u_clementine}, + room_ids=[] + ), ], any_order=True) - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({ - "presence": ONLINE, - "last_active": 1000000, # MockClock - "displayname": "Frank", - "avatar_url": "http://foo", - }, statuscache.state) - self.mock_update_client.reset_mock() self.datastore.set_profile_displayname.return_value = defer.succeed( @@ -232,21 +224,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.u_apple, "I am an Apple") self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]), + call( + users_to_push={self.u_apple, self.u_banana, self.u_clementine}, room_ids=[], - observed_user=self.u_apple, - statuscache=ANY), # self-reflection + ), ], any_order=True) - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({ - "presence": ONLINE, - "last_active": 1000000, # MockClock - "displayname": "I am an Apple", - "avatar_url": "http://foo", - }, statuscache.state) - - @defer.inlineCallbacks def test_push_remote(self): self.presence_list = [ @@ -314,13 +297,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.mock_update_client.assert_called_with( users_to_push=set([self.u_apple]), room_ids=[], - observed_user=self.u_potato, - statuscache=ANY) - - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({"presence": ONLINE, - "displayname": "Frank", - "avatar_url": "http://foo"}, statuscache.state) + ) state = yield self.handlers.presence_handler.get_state(self.u_potato, self.u_apple) -- cgit 1.4.1 From 47ec693e29ce61885b605191b97a69c1cbf7ab09 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 15:29:58 +0100 Subject: More doc-strings --- synapse/handlers/presence.py | 241 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 202 insertions(+), 39 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0c246958ac..23302242bd 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -317,6 +317,13 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def user_joined_room(self, user, room_id): + """Called via the distributor whenever a user joins a room. + Notifies the new member of the presence of the current members. + Notifies the current members of the room of the new member's presence. + Args: + user(UserID): The user who joined the room. + room_id(str): The room id the user joined. + """ if self.hs.is_mine(user): statuscache = self._get_or_make_usercache(user) @@ -344,6 +351,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def send_invite(self, observer_user, observed_user): + """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -378,6 +386,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def invite_presence(self, observed_user, observer_user): + """Handles a m.presence_invite EDU. A remote or local user has + requested presence updates for a local user. If the invite is accepted + then allow the local or remote user to see the presence of the local + user. + + Args: + observed_user(UserID): The local user whose presence is requested. + observer_user(UserID): The remote or local user requesting presence. + + """ accept = yield self._should_accept_invite(observed_user, observer_user) if accept: @@ -404,6 +422,14 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def accept_presence(self, observed_user, observer_user): + """Handles a m.presence_accept EDU. Mark a presence invite from a + local or remote user as accepted in a local user's presence list. + Starts polling for presence updates from the local or remote user. + + Args: + observed_user(UserID): The user to update in the presence list. + observer_user(UserID): The owner of the presence list to update. + """ yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) @@ -414,6 +440,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): + """Handle a m.presence_deny EDU. Removes a local or remote user from a + local user's presence list. + + Args: + observed_user: The local or remote user to remove from the list. + observer_user: The local owner of the presence list. + Returns: + A Deferred. + """ yield self.store.del_presence_list( observer_user.localpart, observed_user.to_string() ) @@ -422,6 +457,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def drop(self, observed_user, observer_user): + """Remove a local or remote user from a local user's presence list and + unsubscribe the local user from updates that user. + + Args: + observed_user: The local or remote user to remove from the list. + observer_user: The local owner of the presence list. + Returns: + A Deferred. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -435,6 +479,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): + """Get the presence list for a local user. The retured list includes + the current presence state for each user listed. + + Args: + observer_user(UserID): The local user whose presence list to fetch. + accepted(bool or None): If not none then only include users who + have or have not accepted the presence invite request. + Returns: + A Deferred list of presence state events. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -456,6 +510,23 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def start_polling_presence(self, user, target_user=None, state=None): + """Subscribe a local user to presence updates from a local or remote + user. If no target_user is supplied then subscribe to all users stored + in the presence list for the local user. + + Additonally this pushes the current presence state of this user to all + target_users. That state can be provided directly or will be read from + the stored state for the local user. + + Also this attempts to notify the local user of the current state of + any local target users. + + Args: + user(UserID): The local user that whishes for presence updates. + target_user(UserID): The local or remote user whose updates are + wanted. + state(dict): Optional presence state for the local user. + """ logger.debug("Start polling for presence from %s", user) if target_user: @@ -513,6 +584,11 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): + """Subscribe a local user to presence updates for a local user + Args: + user(UserId): The local user that wishes for updates. + target_user(UserId): The local users whose updates are wanted. + """ target_localpart = target_user.localpart if target_localpart not in self._local_pushmap: @@ -521,6 +597,16 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) def _start_polling_remote(self, user, domain, remoteusers): + """Subscribe a local user to presence updates for remote users on a + given domain. + Args: + user(UserID): The local user that wishes for updates. + domain(str): The remote server the local user wants updates from. + remoteusers(UserID): The remote users that local user wants to be + told about. + Returns: + A Deferred. + """ to_poll = set() for u in remoteusers: @@ -541,6 +627,16 @@ class PresenceHandler(BaseHandler): @log_function def stop_polling_presence(self, user, target_user=None): + """Unsubscribe a local user from presence updates from a local or + remote user. If no target user is supplied then unsubscribe the user + from all presence updates that the user had subscribed to. + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID or None): The user whose updates are no longer + wanted. + Returns: + A Deferred. + """ logger.debug("Stop polling for presence from %s", user) if not target_user or self.hs.is_mine(target_user): @@ -569,6 +665,12 @@ class PresenceHandler(BaseHandler): return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): + """Unsubscribe a local user from presence updates from a local user on + this server. + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID): The user whose updates are no longer wanted. + """ for localpart in self._local_pushmap.keys(): if target_user and localpart != target_user.localpart: continue @@ -581,6 +683,16 @@ class PresenceHandler(BaseHandler): @log_function def _stop_polling_remote(self, user, domain, remoteusers): + """Unsubscribe a local user from presence updates from remote users on + a given domain. + Args: + user(UserID): The local user that no longer wishes for updates. + domain(str): The remote server to unsubscribe from. + remoteusers([UserID]): The users on that remote server that the + local user no longer wishes to be updated about. + Returns: + A Deferred. + """ to_unpoll = set() for u in remoteusers: @@ -602,6 +714,18 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def push_presence(self, user, statuscache): + """ + Notify local and remote users of a change in presence of a local user. + Pushes the update to local clients and remote domains that are directly + subscribed to the presence of the local user. + Also pushes that update to any local user or remote domain that shares + a room with the local user. + Args: + user(UserID): The local user whose presence was updated. + statuscache(UserPresenceCache): Cache of the user's presence state + Returns: + A Deferred. + """ assert(self.hs.is_mine(user)) logger.debug("Pushing presence update from %s", user) @@ -628,45 +752,23 @@ class PresenceHandler(BaseHandler): ) yield self.distributor.fire("user_presence_changed", user, statuscache) - @defer.inlineCallbacks - def _push_presence_remote(self, user, destination, state=None): - if state is None: - state = yield self.store.get_presence_state(user.localpart) - del state["mtime"] - state["presence"] = state.pop("state") - - if user in self._user_cachemap: - state["last_active"] = ( - self._user_cachemap[user].get_state()["last_active"] - ) - - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) - - if "last_active" in state: - state = dict(state) - state["last_active_ago"] = int( - self.clock.time_msec() - state.pop("last_active") - ) - - user_state = { - "user_id": user.to_string(), - } - user_state.update(**state) - - yield self.federation.send_edu( - destination=destination, - edu_type="m.presence", - content={ - "push": [ - user_state, - ], - } - ) - @defer.inlineCallbacks def incoming_presence(self, origin, content): + """Handle an incoming m.presence EDU. + For each presence update in the "push" list update our local cache and + notify the appropriate local clients. Only clients that share a room + or are directly subscribed to the presence for a user should be + notified of the update. + For each subscription request in the "poll" list start pushing presence + updates to the remote server. + For unsubscribe request in the "unpoll" list stop pushing presence + updates to the remote server. + Args: + orgin(str): The source of this m.presence EDU. + content(dict): The content of this m.presence EDU. + Returns: + A Deferred. + """ deferreds = [] for push in content.get("push", []): @@ -765,6 +867,22 @@ class PresenceHandler(BaseHandler): def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], remote_domains=[]): + """Notify local clients and remote servers of a change in the presence + of a user. + Args: + observed_user(UserID): The user to push the presence state for. + statuscache(UserPresenceCache): The cache for the presence state to + push. + users_to_push([UserID]): A list of local and remote users to + notify. + room_ids([str]): Notify the local and remote occupants of these + rooms. + remote_domains([str]): A list of remote servers to notify in + addition to those implied by the users_to_push and the + room_ids. + Returns: + A Deferred. + """ localusers, remoteusers = partitionbool( users_to_push, @@ -802,8 +920,8 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, users_to_push=[], room_ids=[]): """Notify clients of a new presence event. Args: - users_to_push(list): List of users to notify. - room_ids(list): List of room_ids to notify. + users_to_push([UserID]): List of users to notify. + room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): self.notifier.on_new_user_event( @@ -811,6 +929,51 @@ class PresenceHandler(BaseHandler): room_ids, ) + @defer.inlineCallbacks + def _push_presence_remote(self, user, destination, state=None): + """Push a user's presence to a remote server. If a presence state event + that event is sent. Otherwise a new state event is constructed from the + stored presence state. + The last_active is replaced with last_active_ago in case the wallclock + time on the remote server is different to the time on this server. + Sends an EDU to the remote server with the current presence state. + Args: + user(UserID): The user to push the presence state for. + destination(str): The remote server to send state to. + state(dict): The state to push, or None to use the current stored + state. + Returns: + A Deferred. + """ + if state is None: + state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state.pop("state") + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) + + yield self.distributor.fire( + "collect_presencelike_data", user, state + ) + + if "last_active" in state: + state = dict(state) + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + user_state = {"user_id": user.to_string(), } + user_state.update(state) + + yield self.federation.send_edu( + destination=destination, + edu_type="m.presence", + content={"push": [user_state, ], } + ) + class PresenceEventSource(object): def __init__(self, hs): -- cgit 1.4.1 From 0a4330cd5d5e2230fb9e1ff4e24952829d03ef76 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 17:48:12 +0100 Subject: Add some missed argument types, cleanup the whitespace a bit --- synapse/handlers/presence.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 23302242bd..9638faf4b9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -394,7 +394,6 @@ class PresenceHandler(BaseHandler): Args: observed_user(UserID): The local user whose presence is requested. observer_user(UserID): The remote or local user requesting presence. - """ accept = yield self._should_accept_invite(observed_user, observer_user) @@ -444,8 +443,9 @@ class PresenceHandler(BaseHandler): local user's presence list. Args: - observed_user: The local or remote user to remove from the list. - observer_user: The local owner of the presence list. + observed_user(UserID): The local or remote user to remove from the + list. + observer_user(UserID): The local owner of the presence list. Returns: A Deferred. """ @@ -461,8 +461,9 @@ class PresenceHandler(BaseHandler): unsubscribe the local user from updates that user. Args: - observed_user: The local or remote user to remove from the list. - observer_user: The local owner of the presence list. + observed_user(UserId): The local or remote user to remove from the + list. + observer_user(UserId): The local owner of the presence list. Returns: A Deferred. """ @@ -585,6 +586,7 @@ class PresenceHandler(BaseHandler): def _start_polling_local(self, user, target_user): """Subscribe a local user to presence updates for a local user + Args: user(UserId): The local user that wishes for updates. target_user(UserId): The local users whose updates are wanted. @@ -598,7 +600,8 @@ class PresenceHandler(BaseHandler): def _start_polling_remote(self, user, domain, remoteusers): """Subscribe a local user to presence updates for remote users on a - given domain. + given remote domain. + Args: user(UserID): The local user that wishes for updates. domain(str): The remote server the local user wants updates from. @@ -630,6 +633,7 @@ class PresenceHandler(BaseHandler): """Unsubscribe a local user from presence updates from a local or remote user. If no target user is supplied then unsubscribe the user from all presence updates that the user had subscribed to. + Args: user(UserID): The local user that no longer wishes for updates. target_user(UserID or None): The user whose updates are no longer @@ -667,6 +671,7 @@ class PresenceHandler(BaseHandler): def _stop_polling_local(self, user, target_user): """Unsubscribe a local user from presence updates from a local user on this server. + Args: user(UserID): The local user that no longer wishes for updates. target_user(UserID): The user whose updates are no longer wanted. @@ -685,6 +690,7 @@ class PresenceHandler(BaseHandler): def _stop_polling_remote(self, user, domain, remoteusers): """Unsubscribe a local user from presence updates from remote users on a given domain. + Args: user(UserID): The local user that no longer wishes for updates. domain(str): The remote server to unsubscribe from. @@ -720,6 +726,7 @@ class PresenceHandler(BaseHandler): subscribed to the presence of the local user. Also pushes that update to any local user or remote domain that shares a room with the local user. + Args: user(UserID): The local user whose presence was updated. statuscache(UserPresenceCache): Cache of the user's presence state @@ -763,6 +770,7 @@ class PresenceHandler(BaseHandler): updates to the remote server. For unsubscribe request in the "unpoll" list stop pushing presence updates to the remote server. + Args: orgin(str): The source of this m.presence EDU. content(dict): The content of this m.presence EDU. @@ -869,6 +877,7 @@ class PresenceHandler(BaseHandler): remote_domains=[]): """Notify local clients and remote servers of a change in the presence of a user. + Args: observed_user(UserID): The user to push the presence state for. statuscache(UserPresenceCache): The cache for the presence state to @@ -919,6 +928,7 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, users_to_push=[], room_ids=[]): """Notify clients of a new presence event. + Args: users_to_push([UserID]): List of users to notify. room_ids([str]): List of room_ids to notify. @@ -937,6 +947,7 @@ class PresenceHandler(BaseHandler): The last_active is replaced with last_active_ago in case the wallclock time on the remote server is different to the time on this server. Sends an EDU to the remote server with the current presence state. + Args: user(UserID): The user to push the presence state for. destination(str): The remote server to send state to. -- cgit 1.4.1 From 415b158ce229d4f740bf577aca5cc3d5f73e1bf6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 15 May 2015 11:09:47 +0100 Subject: More whitespace --- synapse/handlers/presence.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9638faf4b9..a01020e202 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -320,6 +320,7 @@ class PresenceHandler(BaseHandler): """Called via the distributor whenever a user joins a room. Notifies the new member of the presence of the current members. Notifies the current members of the room of the new member's presence. + Args: user(UserID): The user who joined the room. room_id(str): The room id the user joined. -- cgit 1.4.1 From 755def8083ec887feabcb45b3bc111db4aef20ab Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 13:46:47 +0100 Subject: Add more doc string, reduce C+P boilerplate for getting room list --- synapse/handlers/presence.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a01020e202..ce9dd64394 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -297,7 +297,26 @@ class PresenceHandler(BaseHandler): self.changed_presencelike_data(user, {"last_active": now}) + def get_joined_rooms_for_user(self, user): + """Get the list of rooms a user is joined to. + + Args: + user(UserID): The user. + Returns: + A Deferred of a list of room id strings. + """ + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_joined_rooms_for_user(user) + def changed_presencelike_data(self, user, state): + """Updates the presence state of a local user. + + Args: + user(UserID): The user being updated. + state(dict): The new presence state for the user. + Returns: + A Deferred + """ statuscache = self._get_or_make_usercache(user) self._user_cachemap_latest_serial += 1 @@ -544,8 +563,7 @@ class PresenceHandler(BaseHandler): # Also include people in all my rooms - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if state is None: state = yield self.store.get_presence_state(user.localpart) @@ -745,8 +763,7 @@ class PresenceHandler(BaseHandler): # and also user is informed of server-forced pushes localusers.add(user) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if not localusers and not room_ids: defer.returnValue(None) @@ -791,8 +808,7 @@ class PresenceHandler(BaseHandler): " | %d interested local observers %r", len(observers), observers ) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if room_ids: logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids) -- cgit 1.4.1 From e1150cac4bceab88097ea2421323f3b3852028e3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 15:46:37 +0100 Subject: Move updating the serial and state of the presence cache into a single function --- synapse/handlers/presence.py | 60 +++++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 17 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0c3290b307..d129d4ca8a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -308,6 +308,11 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler return rm_handler.get_joined_rooms_for_user(user) + def get_joined_users_for_room_id(self, room_id): + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_room_members(room_id) + + @defer.inlineCallbacks def changed_presencelike_data(self, user, state): """Updates the presence state of a local user. @@ -317,12 +322,9 @@ class PresenceHandler(BaseHandler): Returns: A Deferred """ - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) - - return self.push_presence(user, statuscache=statuscache) + statuscache = yield self.update_presence_cache(user, state) + yield self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -345,13 +347,12 @@ class PresenceHandler(BaseHandler): room_id(str): The room id the user joined. """ if self.hs.is_mine(user): - statuscache = self._get_or_make_usercache(user) - # No actual update but we need to bump the serial anyway for the # event source self._user_cachemap_latest_serial += 1 - statuscache.update({}, serial=self._user_cachemap_latest_serial) - + statuscache = yield self.update_presence_cache( + user, room_ids=[room_id] + ) self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], @@ -359,16 +360,17 @@ class PresenceHandler(BaseHandler): ) # We also want to tell them about current presence of people. - rm_handler = self.homeserver.get_handlers().room_member_handler - curr_users = yield rm_handler.get_room_members(room_id) + curr_users = yield self.get_joined_users_for_room_id(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: - statuscache = self._get_or_offline_usercache(local_user) - statuscache.update({}, serial=self._user_cachemap_latest_serial) + statuscache = yield self.update_presence_cache( + local_user, room_ids=[room_id], add_to_cache=False + ) + self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(local_user), + statuscache=statuscache, ) @defer.inlineCallbacks @@ -829,10 +831,8 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("last_active_ago") ) - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) + yield self.update_presence_cache(user, state, room_ids=room_ids) if not observers and not room_ids: logger.debug(" | no interested observers or room IDs") @@ -890,6 +890,32 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) + @defer.inlineCallbacks + def update_presence_cache(self, user, state={}, room_ids=None, + add_to_cache=True): + """Update the presence cache for a user with a new state and bump the + serial to the latest value. + + Args: + user(UserID): The user being updated + state(dict): The presence state being updated + room_ids(None or list of str): A list of room_ids to update. If + room_ids is None then fetch the list of room_ids the user is + joined to. + add_to_cache: Whether to add an entry to the presence cache if the + user isn't already in the cache. + Returns: + A Deferred UserPresenceCache for the user being updated. + """ + if room_ids is None: + room_ids = yield self.get_joined_rooms_for_user(user) + if add_to_cache: + statuscache = self._get_or_make_usercache(user) + else: + statuscache = self._get_or_offline_usercache(user) + statuscache.update(state, serial=self._user_cachemap_latest_serial) + defer.returnValue(statuscache) + @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], -- cgit 1.4.1 From 591c4bf223a4a8698f51ba258984e769f593e32b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 16:21:51 +0100 Subject: Cache the most recent serial for each room --- synapse/handlers/presence.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d129d4ca8a..aa1d73f2f9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 + # map room_ids to the latest presence serial for a member of that + # room + self._room_serials = {} + metrics.register_callback( "userCachemap:size", lambda: len(self._user_cachemap), @@ -909,6 +913,9 @@ class PresenceHandler(BaseHandler): """ if room_ids is None: room_ids = yield self.get_joined_rooms_for_user(user) + + for room_id in room_ids: + self._room_serials[room_id] = self._user_cachemap_latest_serial if add_to_cache: statuscache = self._get_or_make_usercache(user) else: @@ -1069,8 +1076,6 @@ class PresenceEventSource(object): def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) - observer_user = user - presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap @@ -1079,17 +1084,28 @@ class PresenceEventSource(object): clock = self.clock latest_serial = 0 + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is None: + presence_list = () + user_ids_to_check = set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] > from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): cached = cachemap[observed_user] if cached.serial <= from_key or cached.serial > max_serial: continue - if not (yield self.is_visible(observer_user, observed_user)): - continue - latest_serial = max(cached.serial, latest_serial) updates.append(cached.make_event(user=observed_user, clock=clock)) -- cgit 1.4.1 From ef910a0358d1a1bd608576cfc07edc0a4f2649aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 17:17:04 +0100 Subject: Do work in parellel when joining a room --- synapse/handlers/federation.py | 69 ++++++++++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 880cbd77e7..78f2bfc212 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -516,30 +516,59 @@ class FederationHandler(BaseHandler): # FIXME pass - for e in auth_chain: - e.internal_metadata.outlier = True + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + for e in auth_chain: if e.event_id == event.event_id: - continue + return + process_auth_ev(e) - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) + yield defer.DeferredList(auth_ids_to_deferred.values()) - for e in state: + @defer.inlineCallbacks + def handle_state(e): if e.event_id == event.event_id: - continue + return e.internal_metadata.outlier = True try: @@ -557,6 +586,8 @@ class FederationHandler(BaseHandler): e.event_id, ) + yield defer.DeferredList([handle_state(e) for e in state]) + auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { (e.type, e.state_key): e for e in auth_chain -- cgit 1.4.1 From e4c65b338d44fadc058cbd8e4cd79ae1601d3526 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 18:21:06 +0100 Subject: Speed up the get_pagination_rows as well --- synapse/handlers/presence.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aa1d73f2f9..6537a37385 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1154,14 +1154,28 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap + user_ids_to_check = {user} + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is None: + presence_list = () + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] >= from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): if not (to_key < cachemap[observed_user].serial <= from_key): continue - if (yield self.is_visible(observer_user, observed_user)): - updates.append((observed_user, cachemap[observed_user])) + updates.append((observed_user, cachemap[observed_user])) # TODO(paul): limit -- cgit 1.4.1 From 722312991694846045fae31ec6e0cbbbc59c6a33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 14:15:05 +0100 Subject: Don't apply new room join hack if depth > 5 --- synapse/handlers/federation.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 03f8444ad5..d85b1cf5de 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -924,9 +924,12 @@ class FederationHandler(BaseHandler): # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == EventTypes.Create: + if len(event.prev_events) == 1 and event.depth < 5: + c = yield self.store.get_event( + event.prev_events[0][0], + allow_none=True, + ) + if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c try: -- cgit 1.4.1 From 20814fabdd001ee6a04efc5277d71e80fdbf5a14 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 11:59:02 +0100 Subject: Actually fetch state for new backwards extremeties when backfilling. --- synapse/federation/federation_client.py | 6 +- synapse/handlers/federation.py | 164 ++++++++++++++++++++------------ 2 files changed, 108 insertions(+), 62 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4b3bf97835..6febc8618c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -168,7 +168,11 @@ class FederationClient(FederationBase): for i, pdu in enumerate(pdus): pdus[i] = yield self._check_sigs_and_hash(pdu) - # FIXME: We should handle signature failures more gracefully. + # FIXME: We should handle signature failures more gracefully. + pdus[:] = yield defer.gatherResults( + [self._check_sigs_and_hash(pdu) for pdu in pdus], + consumeErrors=True, + ).addErrback(unwrapFirstError) defer.returnValue(pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d85b1cf5de..46ce3699d7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -230,27 +230,65 @@ class FederationHandler(BaseHandler): if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) - pdus = yield self.replication_layer.backfill( + events = yield self.replication_layer.backfill( dest, room_id, - limit, + limit=limit, extremities=extremities, ) - events = [] + event_map = {e.event_id: e for e in events} - for pdu in pdus: - event = pdu + event_ids = set(e.event_id for e in events) - # FIXME (erikj): Not sure this actually works :/ - context = yield self.state_handler.compute_event_context(event) + edges = [ + ev.event_id + for ev in events + if set(e_id for e_id, _ in ev.prev_events) - event_ids + ] - events.append((event, context)) + # For each edge get the current state. - yield self.store.persist_event( - event, - context=context, - backfilled=True + auth_events = {} + events_to_state = {} + for e_id in edges: + state, auth = yield self.replication_layer.get_state_for_room( + destination=dest, + room_id=room_id, + event_id=e_id + ) + auth_events.update({a.event_id: a for a in auth}) + events_to_state[e_id] = state + + yield defer.gatherResults( + [ + self._handle_new_event(dest, a) + for a in auth_events.values() + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + yield defer.gatherResults( + [ + self._handle_new_event( + dest, event_map[e_id], + state=events_to_state[e_id], + backfilled=True, + ) + for e_id in events_to_state + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + + events.sort(key=lambda e: e.depth) + + for event in events: + if event in events_to_state: + continue + + yield self._handle_new_event( + dest, event, + backfilled=True, ) defer.returnValue(events) @@ -347,7 +385,7 @@ class FederationHandler(BaseHandler): logger.info(e.message) continue except Exception as e: - logger.warn( + logger.exception( "Failed to backfill from %s because %s", dom, e, ) @@ -517,54 +555,9 @@ class FederationHandler(BaseHandler): # FIXME pass - auth_ids_to_deferred = {} - - def process_auth_ev(ev): - auth_ids = [e_id for e_id, _ in ev.auth_events] - - prev_ds = [ - auth_ids_to_deferred[i] - for i in auth_ids - if i in auth_ids_to_deferred - ] - - d = defer.Deferred() - - auth_ids_to_deferred[ev.event_id] = d - - @defer.inlineCallbacks - def f(*_): - ev.internal_metadata.outlier = True - - try: - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - - yield self._handle_new_event( - origin, ev, auth_events=auth - ) - except: - logger.exception( - "Failed to handle auth event %s", - ev.event_id, - ) - - d.callback(None) - - if prev_ds: - dx = defer.DeferredList(prev_ds) - dx.addBoth(f) - else: - f() - - for e in auth_chain: - if e.event_id == event.event_id: - return - process_auth_ev(e) - - yield defer.DeferredList(auth_ids_to_deferred.values()) + yield self._handle_auth_events( + origin, [e for e in auth_chain if e.event_id != event.event_id] + ) @defer.inlineCallbacks def handle_state(e): @@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) + + @defer.inlineCallbacks + def _handle_auth_events(self, origin, auth_events): + auth_ids_to_deferred = {} + + def process_auth_ev(ev): + auth_ids = [e_id for e_id, _ in ev.auth_events] + + prev_ds = [ + auth_ids_to_deferred[i] + for i in auth_ids + if i in auth_ids_to_deferred + ] + + d = defer.Deferred() + + auth_ids_to_deferred[ev.event_id] = d + + @defer.inlineCallbacks + def f(*_): + ev.internal_metadata.outlier = True + + try: + auth = { + (e.type, e.state_key): e for e in auth_events + if e.event_id in auth_ids + } + + yield self._handle_new_event( + origin, ev, auth_events=auth + ) + except: + logger.exception( + "Failed to handle auth event %s", + ev.event_id, + ) + + d.callback(None) + + if prev_ds: + dx = defer.DeferredList(prev_ds) + dx.addBoth(f) + else: + f() + + for e in auth_events: + process_auth_ev(e) + + yield defer.DeferredList(auth_ids_to_deferred.values()) -- cgit 1.4.1 From e01b825cc929e16b6a60be0688bbe6d8d9b3866e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 13:21:59 +0100 Subject: Clean up the presence_list checking logic a bit --- synapse/handlers/presence.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6537a37385..226d6a0f51 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1084,14 +1084,14 @@ class PresenceEventSource(object): clock = self.clock latest_serial = 0 + user_ids_to_check = {user} presence_list = yield presence.store.get_presence_list( user.localpart, accepted=True ) - if presence_list is None: - presence_list = () - user_ids_to_check = set( - UserID.from_string(p["observed_user_id"]) for p in presence_list - ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) room_ids = yield presence.get_joined_rooms_for_user(user) for room_id in set(room_ids) & set(presence._room_serials): if presence._room_serials[room_id] > from_key: @@ -1142,8 +1142,6 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): # TODO (erikj): Does this make sense? Ordering? - observer_user = user - from_key = int(pagination_config.from_key) if pagination_config.to_key: @@ -1158,11 +1156,10 @@ class PresenceEventSource(object): presence_list = yield presence.store.get_presence_list( user.localpart, accepted=True ) - if presence_list is None: - presence_list = () - user_ids_to_check |= set( - UserID.from_string(p["observed_user_id"]) for p in presence_list - ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) room_ids = yield presence.get_joined_rooms_for_user(user) for room_id in set(room_ids) & set(presence._room_serials): if presence._room_serials[room_id] >= from_key: -- cgit 1.4.1 From 7ae8afb7ef5a0fb3162339737682e9248980600d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 14:48:11 +0100 Subject: Removed unused 'is_visible' method --- synapse/handlers/presence.py | 27 --------------------------- 1 file changed, 27 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 226d6a0f51..6c48b1d20e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1045,32 +1045,6 @@ class PresenceEventSource(object): self.hs = hs self.clock = hs.get_clock() - @defer.inlineCallbacks - def is_visible(self, observer_user, observed_user): - if observer_user == observed_user: - defer.returnValue(True) - - presence = self.hs.get_handlers().presence_handler - - if (yield presence.store.user_rooms_intersect( - [u.to_string() for u in observer_user, observed_user])): - defer.returnValue(True) - - if self.hs.is_mine(observed_user): - pushmap = presence._local_pushmap - - defer.returnValue( - observed_user.localpart in pushmap and - observer_user in pushmap[observed_user.localpart] - ) - else: - recvmap = presence._remote_recvmap - - defer.returnValue( - observed_user in recvmap and - observer_user in recvmap[observed_user] - ) - @defer.inlineCallbacks @log_function def get_new_events_for_user(self, user, from_key, limit): @@ -1099,7 +1073,6 @@ class PresenceEventSource(object): user_ids_to_check |= set(joined) updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. for observed_user in user_ids_to_check & set(cachemap): cached = cachemap[observed_user] -- cgit 1.4.1 From f43544eecc362943f9d64a004d40984739a68d98 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 11:01:28 +0100 Subject: Make the appservice use 'users_in_room' rather than get_room_members since it is cached --- synapse/appservice/__init__.py | 6 +++--- synapse/handlers/appservice.py | 5 +---- tests/appservice/test_appservice.py | 15 +++------------ 3 files changed, 7 insertions(+), 19 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 63a18b802b..e3ca45de83 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -148,8 +148,8 @@ class ApplicationService(object): and self.is_interested_in_user(event.state_key)): return True # check joined member events - for member in member_list: - if self.is_interested_in_user(member.state_key): + for user_id in member_list: + if self.is_interested_in_user(user_id): return True return False @@ -173,7 +173,7 @@ class ApplicationService(object): restrict_to(str): The namespace to restrict regex tests to. aliases_for_event(list): A list of all the known room aliases for this event. - member_list(list): A list of all joined room members in this room. + member_list(list): A list of all joined user_ids in this room. Returns: bool: True if this service would like to know about this event. """ diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 355ab317df..05735137d8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -147,10 +147,7 @@ class ApplicationServicesHandler(object): ) # We need to know the members associated with this event.room_id, # if any. - member_list = yield self.store.get_room_members( - room_id=event.room_id, - membership=Membership.JOIN - ) + member_list = yield self.store.get_users_in_room(event.room_id) services = yield self.store.get_app_services() interested_list = [ diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 62149d6902..8ce8dc0a87 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -217,18 +217,9 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) join_list = [ - Mock( - type="m.room.member", room_id="!foo:bar", sender="@alice:here", - state_key="@alice:here" - ), - Mock( - type="m.room.member", room_id="!foo:bar", sender="@irc_fo:here", - state_key="@irc_fo:here" # AS user - ), - Mock( - type="m.room.member", room_id="!foo:bar", sender="@bob:here", - state_key="@bob:here" - ) + "@alice:here", + "@irc_fo:here", # AS user + "@bob:here", ] self.event.sender = "@xmpp_foobar:matrix.org" -- cgit 1.4.1 From c8135f808b4e0a344e6d5d592049d27cc43af9b1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 14:45:46 +0100 Subject: Remove unused import --- synapse/handlers/appservice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 05735137d8..8269482e47 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService from synapse.types import UserID -- cgit 1.4.1 From 106a3051b88be742d24ace05f72d9ab6bff29dd2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 15:53:03 +0100 Subject: Remove spurious TODO comment --- synapse/handlers/presence.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6c48b1d20e..670c1d353f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1140,7 +1140,6 @@ class PresenceEventSource(object): user_ids_to_check |= set(joined) updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. for observed_user in user_ids_to_check & set(cachemap): if not (to_key < cachemap[observed_user].serial <= from_key): continue -- cgit 1.4.1 From 1a9a9abcc73ebbd14fce0f45689e4648a71d55bc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 16:11:17 +0100 Subject: Add a cache for getting the presence list for a user --- synapse/handlers/presence.py | 24 +++++++++++++++--------- synapse/storage/presence.py | 35 +++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 17 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 670c1d353f..023ad33ab0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -521,20 +521,26 @@ class PresenceHandler(BaseHandler): if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") - presence = yield self.store.get_presence_list( + presence_list = yield self.store.get_presence_list( observer_user.localpart, accepted=accepted ) - for p in presence: - observed_user = UserID.from_string(p.pop("observed_user_id")) - p["observed_user"] = observed_user - p.update(self._get_or_offline_usercache(observed_user).get_state()) - if "last_active" in p: - p["last_active_ago"] = int( - self.clock.time_msec() - p.pop("last_active") + results = [] + for row in presence_list: + observed_user = UserID.from_string(row["observed_user_id"]) + result = { + "observed_user": observed_user, "accepted": row["accepted"] + } + result.update( + self._get_or_offline_usercache(observed_user).get_state() + ) + if "last_active" in result: + result["last_active_ago"] = int( + self.clock.time_msec() - result.pop("last_active") ) + results.append(result) - defer.returnValue(presence) + defer.returnValue(results) @defer.inlineCallbacks @log_function diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 22ec94bc16..fefcf6bce0 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached + +from twisted.internet import defer class PresenceStore(SQLBaseStore): @@ -87,31 +89,48 @@ class PresenceStore(SQLBaseStore): desc="add_presence_list_pending", ) + @defer.inlineCallbacks def set_presence_list_accepted(self, observer_localpart, observed_userid): - return self._simple_update_one( + result = yield self._simple_update_one( table="presence_list", keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, updatevalues={"accepted": True}, desc="set_presence_list_accepted", ) + self.get_presence_list_accepted.invalidate(observer_localpart) + defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): - keyvalues = {"user_id": observer_localpart} - if accepted is not None: - keyvalues["accepted"] = accepted + if accepted: + return self.get_presence_list_accepted(observer_localpart) + else: + keyvalues = {"user_id": observer_localpart} + if accepted is not None: + keyvalues["accepted"] = accepted + return self._simple_select_list( + table="presence_list", + keyvalues=keyvalues, + retcols=["observed_user_id", "accepted"], + desc="get_presence_list", + ) + + @cached() + def get_presence_list_accepted(self, observer_localpart): return self._simple_select_list( table="presence_list", - keyvalues=keyvalues, + keyvalues={"user_id": observer_localpart, "accepted": True}, retcols=["observed_user_id", "accepted"], - desc="get_presence_list", + desc="get_presence_list_accepted", ) + @defer.inlineCallbacks def del_presence_list(self, observer_localpart, observed_userid): - return self._simple_delete_one( + yield self._simple_delete_one( table="presence_list", keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, desc="del_presence_list", ) + self.get_presence_list_accepted.invalidate(observer_localpart) -- cgit 1.4.1