summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/events/snapshot.py159
-rw-r--r--synapse/federation/federation_client.py5
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/handlers/pagination.py137
-rw-r--r--synapse/storage/controllers/persist_events.py5
-rw-r--r--synapse/storage/databases/main/events.py15
-rw-r--r--synapse/storage/databases/main/events_worker.py2
-rw-r--r--synapse/util/__init__.py5
-rw-r--r--synapse/util/caches/lrucache.py8
9 files changed, 259 insertions, 81 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e7e8225b8e..a43498ed4d 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 import attr
 from immutabledict import immutabledict
@@ -107,33 +107,32 @@ class EventContext(UnpersistedEventContextBase):
         state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None
             then this is the delta of the state between the two groups.
 
-        prev_group: If it is known, ``state_group``'s prev_group. Note that this being
-            None does not necessarily mean that ``state_group`` does not have
-            a prev_group!
+        state_group_deltas: If not empty, this is a dict collecting a mapping of the state
+            difference between state groups.
 
-            If the event is a state event, this is normally the same as
-            ``state_group_before_event``.
+            The keys are a tuple of two integers: the initial group and final state group.
+            The corresponding value is a state map representing the state delta between
+            these state groups.
 
-            If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
-            will always also be ``None``.
+            The dictionary is expected to have at most two entries with state groups of:
 
-            Note that this *not* (necessarily) the state group associated with
-            ``_prev_state_ids``.
+            1. The state group before the event and after the event.
+            2. The state group preceding the state group before the event and the
+               state group before the event.
 
-        delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
-            and ``state_group``.
+            This information is collected and stored as part of an optimization for persisting
+            events.
 
         partial_state: if True, we may be storing this event with a temporary,
             incomplete state.
     """
 
     _storage: "StorageControllers"
+    state_group_deltas: Dict[Tuple[int, int], StateMap[str]]
     rejected: Optional[str] = None
     _state_group: Optional[int] = None
     state_group_before_event: Optional[int] = None
     _state_delta_due_to_event: Optional[StateMap[str]] = None
-    prev_group: Optional[int] = None
-    delta_ids: Optional[StateMap[str]] = None
     app_service: Optional[ApplicationService] = None
 
     partial_state: bool = False
@@ -145,16 +144,14 @@ class EventContext(UnpersistedEventContextBase):
         state_group_before_event: Optional[int],
         state_delta_due_to_event: Optional[StateMap[str]],
         partial_state: bool,
-        prev_group: Optional[int] = None,
-        delta_ids: Optional[StateMap[str]] = None,
+        state_group_deltas: Dict[Tuple[int, int], StateMap[str]],
     ) -> "EventContext":
         return EventContext(
             storage=storage,
             state_group=state_group,
             state_group_before_event=state_group_before_event,
             state_delta_due_to_event=state_delta_due_to_event,
-            prev_group=prev_group,
-            delta_ids=delta_ids,
+            state_group_deltas=state_group_deltas,
             partial_state=partial_state,
         )
 
@@ -163,7 +160,7 @@ class EventContext(UnpersistedEventContextBase):
         storage: "StorageControllers",
     ) -> "EventContext":
         """Return an EventContext instance suitable for persisting an outlier event"""
-        return EventContext(storage=storage)
+        return EventContext(storage=storage, state_group_deltas={})
 
     async def persist(self, event: EventBase) -> "EventContext":
         return self
@@ -183,13 +180,15 @@ class EventContext(UnpersistedEventContextBase):
             "state_group": self._state_group,
             "state_group_before_event": self.state_group_before_event,
             "rejected": self.rejected,
-            "prev_group": self.prev_group,
+            "state_group_deltas": _encode_state_group_delta(self.state_group_deltas),
             "state_delta_due_to_event": _encode_state_dict(
                 self._state_delta_due_to_event
             ),
-            "delta_ids": _encode_state_dict(self.delta_ids),
             "app_service_id": self.app_service.id if self.app_service else None,
             "partial_state": self.partial_state,
+            # add dummy delta_ids and prev_group for backwards compatibility
+            "delta_ids": None,
+            "prev_group": None,
         }
 
     @staticmethod
@@ -204,17 +203,24 @@ class EventContext(UnpersistedEventContextBase):
         Returns:
             The event context.
         """
