summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py110
1 files changed, 63 insertions, 47 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py

index 7c7cda3e95..202beee738 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -110,6 +110,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore +from synapse.storage.databases.main.state_deltas import StateDelta from synapse.streams import EventSource from synapse.types import ( JsonDict, @@ -191,7 +192,8 @@ class BasePresenceHandler(abc.ABC): self.state = hs.get_state_handler() self.is_mine_id = hs.is_mine_id - self._presence_enabled = hs.config.server.use_presence + self._presence_enabled = hs.config.server.presence_enabled + self._track_presence = hs.config.server.track_presence self._federation = None if hs.should_send_federation(): @@ -511,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler): ) async def _on_shutdown(self) -> None: - if self._presence_enabled: + if self._track_presence: self.hs.get_replication_command_handler().send_command( ClearUserSyncsCommand(self.instance_id) ) @@ -523,7 +525,7 @@ class WorkerPresenceHandler(BasePresenceHandler): is_syncing: bool, last_sync_ms: int, ) -> None: - if self._presence_enabled: + if self._track_presence: self.hs.get_replication_command_handler().send_user_sync( self.instance_id, user_id, device_id, is_syncing, last_sync_ms ) @@ -570,7 +572,7 @@ class WorkerPresenceHandler(BasePresenceHandler): Called by the sync and events servlets to record that a user has connected to this worker and is waiting for some events. """ - if not affect_presence or not self._presence_enabled: + if not affect_presence or not self._track_presence: return _NullContextManager() # Note that this causes last_active_ts to be incremented which is not @@ -701,8 +703,8 @@ class WorkerPresenceHandler(BasePresenceHandler): user_id = target_user.to_string() - # If presence is disabled, no-op - if not self._presence_enabled: + # If tracking of presence is disabled, no-op + if not self._track_presence: return # Proxy request to instance that writes presence @@ -722,7 +724,7 @@ class WorkerPresenceHandler(BasePresenceHandler): with the app. """ # If presence is disabled, no-op - if not self._presence_enabled: + if not self._track_presence: return # Proxy request to instance that writes presence @@ -759,7 +761,7 @@ class PresenceHandler(BasePresenceHandler): ] = {} now = self.clock.time_msec() - if self._presence_enabled: + if self._track_presence: for state in self.user_to_current_state.values(): # Create a psuedo-device to properly handle time outs. This will # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT. @@ -830,7 +832,7 @@ class PresenceHandler(BasePresenceHandler): self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") - if self._presence_enabled: + if self._track_presence: # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. @@ -838,6 +840,9 @@ class PresenceHandler(BasePresenceHandler): 30, self.clock.looping_call, self._handle_timeouts, 5000 ) + # Presence information is persisted, whether or not it is being tracked + # internally. + if self._presence_enabled: self.clock.call_later( 60, self.clock.looping_call, @@ -853,7 +858,7 @@ class PresenceHandler(BasePresenceHandler): ) # Used to handle sending of presence to newly joined users/servers - if self._presence_enabled: + if self._track_presence: self.notifier.add_replication_callback(self.notify_new_event) # Presence is best effort and quickly heals itself, so lets just always @@ -904,7 +909,9 @@ class PresenceHandler(BasePresenceHandler): ) async def _update_states( - self, new_states: Iterable[UserPresenceState], force_notify: bool = False + self, + new_states: Iterable[UserPresenceState], + force_notify: bool = False, ) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state @@ -942,7 +949,7 @@ class PresenceHandler(BasePresenceHandler): for new_state in new_states: user_id = new_state.user_id - # Its fine to not hit the database here, as the only thing not in + # It's fine to not hit the database here, as the only thing not in # the current state cache are OFFLINE states, where the only field # of interest is last_active which is safe enough to assume is 0 # here. @@ -956,6 +963,9 @@ class PresenceHandler(BasePresenceHandler): is_mine=self.is_mine_id(user_id), wheel_timer=self.wheel_timer, now=now, + # When overriding disabled presence, don't kick off all the + # wheel timers. + persist=not self._track_presence, ) if force_notify: @@ -1071,7 +1081,7 @@ class PresenceHandler(BasePresenceHandler): with the app. """ # If presence is disabled, no-op - if not self._presence_enabled: + if not self._track_presence: return user_id = user.to_string() @@ -1123,7 +1133,7 @@ class PresenceHandler(BasePresenceHandler): client that is being used by a user. presence_state: The presence state indicated in the sync request """ - if not affect_presence or not self._presence_enabled: + if not affect_presence or not self._track_presence: return _NullContextManager() curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0) @@ -1283,7 +1293,7 @@ class PresenceHandler(BasePresenceHandler): async def incoming_presence(self, origin: str, content: JsonDict) -> None: """Called when we receive a `m.presence` EDU from a remote server.""" - if not self._presence_enabled: + if not self._track_presence: return now = self.clock.time_msec() @@ -1358,7 +1368,7 @@ class PresenceHandler(BasePresenceHandler): raise SynapseError(400, "Invalid presence state") # If presence is disabled, no-op - if not self._presence_enabled: + if not self._track_presence: return user_id = target_user.to_string() @@ -1499,9 +1509,9 @@ class PresenceHandler(BasePresenceHandler): # We may get multiple deltas for different rooms, but we want to # handle them on a room by room basis, so we batch them up by # room. - deltas_by_room: Dict[str, List[JsonDict]] = {} + deltas_by_room: Dict[str, List[StateDelta]] = {} for delta in deltas: - deltas_by_room.setdefault(delta["room_id"], []).append(delta) + deltas_by_room.setdefault(delta.room_id, []).append(delta) for room_id, deltas_for_room in deltas_by_room.items(): await self._handle_state_delta(room_id, deltas_for_room) @@ -1513,7 +1523,7 @@ class PresenceHandler(BasePresenceHandler): max_pos ) - async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None: + async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None: """Process current state deltas for the room to find new joins that need to be handled. """ @@ -1524,31 +1534,30 @@ class PresenceHandler(BasePresenceHandler): newly_joined_users = set() for delta in deltas: - assert room_id == delta["room_id"] + assert room_id == delta.room_id - typ = delta["type"] - state_key = delta["state_key"] - event_id = delta["event_id"] - prev_event_id = delta["prev_event_id"] - - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + logger.debug( + "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id + ) # Drop any event that isn't a membership join - if typ != EventTypes.Member: + if delta.event_type != EventTypes.Member: continue - if event_id is None: + if delta.event_id is None: # state has been deleted, so this is not a join. We only care about # joins. continue - event = await self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(delta.event_id, allow_none=True) if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue - if prev_event_id: - prev_event = await self.store.get_event(prev_event_id, allow_none=True) + if delta.prev_event_id: + prev_event = await self.store.get_event( + delta.prev_event_id, allow_none=True + ) if ( prev_event and prev_event.content.get("membership") == Membership.JOIN @@ -1556,7 +1565,7 @@ class PresenceHandler(BasePresenceHandler): # Ignore changes to join events. continue - newly_joined_users.add(state_key) + newly_joined_users.add(delta.state_key) if not newly_joined_users: # If nobody has joined then there's nothing to do. @@ -2118,6 +2127,7 @@ def handle_update( is_mine: bool, wheel_timer: WheelTimer, now: int, + persist: bool, ) -> Tuple[UserPresenceState, bool, bool]: """Given a presence update: 1. Add any appropriate timers. @@ -2129,6 +2139,8 @@ def handle_update( is_mine: Whether the user is ours wheel_timer now: Time now in ms + persist: True if this state should persist until another update occurs. + Skips insertion into wheel timers. Returns: 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: @@ -2146,14 +2158,15 @@ def handle_update( if is_mine: if new_state.state == PresenceState.ONLINE: # Idle timer - wheel_timer.insert( - now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER - ) + if not persist: + wheel_timer.insert( + now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER + ) active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY new_state = new_state.copy_and_replace(currently_active=active) - if active: + if active and not persist: wheel_timer.insert( now=now, obj=user_id, @@ -2162,11 +2175,12 @@ def handle_update( if new_state.state != PresenceState.OFFLINE: # User has stopped syncing - wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, - ) + if not persist: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, + ) last_federate = new_state.last_federation_update_ts if now - last_federate > FEDERATION_PING_INTERVAL: @@ -2174,7 +2188,7 @@ def handle_update( new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True - if new_state.state == PresenceState.BUSY: + if new_state.state == PresenceState.BUSY and not persist: wheel_timer.insert( now=now, obj=user_id, @@ -2182,11 +2196,13 @@ def handle_update( ) else: - wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, - ) + # An update for a remote user was received. + if not persist: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, + ) # Check whether the change was something worth notifying about if should_notify(prev_state, new_state, is_mine):