summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-05-12 15:31:26 +0100
committerMark Haines <mark.haines@matrix.org>2015-05-12 15:31:26 +0100
commit4429e4bf2499b0b3fecc8637c2a7a932aab908e4 (patch)
tree0c17e1767434df9684c3d34263a161ba4fa1d9b9 /synapse/handlers
parentUpdate the end_token correctly, otherwise the token doesn't advance and the c... (diff)
parentMerge pull request #143 from matrix-org/erikj/SYN-375 (diff)
downloadsynapse-4429e4bf2499b0b3fecc8637c2a7a932aab908e4.tar.xz
Merge branch 'develop' into notifier_unify
Conflicts:
	synapse/notifier.py
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py11
-rw-r--r--synapse/handlers/events.py8
-rw-r--r--synapse/handlers/federation.py173
-rw-r--r--synapse/handlers/message.py19
-rw-r--r--synapse/handlers/presence.py70
-rw-r--r--synapse/handlers/profile.py17
-rw-r--r--synapse/handlers/room.py8
-rw-r--r--synapse/handlers/typing.py4
8 files changed, 238 insertions, 72 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 4b3f4eadab..ddc5c21e7d 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID
 
+from synapse.util.logcontext import PreserveLoggingContext
+
 import logging
 
 
@@ -137,10 +139,11 @@ class BaseHandler(object):
                     "Failed to get destination from event %s", s.event_id
                 )
 
