summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2018-11-03 01:12:37 +1100
committerGitHub <noreply@github.com>2018-11-03 01:12:37 +1100
commitea708efedd7f207ce3fc05835612391816f8d4a1 (patch)
tree42bdd87aaffc48f6d92c7e6b4d748e2c8cb821fe /synapse/handlers/sync.py
parentchangelog (diff)
parentFix typing being reset causing infinite syncs (#4127) (diff)
downloadsynapse-ea708efedd7f207ce3fc05835612391816f8d4a1.tar.xz
Merge branch 'develop' into matthew/heroes-for-avatars matthew/heroes-for-avatars
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py136
1 files changed, 78 insertions, 58 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c1257521c1..48b7431433 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,11 +20,14 @@ import logging
 
 from six import iteritems, itervalues
 
+from prometheus_client import Counter
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
+from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -36,6 +39,19 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+
+# Counts the number of times we returned a non-empty sync. `type` is one of
+# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
+# "true" or "false" depending on if the request asked for lazy loaded members or
+# not.
+non_empty_sync_counter = Counter(
+    "synapse_handlers_sync_nonempty_total",
+    "Count of non empty sync responses. type is initial_sync/full_state_sync"
+    "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
+    "enabled for that request.",
+    ["type", "lazy_loaded"],
+)
+
 # Store the cache that tracks which lazy-loaded members have been sent to a given
 # client for no more than 30 minutes.
 LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -227,14 +243,16 @@ class SyncHandler(object):
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
                                 full_state):
+        if since_token is None:
+            sync_type = "initial_sync"
+        elif full_state:
+            sync_type = "full_state_sync"
+        else:
+            sync_type = "incremental_sync"
+
         context = LoggingContext.current_context()
         if context:
-            if since_token is None:
-                context.tag = "initial_sync"
-            elif full_state:
-                context.tag = "full_state_sync"
-            else:
-                context.tag = "incremental_sync"
+            context.tag = sync_type
 
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
@@ -242,7 +260,6 @@ class SyncHandler(object):
             result = yield self.current_sync_for_user(
                 sync_config, since_token, full_state=full_state,
             )
-            defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
@@ -251,7 +268,15 @@ class SyncHandler(object):
                 sync_config.user.to_string(), timeout, current_sync_callback,
                 from_token=since_token,
             )
-            defer.returnValue(result)
+
+        if result:
+            if sync_config.filter_collection.lazy_load_members():
+                lazy_loaded = "true"
+            else:
+                lazy_loaded = "false"
+            non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+
+        defer.returnValue(result)
 
     def current_sync_for_user(self, sync_config, since_token=None,
                               full_state=False):
@@ -445,25 +470,20 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event, types=None, filtered_types=None):
+    def get_state_after_event(self, event, state_filter=StateFilter.all()):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
         state_ids = yield self.store.get_state_ids_for_event(
-            event.event_id, types, filtered_types=filtered_types,
+            event.event_id, state_filter=state_filter,
         )
         if event.is_state():
             state_ids = state_ids.copy()
@@ -471,18 +491,14 @@ class SyncHandler(object):
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
+    def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -498,7 +514,7 @@ class SyncHandler(object):
         if last_events:
             last_event = last_events[-1]
             state = yield self.get_state_after_event(
-                last_event, types, filtered_types=filtered_types,
+                last_event, state_filter=state_filter,
             )
 
         else:
@@ -539,11 +555,12 @@ class SyncHandler(object):
 
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
-            last_event.event_id, [
+            last_event.event_id,
+            state_filter=StateFilter.from_types([
                 (EventTypes.Name, ''),
                 (EventTypes.CanonicalAlias, ''),
                 (EventTypes.RoomAvatar, ''),
-            ]
+            ]),
         )
 
         # this is heavily cached, thus: fast.
@@ -568,20 +585,20 @@ class SyncHandler(object):
         # haven't been "deleted" by blatting {} over the top.
         room_avatar_id = state_ids.get((EventTypes.RoomAvatar, ''))
         if room_avatar_id:
