From d7b3ac46f8ac32048559e06770e4fc2d57caeaf7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 12 May 2015 15:44:21 +0100 Subject: Revert "Improvement to performance of presence event stream handling" --- synapse/handlers/presence.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28688d532d..1edab05492 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -26,7 +26,6 @@ import synapse.metrics from ._base import BaseHandler import logging -from collections import OrderedDict logger = logging.getLogger(__name__) @@ -144,7 +143,7 @@ class PresenceHandler(BaseHandler): self._remote_offline_serials = [] # map any user to a UserPresenceCache - self._user_cachemap = OrderedDict() # keep them sorted by serial + self._user_cachemap = {} self._user_cachemap_latest_serial = 0 metrics.register_callback( @@ -166,14 +165,6 @@ class PresenceHandler(BaseHandler): else: return UserPresenceCache() - def _bump_serial(self, user=None): - self._user_cachemap_latest_serial += 1 - - if user: - # Move to end - cache = self._user_cachemap.pop(user) - self._user_cachemap[user] = cache - def registered_user(self, user): return self.store.create_presence(user.localpart) @@ -309,7 +300,7 @@ class PresenceHandler(BaseHandler): def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) - self._bump_serial(user=user) + self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) return self.push_presence(user, statuscache=statuscache) @@ -331,7 +322,7 @@ class PresenceHandler(BaseHandler): # No actual update but we need to bump the serial anyway for the # event source - self._bump_serial() + self._user_cachemap_latest_serial += 1 statuscache.update({}, serial=self._user_cachemap_latest_serial) self.push_update_to_local_and_remote( @@ -713,7 +704,7 @@ class PresenceHandler(BaseHandler): statuscache = self._get_or_make_usercache(user) - self._bump_serial(user=user) + self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) if not observers and not room_ids: @@ -873,15 +864,10 @@ class PresenceEventSource(object): updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in reversed(cachemap.keys()): + for observed_user in cachemap.keys(): cached = cachemap[observed_user] - # Since this is ordered in descending order of serial, we can just - # stop once we've seen enough - if cached.serial <= from_key: - break - - if cached.serial > max_serial: + if cached.serial <= from_key or cached.serial > max_serial: continue if not (yield self.is_visible(observer_user, observed_user)): -- cgit 1.4.1 From e12268597804d759f82ccd827359059ba7cb05ef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 16:12:37 +0100 Subject: You need to call contextmanager --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5a6869079..9242b0a84e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -48,6 +48,7 @@ class EventsStore(SQLBaseStore): @contextmanager def stream_ordering_manager(): yield stream_ordering + stream_ordering_manager = stream_ordering_manager() try: with stream_ordering_manager as stream_ordering: -- cgit 1.4.1 From 80fd2b574c9e56637e67b996289607185c590109 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 16:19:42 +0100 Subject: Don't talk to yourself when backfilling --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7d9906039e..880cbd77e7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -314,6 +314,7 @@ class FederationHandler(BaseHandler): likely_domains = [ domain for domain, depth in curr_domains + if domain is not self.server_name ] @defer.inlineCallbacks @@ -363,6 +364,7 @@ class FederationHandler(BaseHandler): # from the time. tried_domains = set(likely_domains) + tried_domains.add(self.server_name) event_ids = list(extremities.keys()) -- cgit 1.4.1