-        # Don't block waiting on waking up all the listeners.
-        notify_d = self.notifier.on_new_room_event(
-            event, extra_users=extra_users
-        )
+        with PreserveLoggingContext():
+            # Don't block waiting on waking up all the listeners.
+            notify_d = self.notifier.on_new_room_event(
+                event, extra_users=extra_users
+            )
 
         def log_failure(f):
             logger.warn(
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f9f855213b..993d33ba47 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,7 +15,6 @@
 
 from twisted.internet import defer
 
-from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.types import UserID
 from synapse.events.utils import serialize_event
@@ -81,10 +80,9 @@ class EventStreamHandler(BaseHandler):
                 # thundering herds on restart.
                 timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
 
-            with PreserveLoggingContext():
-                events, tokens = yield self.notifier.get_events_for(
-                    auth_user, room_ids, pagin_config, timeout
-                )
+            events, tokens = yield self.notifier.get_events_for(
+                auth_user, room_ids, pagin_config, timeout
+            )
 
             time_now = self.clock.time_msec()
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 85e2757227..7d9906039e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,9 +18,11 @@
 from ._base import BaseHandler
 
 from synapse.api.errors import (
-    AuthError, FederationError, StoreError,
+    AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.util import unwrapFirstError
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.util.frozenutils import unfreeze
@@ -29,6 +31,8 @@ from synapse.crypto.event_signing import (
 )
 from synapse.types import UserID
 
+from synapse.util.retryutils import NotRetryingDestination
+
 from twisted.internet import defer
 
 import itertools
@@ -197,9 +201,10 @@ class FederationHandler(BaseHandler):
                 target_user = UserID.from_string(target_user_id)
                 extra_users.append(target_user)
 
-            d = self.notifier.on_new_room_event(
-                event, extra_users=extra_users
-            )
+            with PreserveLoggingContext():
+                d = self.notifier.on_new_room_event(
+                    event, extra_users=extra_users
+                )
 
             def log_failure(f):
                 logger.warn(
@@ -218,10 +223,11 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def backfill(self, dest, room_id, limit):
+    def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
         """
-        extremities = yield self.store.get_oldest_events_in_room(room_id)
+        if not extremities:
+            extremities = yield self.store.get_oldest_events_in_room(room_id)
 
         pdus = yield self.replication_layer.backfill(
             dest,
@@ -249,6 +255,138 @@ class FederationHandler(BaseHandler):
         defer.returnValue(events)
 
     @defer.inlineCallbacks
+    def maybe_backfill(self, room_id, current_depth):
+        """Checks the database to see if we should backfill before paginating,
+        and if so do.
+        """
+        extremities = yield self.store.get_oldest_events_with_depth_in_room(
+            room_id
+        )
+
+        if not extremities:
+            logger.debug("Not backfilling as no extremeties found.")
+            return
+
+        # Check if we reached a point where we should start backfilling.
+        sorted_extremeties_tuple = sorted(
+            extremities.items(),
+            key=lambda e: -int(e[1])
+        )
+        max_depth = sorted_extremeties_tuple[0][1]
+
+        if current_depth > max_depth:
+            logger.debug(
+                "Not backfilling as we don't need to. %d < %d",
+                max_depth, current_depth,
+            )
+            return
+
+        # Now we need to decide which hosts to hit first.
+
+        # First we try hosts that are already in the room
+        # TODO: HEURISTIC ALERT.
+
+        curr_state = yield self.state_handler.get_current_state(room_id)
+
+        def get_domains_from_state(state):
+            joined_users = [
+                (state_key, int(event.depth))
+                for (e_type, state_key), event in state.items()
+                if e_type == EventTypes.Member
+                and event.membership == Membership.JOIN
+            ]
+
+            joined_domains = {}
+            for u, d in joined_users:
+                try:
+                    dom = UserID.from_string(u).domain
+                    old_d = joined_domains.get(dom)
+                    if old_d:
+                        joined_domains[dom] = min(d, old_d)
+                    else:
+                        joined_domains[dom] = d
+                except:
+                    pass
+
+            return sorted(joined_domains.items(), key=lambda d: d[1])
+
+        curr_domains = get_domains_from_state(curr_state)
+
+        likely_domains = [
+            domain for domain, depth in curr_domains
+        ]
+
+        @defer.inlineCallbacks
+        def try_backfill(domains):
+            # TODO: Should we try multiple of these at a time?
+            for dom in domains:
+                try:
+                    events = yield self.backfill(
+                        dom, room_id,
+                        limit=100,
+                        extremities=[e for e in extremities.keys()]
+                    )
+                except SynapseError:
+                    logger.info(
+                        "Failed to backfill from %s because %s",
+                        dom, e,
+                    )
+                    continue
+                except CodeMessageException as e:
+                    if 400 <= e.code < 500:
+                        raise
+
+                    logger.info(
+                        "Failed to backfill from %s because %s",
+                        dom, e,
+                    )
+                    continue
+                except NotRetryingDestination as e:
+                    logger.info(e.message)
+                    continue
+                except Exception as e:
+                    logger.warn(
+                        "Failed to backfill from %s because %s",
+                        dom, e,
+                    )
+                    continue
+
+                if events:
+                    defer.returnValue(True)
+            defer.returnValue(False)
+
+        success = yield try_backfill(likely_domains)
+        if success:
+            defer.returnValue(True)
+
+        # Huh, well *those* domains didn't work out. Lets try some domains
+        # from the time.
+
+        tried_domains = set(likely_domains)
+
+        event_ids = list(extremities.keys())
+
+        states = yield defer.gatherResults([
+            self.state_handler.resolve_state_groups([e])
+            for e in event_ids
+        ])
+        states = dict(zip(event_ids, [s[1] for s in states]))
+
+        for e_id, _ in sorted_extremeties_tuple:
+            likely_domains = get_domains_from_state(states[e_id])
+
+            success = yield try_backfill([
+                dom for dom in likely_domains
+                if dom not in tried_domains
+            ])
+            if success:
+                defer.returnValue(True)
+
+            tried_domains.update(likely_domains)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
 
@@ -431,9 +569,10 @@ class FederationHandler(BaseHandler):
                 auth_events=auth_events,
             )
 
-            d = self.notifier.on_new_room_event(
-                new_event, extra_users=[joinee]
-            )
+            with PreserveLoggingContext():
+                d = self.notifier.on_new_room_event(
+                    new_event, extra_users=[joinee]
+                )
 
             def log_failure(f):
                 logger.warn(
@@ -512,9 +651,10 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        d = self.notifier.on_new_room_event(
-            event, extra_users=extra_users
-        )
+        with PreserveLoggingContext():
+            d = self.notifier.on_new_room_event(
+                event, extra_users=extra_users
+            )
 
         def log_failure(f):
             logger.warn(
@@ -594,9 +734,10 @@ class FederationHandler(BaseHandler):
         )
 
         target_user = UserID.from_string(event.state_key)
-        d = self.notifier.on_new_room_event(
-            event, extra_users=[target_user],
-        )
+        with PreserveLoggingContext():
+            d = self.notifier.on_new_room_event(
+                event, extra_users=[target_user],
+            )
 
         def log_failure(f):
             logger.warn(
@@ -921,7 +1062,7 @@ class FederationHandler(BaseHandler):
                     if d in have_events and not have_events[d]
                 ],
                 consumeErrors=True
-            )
+            ).addErrback(unwrapFirstError)
 
             if different_events:
                 local_view = dict(auth_events)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 22e19af17f..867fdbefb0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,8 +20,9 @@ from synapse.api.errors import RoomError, SynapseError
 from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.util import unwrapFirstError
 from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID
+from synapse.types import UserID, RoomStreamToken
 
 from ._base import BaseHandler
 
@@ -89,9 +90,19 @@ class MessageHandler(BaseHandler):
 
         if not pagin_config.from_token:
             pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token()
+                yield self.hs.get_event_sources().get_current_token(
+                    direction='b'
+                )
             )
 
+        room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+        if room_token.topological is None:
+            raise SynapseError(400, "Invalid token")
+
+        yield self.hs.get_handlers().federation_handler.maybe_backfill(
+            room_id, room_token.topological
+        )
+
         user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(
@@ -303,7 +314,7 @@ class MessageHandler(BaseHandler):
                             event.room_id
                         ),
                     ]
-                )
+                ).addErrback(unwrapFirstError)
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
                 end_token = now_token.copy_and_replace("room_key", token[1])
@@ -328,7 +339,7 @@ class MessageHandler(BaseHandler):
         yield defer.gatherResults(
             [handle_room(e) for e in room_list],
             consumeErrors=True
-        )
+        ).addErrback(unwrapFirstError)
 
         ret = {
             "rooms": rooms_ret,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9e15610401..28688d532d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,14 +18,15 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
-from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logutils import log_function
 from synapse.types import UserID
 import synapse.metrics
 
 from ._base import BaseHandler
 
 import logging
+from collections import OrderedDict
 
 
 logger = logging.getLogger(__name__)
@@ -143,7 +144,7 @@ class PresenceHandler(BaseHandler):
         self._remote_offline_serials = []
 
         # map any user to a UserPresenceCache
-        self._user_cachemap = {}
+        self._user_cachemap = OrderedDict()  # keep them sorted by serial
         self._user_cachemap_latest_serial = 0
 
         metrics.register_callback(
@@ -165,6 +166,14 @@ class PresenceHandler(BaseHandler):
         else:
             return UserPresenceCache()
 
+    def _bump_serial(self, user=None):
+        self._user_cachemap_latest_serial += 1
+
+        if user:
+            # Move to end
+            cache = self._user_cachemap.pop(user)
+            self._user_cachemap[user] = cache
+
     def registered_user(self, user):
         return self.store.create_presence(user.localpart)
 
@@ -278,15 +287,14 @@ class PresenceHandler(BaseHandler):
         now_online = state["presence"] != PresenceState.OFFLINE
         was_polling = target_user in self._user_cachemap
 
-        with PreserveLoggingContext():
-            if now_online and not was_polling:
-                self.start_polling_presence(target_user, state=state)
-            elif not now_online and was_polling:
-                self.stop_polling_presence(target_user)
+        if now_online and not was_polling:
+            self.start_polling_presence(target_user, state=state)
+        elif not now_online and was_polling:
+            self.stop_polling_presence(target_user)
 
-            # TODO(paul): perform a presence push as part of start/stop poll so
-            #   we don't have to do this all the time
-            self.changed_presencelike_data(target_user, state)
+        # TODO(paul): perform a presence push as part of start/stop poll so
+        #   we don't have to do this all the time
+        self.changed_presencelike_data(target_user, state)
 
     def bump_presence_active_time(self, user, now=None):
         if now is None:
@@ -301,7 +309,7 @@ class PresenceHandler(BaseHandler):
     def changed_presencelike_data(self, user, state):
         statuscache = self._get_or_make_usercache(user)
 
-        self._user_cachemap_latest_serial += 1
+        self._bump_serial(user=user)
         statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
         return self.push_presence(user, statuscache=statuscache)
@@ -323,7 +331,7 @@ class PresenceHandler(BaseHandler):
 
             # No actual update but we need to bump the serial anyway for the
             # event source
-            self._user_cachemap_latest_serial += 1
+            self._bump_serial()
             statuscache.update({}, serial=self._user_cachemap_latest_serial)
 
             self.push_update_to_local_and_remote(
@@ -408,10 +416,10 @@ class PresenceHandler(BaseHandler):
         yield self.store.set_presence_list_accepted(
             observer_user.localpart, observed_user.to_string()
         )
-        with PreserveLoggingContext():
-            self.start_polling_presence(
-                observer_user, target_user=observed_user
-            )
+
+        self.start_polling_presence(
+            observer_user, target_user=observed_user
+        )
 
     @defer.inlineCallbacks
     def deny_presence(self, observed_user, observer_user):
@@ -430,10 +438,9 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        with PreserveLoggingContext():
-            self.stop_polling_presence(
-                observer_user, target_user=observed_user
-            )
+        self.stop_polling_presence(
+            observer_user, target_user=observed_user
+        )
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
@@ -706,7 +713,7 @@ class PresenceHandler(BaseHandler):
 
             statuscache = self._get_or_make_usercache(user)
 
-            self._user_cachemap_latest_serial += 1
+            self._bump_serial(user=user)
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
             if not observers and not room_ids:
@@ -766,8 +773,7 @@ class PresenceHandler(BaseHandler):
                 if not self._remote_sendmap[user]:
                     del self._remote_sendmap[user]
 
-        with PreserveLoggingContext():
-            yield defer.DeferredList(deferreds, consumeErrors=True)
+        yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def push_update_to_local_and_remote(self, observed_user, statuscache,
@@ -812,10 +818,11 @@ class PresenceHandler(BaseHandler):
 
     def push_update_to_clients(self, observed_user, users_to_push=[],
                                room_ids=[], statuscache=None):
-        self.notifier.on_new_user_event(
-            users_to_push,
-            room_ids,
-        )
+        with PreserveLoggingContext():
+            self.notifier.on_new_user_event(
+                users_to_push,
+                room_ids,
+            )
 
 
 class PresenceEventSource(object):
@@ -866,10 +873,15 @@ class PresenceEventSource(object):
 
         updates = []
         # TODO(paul): use a DeferredList ? How to limit concurrency.
-        for observed_user in cachemap.keys():
+        for observed_user in reversed(cachemap.keys()):
             cached = cachemap[observed_user]
 
-            if cached.serial <= from_key or cached.serial > max_serial:
+            # Since this is ordered in descending order of serial, we can just
+            # stop once we've seen enough
+            if cached.serial <= from_key:
+                break
+
+            if cached.serial > max_serial:
                 continue
 
             if not (yield self.is_visible(observer_user, observed_user)):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index ee2732b848..71ff78ab23 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,8 +17,8 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
 from synapse.api.constants import EventTypes, Membership
-from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import UserID
+from synapse.util import unwrapFirstError
 
 from ._base import BaseHandler
 
@@ -154,14 +154,13 @@ class ProfileHandler(BaseHandler):
         if not self.hs.is_mine(user):
             defer.returnValue(None)
 
-        with PreserveLoggingContext():
-            (displayname, avatar_url) = yield defer.gatherResults(
-                [
-                    self.store.get_profile_displayname(user.localpart),
-                    self.store.get_profile_avatar_url(user.localpart),
-                ],
-                consumeErrors=True
-            )
+        (displayname, avatar_url) = yield defer.gatherResults(
+            [
+                self.store.get_profile_displayname(user.localpart),
+                self.store.get_profile_avatar_url(user.localpart),
+            ],
+            consumeErrors=True
+        ).addErrback(unwrapFirstError)
 
         state["displayname"] = displayname
         state["avatar_url"] = avatar_url
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cfa2e38ed2..dac683616a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,7 +21,7 @@ from ._base import BaseHandler
 from synapse.types import UserID, RoomAlias, RoomID
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import StoreError, SynapseError
-from synapse.util import stringutils
+from synapse.util import stringutils, unwrapFirstError
 from synapse.util.async import run_on_reactor
 from synapse.events.utils import serialize_event
 
@@ -537,7 +537,7 @@ class RoomListHandler(BaseHandler):
                 for room in chunk
             ],
             consumeErrors=True,
-        )
+        ).addErrback(unwrapFirstError)
 
         for i, room in enumerate(chunk):
             room["num_joined_members"] = len(results[i])
@@ -577,8 +577,8 @@ class RoomEventSource(object):
 
         defer.returnValue((events, end_key))
 
-    def get_current_key(self):
-        return self.store.get_room_events_max_id()
+    def get_current_key(self, direction='f'):
+        return self.store.get_room_events_max_id(direction)
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c0b2bd7db0..64fe51aa3e 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, AuthError
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import UserID
 
 import logging
@@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler):
         self._latest_room_serial += 1
         self._room_serials[room_id] = self._latest_room_serial
 
-        self.notifier.on_new_user_event(rooms=[room_id])
+        with PreserveLoggingContext():
+            self.notifier.on_new_user_event(rooms=[room_id])
 
 
 class TypingNotificationEventSource(object):