summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/partial_state_events_tracker.py120
1 files changed, 120 insertions, 0 deletions
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
new file mode 100644
index 0000000000..a61a951ef0
--- /dev/null
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -0,0 +1,120 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from collections import defaultdict
+from typing import Collection, Dict, Set
+
+from twisted.internet import defer
+from twisted.internet.defer import Deferred
+
+from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.util import unwrapFirstError
+
+logger = logging.getLogger(__name__)
+
+
+class PartialStateEventsTracker:
+    """Keeps track of which events have partial state, after a partial-state join"""
+
+    def __init__(self, store: EventsWorkerStore):
+        self._store = store
+        # a map from event id to a set of Deferreds which are waiting for that event to be
+        # un-partial-stated.
+        self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)
+
+    def notify_un_partial_stated(self, event_id: str) -> None:
+        """Notify that we now have full state for a given event
+
+        Called by the state-resynchronization loop whenever we resynchronize the state
+        for a particular event. Unblocks any callers to await_full_state() for that
+        event.
+
+        Args:
+            event_id: the event that now has full state.
+        """
+        observers = self._observers.pop(event_id, None)
+        if not observers:
+            return
+        logger.info(
+            "Notifying %i things waiting for un-partial-stating of event %s",
+            len(observers),
+            event_id,
+        )
+        with PreserveLoggingContext():
+            for o in observers:
+                o.callback(None)
+
+    async def await_full_state(self, event_ids: Collection[str]) -> None:
+        """Wait for all the given events to have full state.
+
+        Args:
+            event_ids: the list of event ids that we want full state for
+        """
+        # first try the happy path: if there are no partial-state events, we can return
+        # quickly
+        partial_state_event_ids = [
+            ev
+            for ev, p in (await self._store.get_partial_state_events(event_ids)).items()
+            if p
+        ]
+
+        if not partial_state_event_ids:
+            return
+
+        logger.info(
+            "Awaiting un-partial-stating of events %s",
+            partial_state_event_ids,
+            stack_info=True,
+        )
+
+        # create an observer for each lazy-joined event
+        observers: Dict[str, Deferred[None]] = {
+            event_id: Deferred() for event_id in partial_state_event_ids
+        }
+        for event_id, observer in observers.items():
+            self._observers[event_id].add(observer)
+
+        try:
+            # some of them may have been un-lazy-joined between us checking the db and
+            # registering the observer, in which case we'd wait forever for the
+            # notification. Call back the observers now.
+            for event_id, partial in (
+                await self._store.get_partial_state_events(observers.keys())
+            ).items():
+                # there may have been a call to notify_un_partial_stated during the
+                # db query, so the observers may already have been called.
+                if not partial and not observers[event_id].called:
+                    observers[event_id].callback(None)
+
+            await make_deferred_yieldable(
+                defer.gatherResults(
+                    observers.values(),
+                    consumeErrors=True,
+                )
+            ).addErrback(unwrapFirstError)
+            logger.info("Events %s all un-partial-stated", observers.keys())
+        finally:
+            # remove any observers we created. This should happen when the notification
+            # is received, but that might not happen for two reasons:
+            #   (a) we're bailing out early on an exception (including us being
+            #       cancelled during the await)
+            #   (b) the event got de-lazy-joined before we set up the observer.
+            for event_id, observer in observers.items():
+                observer_set = self._observers.get(event_id)
+                if observer_set:
+                    observer_set.discard(observer)
+                    if not observer_set:
+                        del self._observers[event_id]