summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py17
-rw-r--r--synapse/handlers/e2e_keys.py2
-rw-r--r--synapse/handlers/federation.py151
-rw-r--r--synapse/handlers/message.py25
-rw-r--r--synapse/handlers/profile.py10
-rw-r--r--synapse/handlers/sync.py44
-rw-r--r--synapse/handlers/typing.py23
7 files changed, 181 insertions, 91 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index f0f89af7dc..17eedf4dbf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -28,6 +28,7 @@ from synapse.metrics import (
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import log_failure
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
 
@@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
 events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
 
 
-def log_failure(failure):
-    logger.error(
-        "Application Services Failure",
-        exc_info=(
-            failure.type,
-            failure.value,
-            failure.getTracebackObject()
-        )
-    )
-
-
 class ApplicationServicesHandler(object):
 
     def __init__(self, hs):
@@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
 
                         if not self.started_scheduler:
                             def start_scheduler():
-                                return self.scheduler.start().addErrback(log_failure)
+                                return self.scheduler.start().addErrback(
+                                    log_failure, "Application Services Failure",
+                                )
+
                             run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250fb..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
 def _exception_to_failure(e):
     if isinstance(e, CodeMessageException):
         return {
-            "status": e.code, "message": e.message,
+            "status": e.code, "message": str(e),
         }
 
     if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ccdc3bfa7..45d955e6f5 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,7 +18,6 @@
 
 import itertools
 import logging
-import sys
 
 import six
 from six import iteritems, itervalues
@@ -106,7 +105,7 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.store = hs.get_datastore()
+        self.store = hs.get_datastore()  # type: synapse.storage.DataStore
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -323,14 +322,22 @@ class FederationHandler(BaseHandler):
                         affected=pdu.event_id,
                     )
 
-                # Calculate the state of the previous events, and
-                # de-conflict them to find the current state.
-                state_groups = []
+                # Calculate the state after each of the previous events, and
+                # resolve them to find the correct state at the current event.
                 auth_chains = set()
+                event_map = {
+                    event_id: pdu,
+                }
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(room_id, list(seen))
-                    state_groups.append(ours)
+                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+                    # state_maps is a list of mappings from (type, state_key) to event_id
+                    # type: list[dict[tuple[str, str], str]]
+                    state_maps = list(ours.values())
+
+                    # we don't need this any more, let's delete it.
+                    del ours
 
                     # Ask the remote server for the states we don't
                     # know about
@@ -339,27 +346,65 @@ class FederationHandler(BaseHandler):
                             "[%s %s] Requesting state at missing prev_event %s",
                             room_id, event_id, p,
                         )
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, room_id, p,
+
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
                             )
-                        )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
+
+                            # we want the state *after* p; get_state_for_room returns the
+                            # state *before* p.
+                            remote_event = yield self.federation_client.get_pdu(
+                                [origin], p, outlier=True,
+                            )
+
+                            if remote_event is None:
+                                raise Exception(
+                                    "Unable to get missing prev_event %s" % (p, )
+                                )
+
+                            if remote_event.is_state():
+                                remote_state.append(remote_event)
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            remote_state_map = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_maps.append(remote_state_map)
+
+                            for x in remote_state:
+                                event_map[x.event_id] = x
 
                     # Resolve any conflicting state
+                    @defer.inlineCallbacks
                     def fetch(ev_ids):
-                        return self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False
+                        fetched = yield self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False,
                         )
+                        # add any events we fetch here to the `event_map` so that we
+                        # can use them to build the state event list below.
+                        event_map.update(fetched)
+                        defer.returnValue(fetched)
 
                     room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {event_id: pdu}, fetch
+                        room_version, state_maps, event_map, fetch,
                     )
 
-                    state = (yield self.store.get_events(state_map.values())).values()
+                    # we need to give _process_received_pdu the actual state events
+                    # rather than event ids, so generate that now.
+                    state = [
+                        event_map[e] for e in six.itervalues(state_map)
+                    ]
                     auth_chain = list(auth_chains)
                 except Exception:
                     logger.warn(
@@ -483,20 +528,21 @@ class FederationHandler(BaseHandler):
                 "[%s %s] Handling received prev_event %s",
                 room_id, event_id, ev.event_id,
             )
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    ev,
-                    sent_to_us_directly=False,
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn(
-                        "[%s %s] Received prev_event %s failed history check.",
-                        room_id, event_id, ev.event_id,
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
                     )
-                else:
-                    raise
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
     @defer.inlineCallbacks
     def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -572,6 +618,10 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
+                logger.info(
+                    "[%s %s] persisting newly-received auth/state events %s",
+                    room_id, event_id, [e["event"].event_id for e in event_infos]
+                )
                 yield self._handle_new_events(origin, event_infos)
 
             try:
@@ -1135,7 +1185,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1550,6 +1601,9 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             if not event.internal_metadata.is_outlier() and not backfilled:
                 yield self.action_generator.handle_push_actions_for_event(
@@ -1560,15 +1614,13 @@ class FederationHandler(BaseHandler):
                 [(event, context)],
                 backfilled=backfilled,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            tp, value, tb = sys.exc_info()
-
-            logcontext.run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
-
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                logcontext.run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
         defer.returnValue(context)
 
@@ -1581,15 +1633,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e484061cc0..4954b23a0d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import sys
 
-import six
 from six import iteritems, itervalues, string_types
 
 from canonicaljson import encode_canonical_json, json
@@ -624,6 +622,9 @@ class EventCreationHandler(object):
             event, context
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
@@ -636,6 +637,7 @@ class EventCreationHandler(object):
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                success = True
                 return
 
             yield self.persist_and_notify_client_event(
@@ -645,17 +647,16 @@ class EventCreationHandler(object):
                 ratelimit=ratelimit,
                 extra_users=extra_users,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            tp, value, tb = sys.exc_info()
-
-            run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
 
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..1dfbde84fd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -142,10 +142,8 @@ class BaseProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
                 raise
-            except Exception:
-                logger.exception("Failed to get displayname")
-            else:
-                defer.returnValue(result["displayname"])
+
+            defer.returnValue(result["displayname"])
 
     @defer.inlineCallbacks
     def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
@@ -199,8 +197,6 @@ class BaseProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get avatar_url")
                 raise
-            except Exception:
-                logger.exception("Failed to get avatar_url")
 
             defer.returnValue(result["avatar_url"])
 
@@ -278,7 +274,7 @@ class BaseProfileHandler(BaseHandler):
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    room_id, str(e.message)
+                    room_id, str(e)
                 )
 
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c7d69d9d80..351892a94f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,8 @@ import logging
 
 from six import iteritems, itervalues
 
+from prometheus_client import Counter
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -36,6 +38,19 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+
+# Counts the number of times we returned a non-empty sync. `type` is one of
+# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
+# "true" or "false" depending on if the request asked for lazy loaded members or
+# not.
+non_empty_sync_counter = Counter(
+    "synapse_handlers_sync_nonempty_total",
+    "Count of non empty sync responses. type is initial_sync/full_state_sync"
+    "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
+    "enabled for that request.",
+    ["type", "lazy_loaded"],
+)
+
 # Store the cache that tracks which lazy-loaded members have been sent to a given
 # client for no more than 30 minutes.
 LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -227,14 +242,16 @@ class SyncHandler(object):
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
                                 full_state):
+        if since_token is None:
+            sync_type = "initial_sync"
+        elif full_state:
+            sync_type = "full_state_sync"
+        else:
+            sync_type = "incremental_sync"
+
         context = LoggingContext.current_context()
         if context:
-            if since_token is None:
-                context.tag = "initial_sync"
-            elif full_state:
-                context.tag = "full_state_sync"
-            else:
-                context.tag = "incremental_sync"
+            context.tag = sync_type
 
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
@@ -242,7 +259,6 @@ class SyncHandler(object):
             result = yield self.current_sync_for_user(
                 sync_config, since_token, full_state=full_state,
             )
-            defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
@@ -251,7 +267,15 @@ class SyncHandler(object):
                 sync_config.user.to_string(), timeout, current_sync_callback,
                 from_token=since_token,
             )
-            defer.returnValue(result)
+
+        if result:
+            if sync_config.filter_collection.lazy_load_members():
+                lazy_loaded = "true"
+            else:
+                lazy_loaded = "false"
+            non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+
+        defer.returnValue(result)
 
     def current_sync_for_user(self, sync_config, since_token=None,
                               full_state=False):
@@ -567,13 +591,13 @@ class SyncHandler(object):
         # be a valid name or canonical_alias - i.e. we're checking that they
         # haven't been "deleted" by blatting {} over the top.
         if name_id:
-            name = yield self.store.get_event(name_id, allow_none=False)
+            name = yield self.store.get_event(name_id, allow_none=True)
             if name and name.content:
                 defer.returnValue(summary)
 
         if canonical_alias_id:
             canonical_alias = yield self.store.get_event(
-                canonical_alias_id, allow_none=False,
+                canonical_alias_id, allow_none=True,
             )
             if canonical_alias and canonical_alias.content:
                 defer.returnValue(summary)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..c610933dd4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
         # map room IDs to sets of users currently typing
         self._room_typing = {}
 
+        # caches which room_ids changed at which serials
+        self._typing_stream_change_cache = StreamChangeCache(
+            "TypingStreamChangeCache", self._latest_room_serial,
+        )
+
         self.clock.looping_call(
             self._handle_timeouts,
             5000,
@@ -218,6 +224,7 @@ class TypingHandler(object):
 
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
                     self.federation.send_edu(
                         destination=domain,
                         edu_type="m.typing",
@@ -274,19 +281,29 @@ class TypingHandler(object):
 
         self._latest_room_serial += 1
         self._room_serials[member.room_id] = self._latest_room_serial
+        self._typing_stream_change_cache.entity_has_changed(
+            member.room_id, self._latest_room_serial,
+        )
 
         self.notifier.on_new_event(
             "typing_key", self._latest_room_serial, rooms=[member.room_id]
         )
 
     def get_all_typing_updates(self, last_id, current_id):
-        # TODO: Work out a way to do this without scanning the entire state.
         if last_id == current_id:
             return []
 
+        changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+            last_id,
+        )
+
+        if changed_rooms is None:
+            changed_rooms = self._room_serials
+
         rows = []
-        for room_id, serial in self._room_serials.items():
-            if last_id < serial and serial <= current_id:
+        for room_id in changed_rooms:
+            serial = self._room_serials[room_id]
+            if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
                 rows.append((serial, room_id, list(typing)))
         rows.sort()