diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index cf98b0ab48..33ffef521b 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,8 +45,14 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
-from synapse.logging import opentracing
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.opentracing import (
+    SynapseTags,
+    active_span,
+    set_tag,
+    start_active_span_follows_from,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.controllers.state import StateStorageController
 from synapse.storage.databases import Databases
@@ -198,9 +204,8 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
         process to do so, calling the per_item_callback for each item.
 
         Args:
-            room_id (str):
-            task (_EventPersistQueueTask): A _PersistEventsTask or
-                _UpdateCurrentStateTask to process.
+            room_id:
+            task: A _PersistEventsTask or _UpdateCurrentStateTask to process.
 
         Returns:
             the result returned by the `_per_item_callback` passed to
@@ -223,7 +228,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
             queue.append(end_item)
 
         # also add our active opentracing span to the item so that we get a link back
-        span = opentracing.active_span()
+        span = active_span()
         if span:
             end_item.parent_opentracing_span_contexts.append(span.context)
@@ -234,7 +239,9 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
         res = await make_deferred_yieldable(end_item.deferred.observe())
 
         # add another opentracing span which links to the persist trace.
-        with opentracing.start_active_span_follows_from(
+        with start_active_span_follows_from(
             f"{task.name}_complete", (end_item.opentracing_span_context,)
         ):
             pass
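+        # (This span is deliberately empty: it exists only to link the waiting
+        # caller's trace to the trace of the task that persisted the events.)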
@@ -266,7 +273,9 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
                     try:
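+                        # Link this task's span back to the spans of the callers
+                        # that queued it, inheriting any forced tracing from them.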
-                        with opentracing.start_active_span_follows_from(
+                        with start_active_span_follows_from(
                             item.task.name,
                             item.parent_opentracing_span_contexts,
                             inherit_force_tracing=True,
@@ -355,7 +364,7 @@ class EventsPersistenceStorageController:
                 f"Found an unexpected task type in event persistence queue: {task}"
             )
 
-    @opentracing.trace
+    @trace
     async def persist_events(
         self,
         events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -380,9 +389,23 @@
             PartialStateConflictError: if attempting to persist a partial state event in
             a room that has been un-partial stated.
         """
+        event_ids: List[str] = []
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
+            event_ids.append(event.event_id)
+
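+        # Tag the active span with this function's arguments so that traces
+        # show which events were being persisted.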
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(event_ids),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
 
         async def enqueue(
             item: Tuple[str, List[Tuple[EventBase, EventContext]]]
@@ -405,20 +428,24 @@
         for d in ret_vals:
             replaced_events.update(d)
 
-        events = []
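+        # Build the list of events as they were actually persisted: if an event
+        # was deduplicated, return the existing event it was replaced by.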
+        persisted_events = []
         for event, _ in events_and_contexts:
             existing_event_id = replaced_events.get(event.event_id)
             if existing_event_id:
-                events.append(await self.main_store.get_event(existing_event_id))
+                persisted_events.append(
+                    await self.main_store.get_event(existing_event_id)
+                )
             else:
-                events.append(event)
+                persisted_events.append(event)
 
         return (
-            events,
+            persisted_events,
             self.main_store.get_room_max_token(),
         )
 
-    @opentracing.trace
+    @trace
     async def persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
@@ -580,11 +607,6 @@
         # room
         state_delta_for_room: Dict[str, DeltaState] = {}
 
-        # Set of remote users which were in rooms the server has left. We
-        # should check if we still share any rooms and if not we mark their
-        # device lists as stale.
-        potentially_left_users: Set[str] = set()
-
         if not backfilled:
             with Measure(self._clock, "_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
@@ -698,13 +720,9 @@
                             room_id,
                             ev_ctx_rm,
                             delta,
-                            current_state,
-                            potentially_left_users,
                         )
                         if not is_still_joined:
                             logger.info("Server no longer in room %s", room_id)
-                            latest_event_ids = set()
-                            current_state = {}
                             delta.no_longer_in_room = True
 
                         state_delta_for_room[room_id] = delta
@@ -717,8 +735,6 @@
             inhibit_local_membership_updates=backfilled,
         )
 
-        await self._handle_potentially_left_users(potentially_left_users)
-
         return replaced_events
 
     async def _calculate_new_extremities(
@@ -1094,8 +1110,6 @@
         room_id: str,
         ev_ctx_rm: List[Tuple[EventBase, EventContext]],
         delta: DeltaState,
-        current_state: Optional[StateMap[str]],
-        potentially_left_users: Set[str],
     ) -> bool:
         """Check if the server will still be joined after the given events have
         been persisted.
@@ -1105,11 +1119,6 @@
             ev_ctx_rm
             delta: The delta of current state between what is in the database
                 and what the new current state will be.
-            current_state: The new current state if it already been calculated,
-                otherwise None.
-            potentially_left_users: If the server has left the room, then joined
-                remote users will be added to this set to indicate that the
-                server may no longer be sharing a room with them.
         """
 
         if not any(
@@ -1163,45 +1172,4 @@
         ):
             return True
 
-        # The server will leave the room, so we go and find out which remote
-        # users will still be joined when we leave.
-        if current_state is None:
-            current_state = await self.main_store.get_partial_current_state_ids(room_id)
-        current_state = dict(current_state)
-        for key in delta.to_delete:
-            current_state.pop(key, None)
-
-        current_state.update(delta.to_insert)
-
-        remote_event_ids = [
-            event_id
-            for (
-                typ,
-                state_key,
-            ), event_id in current_state.items()
-            if typ == EventTypes.Member and not self.is_mine_id(state_key)
-        ]
-        members = await self.main_store.get_membership_from_event_ids(remote_event_ids)
-        potentially_left_users.update(
-            member.user_id
-            for member in members.values()
-            if member and member.membership == Membership.JOIN
-        )
-
         return False
-
-    async def _handle_potentially_left_users(self, user_ids: Set[str]) -> None:
-        """Given a set of remote users check if the server still shares a room with
-        them. If not then mark those users' device cache as stale.
-        """
-
-        if not user_ids:
-            return
-
-        joined_users = await self.main_store.get_users_server_still_shares_room_with(
-            user_ids
-        )
-        left_users = user_ids - joined_users
-
-        for user_id in left_users:
-            await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)