+        # workaround for backwards/forwards compatibility: if the input doesn't have a value
+        # for "state_group_deltas" just assign an empty dict
+        state_group_deltas = input.get("state_group_deltas", None)
+        if state_group_deltas:
+            state_group_deltas = _decode_state_group_delta(state_group_deltas)
+        else:
+            state_group_deltas = {}
+
         context = EventContext(
             # We use the state_group and prev_state_id stuff to pull the
             # current_state_ids out of the DB and construct prev_state_ids.
             storage=storage,
             state_group=input["state_group"],
             state_group_before_event=input["state_group_before_event"],
-            prev_group=input["prev_group"],
+            state_group_deltas=state_group_deltas,
             state_delta_due_to_event=_decode_state_dict(
                 input["state_delta_due_to_event"]
             ),
-            delta_ids=_decode_state_dict(input["delta_ids"]),
             rejected=input["rejected"],
             partial_state=input.get("partial_state", False),
         )
@@ -349,7 +355,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
     _storage: "StorageControllers"
     state_group_before_event: Optional[int]
     state_group_after_event: Optional[int]
-    state_delta_due_to_event: Optional[dict]
+    state_delta_due_to_event: Optional[StateMap[str]]
     prev_group_for_state_group_before_event: Optional[int]
     delta_ids_to_state_group_before_event: Optional[StateMap[str]]
     partial_state: bool
@@ -380,26 +386,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
 
         events_and_persisted_context = []
         for event, unpersisted_context in amended_events_and_context:
-            if event.is_state():
-                context = EventContext(
-                    storage=unpersisted_context._storage,
-                    state_group=unpersisted_context.state_group_after_event,
-                    state_group_before_event=unpersisted_context.state_group_before_event,
-                    state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
-                    partial_state=unpersisted_context.partial_state,
-                    prev_group=unpersisted_context.state_group_before_event,
-                    delta_ids=unpersisted_context.state_delta_due_to_event,
-                )
-            else:
-                context = EventContext(
-                    storage=unpersisted_context._storage,
-                    state_group=unpersisted_context.state_group_after_event,
-                    state_group_before_event=unpersisted_context.state_group_before_event,
-                    state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
-                    partial_state=unpersisted_context.partial_state,
-                    prev_group=unpersisted_context.prev_group_for_state_group_before_event,
-                    delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
-                )
+            state_group_deltas = unpersisted_context._build_state_group_deltas()
+
+            context = EventContext(
+                storage=unpersisted_context._storage,
+                state_group=unpersisted_context.state_group_after_event,
+                state_group_before_event=unpersisted_context.state_group_before_event,
+                state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
+                partial_state=unpersisted_context.partial_state,
+                state_group_deltas=state_group_deltas,
+            )
             events_and_persisted_context.append((event, context))
         return events_and_persisted_context
 
@@ -452,11 +448,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
 
         # if the event isn't a state event the state group doesn't change
         if not self.state_delta_due_to_event:
-            state_group_after_event = self.state_group_before_event
+            self.state_group_after_event = self.state_group_before_event
 
         # otherwise if it is a state event we need to get a state group for it
         else:
-            state_group_after_event = await self._storage.state.store_state_group(
+            self.state_group_after_event = await self._storage.state.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=self.state_group_before_event,
@@ -464,16 +460,81 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
                 current_state_ids=None,
             )
 
+        state_group_deltas = self._build_state_group_deltas()
+
         return EventContext.with_state(
             storage=self._storage,
-            state_group=state_group_after_event,
+            state_group=self.state_group_after_event,
             state_group_before_event=self.state_group_before_event,
             state_delta_due_to_event=self.state_delta_due_to_event,
+            state_group_deltas=state_group_deltas,
             partial_state=self.partial_state,
-            prev_group=self.state_group_before_event,
-            delta_ids=self.state_delta_due_to_event,
         )
 
+    def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap]:
+        """
+        Collect deltas between the state groups associated with this context
+        """
+        state_group_deltas = {}
+
+        # if we know the state group before the event and after the event, add them and the
+        # state delta between them to state_group_deltas
+        if self.state_group_before_event and self.state_group_after_event:
+            # if we have the state groups we should have the delta
+            assert self.state_delta_due_to_event is not None
+            state_group_deltas[
+                (
+                    self.state_group_before_event,
+                    self.state_group_after_event,
+                )
+            ] = self.state_delta_due_to_event
+
+        # the state group before the event may also have a state group which precedes it, if
+        # we have that and the state group before the event, add them and the state
+        # delta between them to state_group_deltas
+        if (
+            self.prev_group_for_state_group_before_event
+            and self.state_group_before_event
+        ):
+            # if we have both state groups we should have the delta between them
+            assert self.delta_ids_to_state_group_before_event is not None
+            state_group_deltas[
+                (
+                    self.prev_group_for_state_group_before_event,
+                    self.state_group_before_event,
+                )
+            ] = self.delta_ids_to_state_group_before_event
+
+        return state_group_deltas
+
+
+def _encode_state_group_delta(
+    state_group_delta: Dict[Tuple[int, int], StateMap[str]]
+) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
+    if not state_group_delta:
+        return []
+
+    state_group_delta_encoded = []
+    for key, value in state_group_delta.items():
+        state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value)))
+
+    return state_group_delta_encoded
+
+
+def _decode_state_group_delta(
+    input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
+) -> Dict[Tuple[int, int], StateMap[str]]:
+    if not input:
+        return {}
+
+    state_group_deltas = {}
+    for state_group_1, state_group_2, state_dict in input:
+        state_map = _decode_state_dict(state_dict)
+        assert state_map is not None
+        state_group_deltas[(state_group_1, state_group_2)] = state_map
+
+    return state_group_deltas
+
 
 def _encode_state_dict(
     state_dict: Optional[StateMap[str]],
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a2cf3a96c6..e5359ca558 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -260,7 +260,9 @@ class FederationClient(FederationBase):
         use_unstable = False
         for user_id, one_time_keys in query.items():
             for device_id, algorithms in one_time_keys.items():
-                if any(count > 1 for count in algorithms.values()):
+                # If more than one algorithm is requested, attempt to use the unstable
+                # endpoint.
+                if sum(algorithms.values()) > 1:
                     use_unstable = True
                 if algorithms:
                     # For the stable query, choose only the first algorithm.
@@ -296,6 +298,7 @@ class FederationClient(FederationBase):
         else:
             logger.debug("Skipping unstable claim client keys API")
 
+        # TODO Potentially attempt multiple queries and combine the results?
         return await self.transport_layer.claim_client_keys(
             user, destination, content, timeout
         )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9425b32507..61fa3b30af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1016,7 +1016,9 @@ class FederationServer(FederationBase):
             for user_id, device_keys in result.items():
                 for device_id, keys in device_keys.items():
                     for key_id, key in keys.items():
-                        json_result.setdefault(user_id, {})[device_id] = {key_id: key}
+                        json_result.setdefault(user_id, {}).setdefault(device_id, {})[
+                            key_id
+                        ] = key
 
         logger.info(
             "Claimed one-time-keys: %s",
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d5257acb7d..19b8728db9 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -40,6 +40,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# How many single event gaps we tolerate returning in a `/messages` response before we
+# backfill and try to fill in the history. This is an arbitrarily picked number so feel
+# free to tune it in the future.
+BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+
 
 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
@@ -486,35 +491,35 @@ class PaginationHandler:
                         room_id, room_token.stream
                     )
 
-                if not use_admin_priviledge and membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-
-                    # This is only None if the room is world_readable, in which
-                    # case "JOIN" would have been returned.
-                    assert member_event_id
+            # If they have left the room then clamp the token to be before
+            # they left the room, to save the effort of loading from the
+            # database.
+            if (
+                pagin_config.direction == Direction.BACKWARDS
+                and not use_admin_priviledge
+                and membership == Membership.LEAVE
+            ):
+                # This is only None if the room is world_readable, in which case
+                # "Membership.JOIN" would have been returned and we should never hit
+                # this branch.
+                assert member_event_id
+
+                leave_token = await self.store.get_topological_token_for_event(
+                    member_event_id
+                )
+                assert leave_token.topological is not None
 
-                    leave_token = await self.store.get_topological_token_for_event(
-                        member_event_id
+                if leave_token.topological < curr_topo:
+                    from_token = from_token.copy_and_replace(
+                        StreamKeyType.ROOM, leave_token
                     )
-                    assert leave_token.topological is not None
-
-                    if leave_token.topological < curr_topo:
-                        from_token = from_token.copy_and_replace(
-                            StreamKeyType.ROOM, leave_token
-                        )
-
-                await self.hs.get_federation_handler().maybe_backfill(
-                    room_id,
-                    curr_topo,
-                    limit=pagin_config.limit,
-                )
 
             to_room_key = None
             if pagin_config.to_token:
                 to_room_key = pagin_config.to_token.room_key
 
+            # Initially fetch the events from the database. With any luck, we can return
+            # these without blocking on backfill (handled below).
             events, next_key = await self.store.paginate_room_events(
                 room_id=room_id,
                 from_key=from_token.room_key,
@@ -524,6 +529,94 @@ class PaginationHandler:
                 event_filter=event_filter,
             )
 
+            if pagin_config.direction == Direction.BACKWARDS:
+                # We use a `Set` because there can be multiple events at a given depth
+                # and we only care about looking at the unique continum of depths to
+                # find gaps.
+                event_depths: Set[int] = {event.depth for event in events}
+                sorted_event_depths = sorted(event_depths)
+
+                # Inspect the depths of the returned events to see if there are any gaps
+                found_big_gap = False
+                number_of_gaps = 0
+                previous_event_depth = (
+                    sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
+                )
+                for event_depth in sorted_event_depths:
+                    # We don't expect a negative depth but we'll just deal with it in
+                    # any case by taking the absolute value to get the true gap between
+                    # any two integers.
+                    depth_gap = abs(event_depth - previous_event_depth)
+                    # A `depth_gap` of 1 is a normal continuous chain to the next event
+                    # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+                    # also possible there is no event at a given depth but we can't ever
+                    # know that for sure)
+                    if depth_gap > 1:
+                        number_of_gaps += 1
+
+                    # We only tolerate a small number single-event long gaps in the
+                    # returned events because those are most likely just events we've
+                    # failed to pull in the past. Anything longer than that is probably
+                    # a sign that we're missing a decent chunk of history and we should
+                    # try to backfill it.
+                    #
+                    # XXX: It's possible we could tolerate longer gaps if we checked
+                    # that a given events `prev_events` is one that has failed pull
+                    # attempts and we could just treat it like a dead branch of history
+                    # for now or at least something that we don't need the block the
+                    # client on to try pulling.
+                    #
+                    # XXX: If we had something like MSC3871 to indicate gaps in the
+                    # timeline to the client, we could also get away with any sized gap
+                    # and just have the client refetch the holes as they see fit.
+                    if depth_gap > 2:
+                        found_big_gap = True
+                        break
+                    previous_event_depth = event_depth
+
+                # Backfill in the foreground if we found a big gap, have too many holes,
+                # or we don't have enough events to fill the limit that the client asked
+                # for.
+                missing_too_many_events = (
+                    number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+                )
+                not_enough_events_to_fill_response = len(events) < pagin_config.limit
+                if (
+                    found_big_gap
+                    or missing_too_many_events
+                    or not_enough_events_to_fill_response
+                ):
+                    did_backfill = (
+                        await self.hs.get_federation_handler().maybe_backfill(
+                            room_id,
+                            curr_topo,
+                            limit=pagin_config.limit,
+                        )
+                    )
+
+                    # If we did backfill something, refetch the events from the database to
+                    # catch anything new that might have been added since we last fetched.
+                    if did_backfill:
+                        events, next_key = await self.store.paginate_room_events(
+                            room_id=room_id,
+                            from_key=from_token.room_key,
+                            to_key=to_room_key,
+                            direction=pagin_config.direction,
+                            limit=pagin_config.limit,
+                            event_filter=event_filter,
+                        )
+                else:
+                    # Otherwise, we can backfill in the background for eventual
+                    # consistency's sake but we don't need to block the client waiting
+                    # for a costly federation call and processing.
+                    run_as_background_process(
+                        "maybe_backfill_in_the_background",
+                        self.hs.get_federation_handler().maybe_backfill,
+                        room_id,
+                        curr_topo,
+                        limit=pagin_config.limit,
+                    )
+
             next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
 
         # if no events are returned from pagination, that implies
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f1d2c71c91..35c0680365 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -839,9 +839,8 @@ class EventsPersistenceStorageController:
                         "group" % (ev.event_id,)
                     )
                 continue
-
-            if ctx.prev_group:
-                state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+            if ctx.state_group_deltas:
+                state_group_deltas.update(ctx.state_group_deltas)
 
         # We need to map the event_ids to their state groups. First, let's
         # check if the event is one we're persisting, in which case we can
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e2e6eb479f..44af3357af 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1729,13 +1729,22 @@ class PersistEventsStore:
             if not row["rejects"] and not row["redacts"]:
                 to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
 
-        async def prefill() -> None:
+        async def external_prefill() -> None:
             for cache_entry in to_prefill:
-                await self.store._get_event_cache.set(
+                await self.store._get_event_cache.set_external(
                     (cache_entry.event.event_id,), cache_entry
                 )
 
-        txn.async_call_after(prefill)
+        def local_prefill() -> None:
+            for cache_entry in to_prefill:
+                self.store._get_event_cache.set_local(
+                    (cache_entry.event.event_id,), cache_entry
+                )
+
+        # The order these are called here is not as important as knowing that after the
+        # transaction is finished, the async_call_after will run before the call_after.
+        txn.async_call_after(external_prefill)
+        txn.call_after(local_prefill)
 
     def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
         assert event.redacts is not None
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d93ffc4efa..7e7648c951 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -883,7 +883,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
         """
-        Invalidates an event in the asyncronous get event cache, which may be remote.
+        Invalidates an event in the asynchronous get event cache, which may be remote.
 
         Arguments:
             event_id: the event ID to invalidate
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 7ea0c4c36b..9f3b8741c1 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -116,6 +116,11 @@ class Clock:
 
         Waits `msec` initially before calling `f` for the first time.
 
+        If the function given to `looping_call` returns an awaitable/deferred, the next
+        call isn't scheduled until after the returned awaitable has finished. We get
+        this functionality thanks to this function being a thin wrapper around
+        `twisted.internet.task.LoopingCall`.
+
         Note that the function will be called with no logcontext, so if it is anything
         other than trivial, you probably want to wrap it in run_as_background_process.
 
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 6137c85e10..be6554319a 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -842,7 +842,13 @@ class AsyncLruCache(Generic[KT, VT]):
         return self._lru_cache.get(key, update_metrics=update_metrics)
 
     async def set(self, key: KT, value: VT) -> None:
-        self._lru_cache.set(key, value)
+        # This will add the entries in the correct order, local first external second
+        self.set_local(key, value)
+        await self.set_external(key, value)
+
+    async def set_external(self, key: KT, value: VT) -> None:
+        # This method should add an entry to any configured external cache, in this case noop.
+        pass
 
     def set_local(self, key: KT, value: VT) -> None:
         self._lru_cache.set(key, value)