-rw-r--r--  changelog.d/3350.misc                       1
-rw-r--r--  changelog.d/3586.misc                       1
-rw-r--r--  changelog.d/3587.misc                       1
-rw-r--r--  synapse/events/snapshot.py                  3
-rw-r--r--  synapse/federation/federation_server.py    11
-rw-r--r--  synapse/handlers/federation.py             13
-rw-r--r--  synapse/state.py                          143
-rw-r--r--  synapse/storage/push_rule.py               13
-rw-r--r--  synapse/storage/roommember.py              15
-rw-r--r--  synapse/visibility.py                      19
10 files changed, 109 insertions, 111 deletions
diff --git a/changelog.d/3350.misc b/changelog.d/3350.misc
new file mode 100644
index 0000000000..3713cd6d63
--- /dev/null
+++ b/changelog.d/3350.misc
@@ -0,0 +1 @@
+Remove redundant checks on who_forgot_in_room
\ No newline at end of file
diff --git a/changelog.d/3586.misc b/changelog.d/3586.misc
new file mode 100644
index 0000000000..e853e2481b
--- /dev/null
+++ b/changelog.d/3586.misc
@@ -0,0 +1 @@
+Fixes and optimisations for resolve_state_groups
diff --git a/changelog.d/3587.misc b/changelog.d/3587.misc
new file mode 100644
index 0000000000..75a3479910
--- /dev/null
+++ b/changelog.d/3587.misc
@@ -0,0 +1 @@
+Improve logging for exceptions when handling PDUs
\ No newline at end of file
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 189212b0fa..368b5f6ae4 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -249,7 +249,7 @@ class EventContext(object):
 
     @defer.inlineCallbacks
     def update_state(self, state_group, prev_state_ids, current_state_ids,
-                     delta_ids):
+                     prev_group, delta_ids):
         """Replace the state in the context
         """
 
@@ -260,6 +260,7 @@ class EventContext(object):
 
         self.state_group = state_group
         self._prev_state_ids = prev_state_ids
+        self.prev_group = prev_group
         self._current_state_ids = current_state_ids
         self.delta_ids = delta_ids
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 48f26db67c..e501251b6e 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -24,6 +24,7 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
+from twisted.python import failure
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@@ -186,8 +187,12 @@ class FederationServer(FederationBase):
                     logger.warn("Error handling PDU %s: %s", event_id, e)
                     pdu_results[event_id] = {"error": str(e)}
                 except Exception as e:
+                    f = failure.Failure()
                     pdu_results[event_id] = {"error": str(e)}
-                    logger.exception("Failed to handle PDU %s", event_id)
+                    logger.error(
+                        "Failed to handle PDU %s: %s",
+                        event_id, f.getTraceback().rstrip(),
+                    )
 
         yield async.concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
@@ -203,8 +208,8 @@ class FederationServer(FederationBase):
                 )
 
         pdu_failures = getattr(transaction, "pdu_failures", [])
-        for failure in pdu_failures:
-            logger.info("Got failure %r", failure)
+        for fail in pdu_failures:
+            logger.info("Got failure %r", fail)
 
         response = {
             "pdus": pdu_results,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 14654d59f1..145c1a21d4 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler):
 
         current_state_ids.update(state_updates)
 
-        if context.delta_ids is not None:
-            delta_ids = dict(context.delta_ids)
-            delta_ids.update(state_updates)
-
         prev_state_ids = yield context.get_prev_state_ids(self.store)
         prev_state_ids = dict(prev_state_ids)
 
@@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
         })
 
+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
         state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
-            prev_group=context.prev_group,
-            delta_ids=delta_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
             current_state_ids=current_state_ids,
         )
 
@@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler):
             state_group=state_group,
             current_state_ids=current_state_ids,
             prev_state_ids=prev_state_ids,
-            delta_ids=delta_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
         )
 
     @defer.inlineCallbacks
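Aside (not part of the diff): the hunk above now always bases the new state group on the event's existing state group (prev_group) and records only the auth-event updates as delta_ids. Conceptually, a state group stored this way is resolved by walking back to its base group and applying each delta on top; a tiny hypothetical helper (not synapse's storage API) illustrating that idea:

    def resolve_state_group(groups, group_id):
        """groups maps group_id -> (prev_group or None, state/delta dict)."""
        prev_group, delta_ids = groups[group_id]
        if prev_group is None:
            return dict(delta_ids)          # a root group holds its full state
        state = resolve_state_group(groups, prev_group)
        state.update(delta_ids)             # apply this group's delta on top
        return state

    groups = {
        1: (None, {("m.room.create", ""): "$create"}),
        2: (1, {("m.room.member", "@alice:hs"): "$join"}),
    }
    assert resolve_state_group(groups, 2) == {
        ("m.room.create", ""): "$create",
        ("m.room.member", "@alice:hs"): "$join",
    }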
diff --git a/synapse/state.py b/synapse/state.py
index 32125c95df..033f55d967 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -471,69 +471,39 @@ class StateResolutionHandler(object):
                 "Resolving state for %s with %d groups", room_id, len(state_groups_ids)
             )
 
-            # build a map from state key to the event_ids which set that state.
-            # dict[(str, str), set[str])
-            state = {}
+            # start by assuming we won't have any conflicted state, and build up the new
+            # state map by iterating through the state groups. If we discover a conflict,
+            # we give up and instead use `resolve_events_with_factory`.
+            #
+            # XXX: is this actually worthwhile, or should we just let
+            # resolve_events_with_factory do it?
+            new_state = {}
+            conflicted_state = False
             for st in itervalues(state_groups_ids):
                 for key, e_id in iteritems(st):
-                    state.setdefault(key, set()).add(e_id)
-
-            # build a map from state key to the event_ids which set that state,
-            # including only those where there are state keys in conflict.
-            conflicted_state = {
-                k: list(v)
-                for k, v in iteritems(state)
-                if len(v) > 1
-            }
+                    if key in new_state:
+                        conflicted_state = True
+                        break
+                    new_state[key] = e_id
+                if conflicted_state:
+                    break
 
             if conflicted_state:
                 logger.info("Resolving conflicted state for %r", room_id)
                 with Measure(self.clock, "state._resolve_events"):
                     new_state = yield resolve_events_with_factory(
-                        list(state_groups_ids.values()),
+                        list(itervalues(state_groups_ids)),
                         event_map=event_map,
                         state_map_factory=state_map_factory,
                     )
-            else:
-                new_state = {
-                    key: e_ids.pop() for key, e_ids in iteritems(state)
-                }
 
-            with Measure(self.clock, "state.create_group_ids"):
-                # if the new state matches any of the input state groups, we can
-                # use that state group again. Otherwise we will generate a state_id
-                # which will be used as a cache key for future resolutions, but
-                # not get persisted.
-                state_group = None
-                new_state_event_ids = frozenset(itervalues(new_state))
-                for sg, events in iteritems(state_groups_ids):
-                    if new_state_event_ids == frozenset(e_id for e_id in events):
-                        state_group = sg
-                        break
+            # if the new state matches any of the input state groups, we can
+            # use that state group again. Otherwise we will generate a state_id
+            # which will be used as a cache key for future resolutions, but
+            # not get persisted.
 
-                # TODO: We want to create a state group for this set of events, to
-                # increase cache hits, but we need to make sure that it doesn't
-                # end up as a prev_group without being added to the database
-
-                prev_group = None
-                delta_ids = None
-                for old_group, old_ids in iteritems(state_groups_ids):
-                    if not set(new_state) - set(old_ids):
-                        n_delta_ids = {
-                            k: v
-                            for k, v in iteritems(new_state)
-                            if old_ids.get(k) != v
-                        }
-                        if not delta_ids or len(n_delta_ids) < len(delta_ids):
-                            prev_group = old_group
-                            delta_ids = n_delta_ids
-
-            cache = _StateCacheEntry(
-                state=new_state,
-                state_group=state_group,
-                prev_group=prev_group,
-                delta_ids=delta_ids,
-            )
+            with Measure(self.clock, "state.create_group_ids"):
+                cache = _make_state_cache_entry(new_state, state_groups_ids)
 
             if self._state_cache is not None:
                 self._state_cache[group_names] = cache
@@ -541,6 +511,70 @@ class StateResolutionHandler(object):
             defer.returnValue(cache)
 
 
+def _make_state_cache_entry(
+    new_state,
+    state_groups_ids,
+):
+    """Given a resolved state, and a set of input state groups, pick one to base
+    a new state group on (if any), and return an appropriately-constructed
+    _StateCacheEntry.
+
+    Args:
+        new_state (dict[(str, str), str]): resolved state map (mapping from
+           (type, state_key) to event_id)
+
+        state_groups_ids (dict[int, dict[(str, str), str]]):
+                 map from state group id to the state in that state group
+                (where 'state' is a map from state key to event id)
+
+    Returns:
+        _StateCacheEntry
+    """
+    # if the new state matches any of the input state groups, we can
+    # use that state group again. Otherwise we will generate a state_id
+    # which will be used as a cache key for future resolutions, but
+    # not get persisted.
+
+    # first look for exact matches
+    new_state_event_ids = set(itervalues(new_state))
+    for sg, state in iteritems(state_groups_ids):
+        if len(new_state_event_ids) != len(state):
+            continue
+
+        old_state_event_ids = set(itervalues(state))
+        if new_state_event_ids == old_state_event_ids:
+            # got an exact match.
+            return _StateCacheEntry(
+                state=new_state,
+                state_group=sg,
+            )
+
+    # TODO: We want to create a state group for this set of events, to
+    # increase cache hits, but we need to make sure that it doesn't
+    # end up as a prev_group without being added to the database
+
+    # failing that, look for the closest match.
+    prev_group = None
+    delta_ids = None
+
+    for old_group, old_state in iteritems(state_groups_ids):
+        n_delta_ids = {
+            k: v
+            for k, v in iteritems(new_state)
+            if old_state.get(k) != v
+        }
+        if not delta_ids or len(n_delta_ids) < len(delta_ids):
+            prev_group = old_group
+            delta_ids = n_delta_ids
+
+    return _StateCacheEntry(
+        state=new_state,
+        state_group=None,
+        prev_group=prev_group,
+        delta_ids=delta_ids,
+    )
+
+
 def _ordered_events(events):
     def key_func(e):
         return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
@@ -582,7 +616,7 @@ def _seperate(state_sets):
     with them in different state sets.
 
     Args:
-        state_sets(list[dict[(str, str), str]]):
+        state_sets(iterable[dict[(str, str), str]]):
             List of dicts of (type, state_key) -> event_id, which are the
             different state groups to resolve.
 
@@ -596,10 +630,11 @@ def _seperate(state_sets):
             conflicted_state is a dict mapping (type, state_key) to a set of
             event ids for conflicted state keys.
     """
-    unconflicted_state = dict(state_sets[0])
+    state_set_iterator = iter(state_sets)
+    unconflicted_state = dict(next(state_set_iterator))
     conflicted_state = {}
 
-    for state_set in state_sets[1:]:
+    for state_set in state_set_iterator:
         for key, value in iteritems(state_set):
             # Check if there is an unconflicted entry for the state key.
             unconflicted_value = unconflicted_state.get(key)
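Aside (not part of the diff): a toy walk-through of the selection logic in the new _make_state_cache_entry, using plain dicts rather than the real synapse types. With no exact match, the resolved state is diffed against each input group and the group needing the smallest delta is chosen as prev_group (the real code keeps the first smallest it encounters; min() is used here only to keep the sketch short):

    new_state = {
        ("m.room.member", "@a:hs"): "$m1",
        ("m.room.topic", ""): "$t2",
    }
    state_groups_ids = {
        1: {("m.room.member", "@a:hs"): "$m1", ("m.room.topic", ""): "$t1"},
        2: {("m.room.member", "@a:hs"): "$m0", ("m.room.topic", ""): "$t1"},
    }

    # neither group's event ids equal new_state's, so there is no exact match;
    # compute the per-group delta needed to reach new_state
    deltas = {
        sg: {k: v for k, v in new_state.items() if old.get(k) != v}
        for sg, old in state_groups_ids.items()
    }
    # deltas == {1: {("m.room.topic", ""): "$t2"},
    #            2: {("m.room.member", "@a:hs"): "$m1",
    #                ("m.room.topic", ""): "$t2"}}
    prev_group = min(deltas, key=lambda sg: len(deltas[sg]))   # -> 1
    delta_ids = deltas[prev_group]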
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index af564b1b4e..6a5028961d 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
 from synapse.push.baserules import list_with_base_rules
 from synapse.storage.appservice import ApplicationServiceWorkerStore
 from synapse.storage.pusher import PusherWorkerStore
@@ -250,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
             if uid in local_users_in_room:
                 user_ids.add(uid)
 
-        forgotten = yield self.who_forgot_in_room(
-            event.room_id, on_invalidate=cache_context.invalidate,
-        )
-
-        for row in forgotten:
-            user_id = row["user_id"]
-            event_id = row["event_id"]
-
-            mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
-            if event_id == mem_id:
-                user_ids.discard(user_id)
-
         rules_by_user = yield self.bulk_get_push_rules(
             user_ids, on_invalidate=cache_context.invalidate,
         )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index a27702a7a0..01697ab2c9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -461,18 +461,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
 
-    @cached()
-    def who_forgot_in_room(self, room_id):
-        return self._simple_select_list(
-            table="room_memberships",
-            retcols=("user_id", "event_id"),
-            keyvalues={
-                "room_id": room_id,
-                "forgotten": 1,
-            },
-            desc="who_forgot"
-        )
-
 
 class RoomMemberStore(RoomMemberWorkerStore):
     def __init__(self, db_conn, hs):
@@ -581,9 +569,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
             txn.execute(sql, (user_id, room_id))
 
             txn.call_after(self.did_forget.invalidate, (user_id, room_id))
-            self._invalidate_cache_and_stream(
-                txn, self.who_forgot_in_room, (room_id,)
-            )
         return self.runInteraction("forget_membership", f)
 
     @cachedInlineCallbacks(num_args=2)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index ba0499a022..d4680863d3 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -24,7 +24,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events.utils import prune_event
 from synapse.types import get_domain_from_id
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 
 logger = logging.getLogger(__name__)
 
@@ -76,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         types=types,
     )
 
-    forgotten = yield make_deferred_yieldable(defer.gatherResults([
-        defer.maybeDeferred(
-            preserve_fn(store.who_forgot_in_room),
-            room_id,
-        )
-        for room_id in frozenset(e.room_id for e in events)
-    ], consumeErrors=True))
-
-    # Set of membership event_ids that have been forgotten
-    event_id_forgotten = frozenset(
-        row["event_id"] for rows in forgotten for row in rows
-    )
-
     ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
         "m.ignored_user_list", user_id,
     )
@@ -177,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
         if membership is None:
             membership_event = state.get((EventTypes.Member, user_id), None)
             if membership_event:
-                # XXX why do we do this?
-                # https://github.com/matrix-org/synapse/issues/3350
-                if membership_event.event_id not in event_id_forgotten:
-                    membership = membership_event.membership
+                membership = membership_event.membership
 
         # if the user was a member of the room at the time of the event,
         # they can see it.