-            room_avatar = yield self.store.get_event(room_avatar_id, allow_none=False)
+            room_avatar = yield self.store.get_event(room_avatar_id, allow_none=True)
             if room_avatar and room_avatar.content:
                 # we have a room avatar; check to see if we can skip heroes
                 # because the room has an explicit name or canonical alias:
                 name_id = state_ids.get((EventTypes.Name, ''))
                 if name_id:
-                    name = yield self.store.get_event(name_id, allow_none=False)
+                    name = yield self.store.get_event(name_id, allow_none=True)
                     if name and name.content:
                         defer.returnValue(summary)
 
                 canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
                 if canonical_alias_id:
                     canonical_alias = yield self.store.get_event(
-                        canonical_alias_id, allow_none=False,
+                        canonical_alias_id, allow_none=Tue,
                     )
                     if canonical_alias and canonical_alias.content:
                         defer.returnValue(summary)
@@ -701,8 +718,7 @@ class SyncHandler(object):
 
         with Measure(self.clock, "compute_state_delta"):
 
-            types = None
-            filtered_types = None
+            members_to_fetch = None
 
             lazy_load_members = sync_config.filter_collection.lazy_load_members()
             include_redundant_members = (
@@ -713,16 +729,21 @@ class SyncHandler(object):
                 # We only request state for the members needed to display the
                 # timeline:
 
-                types = [
-                    (EventTypes.Member, state_key)
-                    for state_key in set(
-                        event.sender  # FIXME: we also care about invite targets etc.
-                        for event in batch.events
-                    )
-                ]
+                members_to_fetch = set(
+                    event.sender  # FIXME: we also care about invite targets etc.
+                    for event in batch.events
+                )
+
+                if full_state:
+                    # always make sure we LL ourselves so we know we're in the room
+                    # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+                    # We only need apply this on full state syncs given we disabled
+                    # LL for incr syncs in #3840.
+                    members_to_fetch.add(sync_config.user.to_string())
 
-                # only apply the filtering to room members
-                filtered_types = [EventTypes.Member]
+                state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+            else:
+                state_filter = StateFilter.all()
 
             timeline_state = {
                 (event.type, event.state_key): event.event_id
@@ -732,19 +753,17 @@ class SyncHandler(object):
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id, types=types,
-                        filtered_types=filtered_types,
+                        batch.events[-1].event_id, state_filter=state_filter,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id, types=types,
-                        filtered_types=filtered_types,
+                        batch.events[0].event_id, state_filter=state_filter,
                     )
 
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token, types=types,
-                        filtered_types=filtered_types,
+                        room_id, stream_position=now_token,
+                        state_filter=state_filter,
                     )
 
                     state_ids = current_state_ids
@@ -758,8 +777,7 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id, types=types,
-                    filtered_types=filtered_types,
+                    batch.events[0].event_id, state_filter=state_filter,
                 )
 
                 # for now, we disable LL for gappy syncs - see
@@ -774,17 +792,15 @@ class SyncHandler(object):
                 # members to just be ones which were timeline senders, which then ensures
                 # all of the rest get included in the state block (if we need to know
                 # about them).
-                types = None
-                filtered_types = None
+                state_filter = StateFilter.all()
 
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token, types=types,
-                    filtered_types=filtered_types,
+                    room_id, stream_position=since_token,
+                    state_filter=state_filter,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id, types=types,
-                    filtered_types=filtered_types,
+                    batch.events[-1].event_id, state_filter=state_filter,
                 )
 
                 state_ids = _calculate_state(
@@ -798,7 +814,7 @@ class SyncHandler(object):
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    if types:
+                    if members_to_fetch and batch.events:
                         # We're returning an incremental sync, with no
                         # "gap" since the previous sync, so normally there would be
                         # no state to return.
@@ -808,8 +824,12 @@ class SyncHandler(object):
                         # timeline here, and then dedupe any redundant ones below.
 
                         state_ids = yield self.store.get_state_ids_for_event(
-                            batch.events[0].event_id, types=types,
-                            filtered_types=None,  # we only want members!
+                            batch.events[0].event_id,
+                            # we only want members!
+                            state_filter=StateFilter.from_types(
+                                (EventTypes.Member, member)
+                                for member in members_to_fetch
+                            ),
                         )
 
             if lazy_load_members and not include_redundant_members: