diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/events/snapshot.py | 159 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 5 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 4 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 137 | ||||
-rw-r--r-- | synapse/storage/controllers/persist_events.py | 5 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 15 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 2 | ||||
-rw-r--r-- | synapse/util/__init__.py | 5 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 8 |
9 files changed, 259 insertions, 81 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e7e8225b8e..a43498ed4d 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple import attr from immutabledict import immutabledict @@ -107,33 +107,32 @@ class EventContext(UnpersistedEventContextBase): state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None then this is the delta of the state between the two groups. - prev_group: If it is known, ``state_group``'s prev_group. Note that this being - None does not necessarily mean that ``state_group`` does not have - a prev_group! + state_group_deltas: If not empty, this is a dict collecting a mapping of the state + difference between state groups. - If the event is a state event, this is normally the same as - ``state_group_before_event``. + The keys are a tuple of two integers: the initial group and final state group. + The corresponding value is a state map representing the state delta between + these state groups. - If ``state_group`` is None (ie, the event is an outlier), ``prev_group`` - will always also be ``None``. + The dictionary is expected to have at most two entries with state groups of: - Note that this *not* (necessarily) the state group associated with - ``_prev_state_ids``. + 1. The state group before the event and after the event. + 2. The state group preceding the state group before the event and the + state group before the event. - delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` - and ``state_group``. + This information is collected and stored as part of an optimization for persisting + events. partial_state: if True, we may be storing this event with a temporary, incomplete state. """ _storage: "StorageControllers" + state_group_deltas: Dict[Tuple[int, int], StateMap[str]] rejected: Optional[str] = None _state_group: Optional[int] = None state_group_before_event: Optional[int] = None _state_delta_due_to_event: Optional[StateMap[str]] = None - prev_group: Optional[int] = None - delta_ids: Optional[StateMap[str]] = None app_service: Optional[ApplicationService] = None partial_state: bool = False @@ -145,16 +144,14 @@ class EventContext(UnpersistedEventContextBase): state_group_before_event: Optional[int], state_delta_due_to_event: Optional[StateMap[str]], partial_state: bool, - prev_group: Optional[int] = None, - delta_ids: Optional[StateMap[str]] = None, + state_group_deltas: Dict[Tuple[int, int], StateMap[str]], ) -> "EventContext": return EventContext( storage=storage, state_group=state_group, state_group_before_event=state_group_before_event, state_delta_due_to_event=state_delta_due_to_event, - prev_group=prev_group, - delta_ids=delta_ids, + state_group_deltas=state_group_deltas, partial_state=partial_state, ) @@ -163,7 +160,7 @@ class EventContext(UnpersistedEventContextBase): storage: "StorageControllers", ) -> "EventContext": """Return an EventContext instance suitable for persisting an outlier event""" - return EventContext(storage=storage) + return EventContext(storage=storage, state_group_deltas={}) async def persist(self, event: EventBase) -> "EventContext": return self @@ -183,13 +180,15 @@ class EventContext(UnpersistedEventContextBase): "state_group": self._state_group, "state_group_before_event": self.state_group_before_event, "rejected": self.rejected, - "prev_group": self.prev_group, + "state_group_deltas": _encode_state_group_delta(self.state_group_deltas), "state_delta_due_to_event": _encode_state_dict( self._state_delta_due_to_event ), - "delta_ids": _encode_state_dict(self.delta_ids), "app_service_id": self.app_service.id if self.app_service else None, "partial_state": self.partial_state, + # add dummy delta_ids and prev_group for backwards compatibility + "delta_ids": None, + "prev_group": None, } @staticmethod @@ -204,17 +203,24 @@ class EventContext(UnpersistedEventContextBase): Returns: The event context. """ + # workaround for backwards/forwards compatibility: if the input doesn't have a value + # for "state_group_deltas" just assign an empty dict + state_group_deltas = input.get("state_group_deltas", None) + if state_group_deltas: + state_group_deltas = _decode_state_group_delta(state_group_deltas) + else: + state_group_deltas = {} + context = EventContext( # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. storage=storage, state_group=input["state_group"], state_group_before_event=input["state_group_before_event"], - prev_group=input["prev_group"], + state_group_deltas=state_group_deltas, state_delta_due_to_event=_decode_state_dict( input["state_delta_due_to_event"] ), - delta_ids=_decode_state_dict(input["delta_ids"]), rejected=input["rejected"], partial_state=input.get("partial_state", False), ) @@ -349,7 +355,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase): _storage: "StorageControllers" state_group_before_event: Optional[int] state_group_after_event: Optional[int] - state_delta_due_to_event: Optional[dict] + state_delta_due_to_event: Optional[StateMap[str]] prev_group_for_state_group_before_event: Optional[int] delta_ids_to_state_group_before_event: Optional[StateMap[str]] partial_state: bool @@ -380,26 +386,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase): events_and_persisted_context = [] for event, unpersisted_context in amended_events_and_context: - if event.is_state(): - context = EventContext( - storage=unpersisted_context._storage, - state_group=unpersisted_context.state_group_after_event, - state_group_before_event=unpersisted_context.state_group_before_event, - state_delta_due_to_event=unpersisted_context.state_delta_due_to_event, - partial_state=unpersisted_context.partial_state, - prev_group=unpersisted_context.state_group_before_event, - delta_ids=unpersisted_context.state_delta_due_to_event, - ) - else: - context = EventContext( - storage=unpersisted_context._storage, - state_group=unpersisted_context.state_group_after_event, - state_group_before_event=unpersisted_context.state_group_before_event, - state_delta_due_to_event=unpersisted_context.state_delta_due_to_event, - partial_state=unpersisted_context.partial_state, - prev_group=unpersisted_context.prev_group_for_state_group_before_event, - delta_ids=unpersisted_context.delta_ids_to_state_group_before_event, - ) + state_group_deltas = unpersisted_context._build_state_group_deltas() + + context = EventContext( + storage=unpersisted_context._storage, + state_group=unpersisted_context.state_group_after_event, + state_group_before_event=unpersisted_context.state_group_before_event, + state_delta_due_to_event=unpersisted_context.state_delta_due_to_event, + partial_state=unpersisted_context.partial_state, + state_group_deltas=state_group_deltas, + ) events_and_persisted_context.append((event, context)) return events_and_persisted_context @@ -452,11 +448,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase): # if the event isn't a state event the state group doesn't change if not self.state_delta_due_to_event: - state_group_after_event = self.state_group_before_event + self.state_group_after_event = self.state_group_before_event # otherwise if it is a state event we need to get a state group for it else: - state_group_after_event = await self._storage.state.store_state_group( + self.state_group_after_event = await self._storage.state.store_state_group( event.event_id, event.room_id, prev_group=self.state_group_before_event, @@ -464,16 +460,81 @@ class UnpersistedEventContext(UnpersistedEventContextBase): current_state_ids=None, ) + state_group_deltas = self._build_state_group_deltas() + return EventContext.with_state( storage=self._storage, - state_group=state_group_after_event, + state_group=self.state_group_after_event, state_group_before_event=self.state_group_before_event, state_delta_due_to_event=self.state_delta_due_to_event, + state_group_deltas=state_group_deltas, partial_state=self.partial_state, - prev_group=self.state_group_before_event, - delta_ids=self.state_delta_due_to_event, ) + def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap]: + """ + Collect deltas between the state groups associated with this context + """ + state_group_deltas = {} + + # if we know the state group before the event and after the event, add them and the + # state delta between them to state_group_deltas + if self.state_group_before_event and self.state_group_after_event: + # if we have the state groups we should have the delta + assert self.state_delta_due_to_event is not None + state_group_deltas[ + ( + self.state_group_before_event, + self.state_group_after_event, + ) + ] = self.state_delta_due_to_event + + # the state group before the event may also have a state group which precedes it, if + # we have that and the state group before the event, add them and the state + # delta between them to state_group_deltas + if ( + self.prev_group_for_state_group_before_event + and self.state_group_before_event + ): + # if we have both state groups we should have the delta between them + assert self.delta_ids_to_state_group_before_event is not None + state_group_deltas[ + ( + self.prev_group_for_state_group_before_event, + self.state_group_before_event, + ) + ] = self.delta_ids_to_state_group_before_event + + return state_group_deltas + + +def _encode_state_group_delta( + state_group_delta: Dict[Tuple[int, int], StateMap[str]] +) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]: + if not state_group_delta: + return [] + + state_group_delta_encoded = [] + for key, value in state_group_delta.items(): + state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value))) + + return state_group_delta_encoded + + +def _decode_state_group_delta( + input: List[Tuple[int, int, List[Tuple[str, str, str]]]] +) -> Dict[Tuple[int, int], StateMap[str]]: + if not input: + return {} + + state_group_deltas = {} + for state_group_1, state_group_2, state_dict in input: + state_map = _decode_state_dict(state_dict) + assert state_map is not None + state_group_deltas[(state_group_1, state_group_2)] = state_map + + return state_group_deltas + def _encode_state_dict( state_dict: Optional[StateMap[str]], diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a2cf3a96c6..e5359ca558 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -260,7 +260,9 @@ class FederationClient(FederationBase): use_unstable = False for user_id, one_time_keys in query.items(): for device_id, algorithms in one_time_keys.items(): - if any(count > 1 for count in algorithms.values()): + # If more than one algorithm is requested, attempt to use the unstable + # endpoint. + if sum(algorithms.values()) > 1: use_unstable = True if algorithms: # For the stable query, choose only the first algorithm. @@ -296,6 +298,7 @@ class FederationClient(FederationBase): else: logger.debug("Skipping unstable claim client keys API") + # TODO Potentially attempt multiple queries and combine the results? return await self.transport_layer.claim_client_keys( user, destination, content, timeout ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9425b32507..61fa3b30af 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1016,7 +1016,9 @@ class FederationServer(FederationBase): for user_id, device_keys in result.items(): for device_id, keys in device_keys.items(): for key_id, key in keys.items(): - json_result.setdefault(user_id, {})[device_id] = {key_id: key} + json_result.setdefault(user_id, {}).setdefault(device_id, {})[ + key_id + ] = key logger.info( "Claimed one-time-keys: %s", diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index d5257acb7d..19b8728db9 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -40,6 +40,11 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# How many single event gaps we tolerate returning in a `/messages` response before we +# backfill and try to fill in the history. This is an arbitrarily picked number so feel +# free to tune it in the future. +BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 + @attr.s(slots=True, auto_attribs=True) class PurgeStatus: @@ -486,35 +491,35 @@ class PaginationHandler: room_id, room_token.stream ) - if not use_admin_priviledge and membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - - # This is only None if the room is world_readable, in which - # case "JOIN" would have been returned. - assert member_event_id + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + if ( + pagin_config.direction == Direction.BACKWARDS + and not use_admin_priviledge + and membership == Membership.LEAVE + ): + # This is only None if the room is world_readable, in which case + # "Membership.JOIN" would have been returned and we should never hit + # this branch. + assert member_event_id + + leave_token = await self.store.get_topological_token_for_event( + member_event_id + ) + assert leave_token.topological is not None - leave_token = await self.store.get_topological_token_for_event( - member_event_id + if leave_token.topological < curr_topo: + from_token = from_token.copy_and_replace( + StreamKeyType.ROOM, leave_token ) - assert leave_token.topological is not None - - if leave_token.topological < curr_topo: - from_token = from_token.copy_and_replace( - StreamKeyType.ROOM, leave_token - ) - - await self.hs.get_federation_handler().maybe_backfill( - room_id, - curr_topo, - limit=pagin_config.limit, - ) to_room_key = None if pagin_config.to_token: to_room_key = pagin_config.to_token.room_key + # Initially fetch the events from the database. With any luck, we can return + # these without blocking on backfill (handled below). events, next_key = await self.store.paginate_room_events( room_id=room_id, from_key=from_token.room_key, @@ -524,6 +529,94 @@ class PaginationHandler: event_filter=event_filter, ) + if pagin_config.direction == Direction.BACKWARDS: + # We use a `Set` because there can be multiple events at a given depth + # and we only care about looking at the unique continum of depths to + # find gaps. + event_depths: Set[int] = {event.depth for event in events} + sorted_event_depths = sorted(event_depths) + + # Inspect the depths of the returned events to see if there are any gaps + found_big_gap = False + number_of_gaps = 0 + previous_event_depth = ( + sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0 + ) + for event_depth in sorted_event_depths: + # We don't expect a negative depth but we'll just deal with it in + # any case by taking the absolute value to get the true gap between + # any two integers. + depth_gap = abs(event_depth - previous_event_depth) + # A `depth_gap` of 1 is a normal continuous chain to the next event + # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's + # also possible there is no event at a given depth but we can't ever + # know that for sure) + if depth_gap > 1: + number_of_gaps += 1 + + # We only tolerate a small number single-event long gaps in the + # returned events because those are most likely just events we've + # failed to pull in the past. Anything longer than that is probably + # a sign that we're missing a decent chunk of history and we should + # try to backfill it. + # + # XXX: It's possible we could tolerate longer gaps if we checked + # that a given events `prev_events` is one that has failed pull + # attempts and we could just treat it like a dead branch of history + # for now or at least something that we don't need the block the + # client on to try pulling. + # + # XXX: If we had something like MSC3871 to indicate gaps in the + # timeline to the client, we could also get away with any sized gap + # and just have the client refetch the holes as they see fit. + if depth_gap > 2: + found_big_gap = True + break + previous_event_depth = event_depth + + # Backfill in the foreground if we found a big gap, have too many holes, + # or we don't have enough events to fill the limit that the client asked + # for. + missing_too_many_events = ( + number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD + ) + not_enough_events_to_fill_response = len(events) < pagin_config.limit + if ( + found_big_gap + or missing_too_many_events + or not_enough_events_to_fill_response + ): + did_backfill = ( + await self.hs.get_federation_handler().maybe_backfill( + room_id, + curr_topo, + limit=pagin_config.limit, + ) + ) + + # If we did backfill something, refetch the events from the database to + # catch anything new that might have been added since we last fetched. + if did_backfill: + events, next_key = await self.store.paginate_room_events( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, + event_filter=event_filter, + ) + else: + # Otherwise, we can backfill in the background for eventual + # consistency's sake but we don't need to block the client waiting + # for a costly federation call and processing. + run_as_background_process( + "maybe_backfill_in_the_background", + self.hs.get_federation_handler().maybe_backfill, + room_id, + curr_topo, + limit=pagin_config.limit, + ) + next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) # if no events are returned from pagination, that implies diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f1d2c71c91..35c0680365 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -839,9 +839,8 @@ class EventsPersistenceStorageController: "group" % (ev.event_id,) ) continue - - if ctx.prev_group: - state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + if ctx.state_group_deltas: + state_group_deltas.update(ctx.state_group_deltas) # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index e2e6eb479f..44af3357af 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1729,13 +1729,22 @@ class PersistEventsStore: if not row["rejects"] and not row["redacts"]: to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) - async def prefill() -> None: + async def external_prefill() -> None: for cache_entry in to_prefill: - await self.store._get_event_cache.set( + await self.store._get_event_cache.set_external( (cache_entry.event.event_id,), cache_entry ) - txn.async_call_after(prefill) + def local_prefill() -> None: + for cache_entry in to_prefill: + self.store._get_event_cache.set_local( + (cache_entry.event.event_id,), cache_entry + ) + + # The order these are called here is not as important as knowing that after the + # transaction is finished, the async_call_after will run before the call_after. + txn.async_call_after(external_prefill) + txn.call_after(local_prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: assert event.redacts is not None diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d93ffc4efa..7e7648c951 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -883,7 +883,7 @@ class EventsWorkerStore(SQLBaseStore): async def _invalidate_async_get_event_cache(self, event_id: str) -> None: """ - Invalidates an event in the asyncronous get event cache, which may be remote. + Invalidates an event in the asynchronous get event cache, which may be remote. Arguments: event_id: the event ID to invalidate diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 7ea0c4c36b..9f3b8741c1 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -116,6 +116,11 @@ class Clock: Waits `msec` initially before calling `f` for the first time. + If the function given to `looping_call` returns an awaitable/deferred, the next + call isn't scheduled until after the returned awaitable has finished. We get + this functionality thanks to this function being a thin wrapper around + `twisted.internet.task.LoopingCall`. + Note that the function will be called with no logcontext, so if it is anything other than trivial, you probably want to wrap it in run_as_background_process. diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 6137c85e10..be6554319a 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -842,7 +842,13 @@ class AsyncLruCache(Generic[KT, VT]): return self._lru_cache.get(key, update_metrics=update_metrics) async def set(self, key: KT, value: VT) -> None: - self._lru_cache.set(key, value) + # This will add the entries in the correct order, local first external second + self.set_local(key, value) + await self.set_external(key, value) + + async def set_external(self, key: KT, value: VT) -> None: + # This method should add an entry to any configured external cache, in this case noop. + pass def set_local(self, key: KT, value: VT) -> None: self._lru_cache.set(key, value) |