diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7c7cda3e95..202beee738 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -110,6 +110,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
+from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -191,7 +192,8 @@ class BasePresenceHandler(abc.ABC):
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
- self._presence_enabled = hs.config.server.use_presence
+ self._presence_enabled = hs.config.server.presence_enabled
+ self._track_presence = hs.config.server.track_presence
self._federation = None
if hs.should_send_federation():
@@ -511,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
)
async def _on_shutdown(self) -> None:
- if self._presence_enabled:
+ if self._track_presence:
self.hs.get_replication_command_handler().send_command(
ClearUserSyncsCommand(self.instance_id)
)
@@ -523,7 +525,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
is_syncing: bool,
last_sync_ms: int,
) -> None:
- if self._presence_enabled:
+ if self._track_presence:
self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, device_id, is_syncing, last_sync_ms
)
@@ -570,7 +572,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
Called by the sync and events servlets to record that a user has connected to
this worker and is waiting for some events.
"""
- if not affect_presence or not self._presence_enabled:
+ if not affect_presence or not self._track_presence:
return _NullContextManager()
# Note that this causes last_active_ts to be incremented which is not
@@ -701,8 +703,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
user_id = target_user.to_string()
- # If presence is disabled, no-op
- if not self._presence_enabled:
+ # If tracking of presence is disabled, no-op
+ if not self._track_presence:
return
# Proxy request to instance that writes presence
@@ -722,7 +724,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
with the app.
"""
# If presence is disabled, no-op
- if not self._presence_enabled:
+ if not self._track_presence:
return
# Proxy request to instance that writes presence
@@ -759,7 +761,7 @@ class PresenceHandler(BasePresenceHandler):
] = {}
now = self.clock.time_msec()
- if self._presence_enabled:
+ if self._track_presence:
for state in self.user_to_current_state.values():
# Create a psuedo-device to properly handle time outs. This will
# be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
@@ -830,7 +832,7 @@ class PresenceHandler(BasePresenceHandler):
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
- if self._presence_enabled:
+ if self._track_presence:
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
@@ -838,6 +840,9 @@ class PresenceHandler(BasePresenceHandler):
30, self.clock.looping_call, self._handle_timeouts, 5000
)
+ # Presence information is persisted, whether or not it is being tracked
+ # internally.
+ if self._presence_enabled:
self.clock.call_later(
60,
self.clock.looping_call,
@@ -853,7 +858,7 @@ class PresenceHandler(BasePresenceHandler):
)
# Used to handle sending of presence to newly joined users/servers
- if self._presence_enabled:
+ if self._track_presence:
self.notifier.add_replication_callback(self.notify_new_event)
# Presence is best effort and quickly heals itself, so lets just always
@@ -904,7 +909,9 @@ class PresenceHandler(BasePresenceHandler):
)
async def _update_states(
- self, new_states: Iterable[UserPresenceState], force_notify: bool = False
+ self,
+ new_states: Iterable[UserPresenceState],
+ force_notify: bool = False,
) -> None:
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
@@ -942,7 +949,7 @@ class PresenceHandler(BasePresenceHandler):
for new_state in new_states:
user_id = new_state.user_id
- # Its fine to not hit the database here, as the only thing not in
+ # It's fine to not hit the database here, as the only thing not in
# the current state cache are OFFLINE states, where the only field
# of interest is last_active which is safe enough to assume is 0
# here.
@@ -956,6 +963,9 @@ class PresenceHandler(BasePresenceHandler):
is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
now=now,
+ # When overriding disabled presence, don't kick off all the
+ # wheel timers.
+ persist=not self._track_presence,
)
if force_notify:
@@ -1071,7 +1081,7 @@ class PresenceHandler(BasePresenceHandler):
with the app.
"""
# If presence is disabled, no-op
- if not self._presence_enabled:
+ if not self._track_presence:
return
user_id = user.to_string()
@@ -1123,7 +1133,7 @@ class PresenceHandler(BasePresenceHandler):
client that is being used by a user.
presence_state: The presence state indicated in the sync request
"""
- if not affect_presence or not self._presence_enabled:
+ if not affect_presence or not self._track_presence:
return _NullContextManager()
curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
@@ -1283,7 +1293,7 @@ class PresenceHandler(BasePresenceHandler):
async def incoming_presence(self, origin: str, content: JsonDict) -> None:
"""Called when we receive a `m.presence` EDU from a remote server."""
- if not self._presence_enabled:
+ if not self._track_presence:
return
now = self.clock.time_msec()
@@ -1358,7 +1368,7 @@ class PresenceHandler(BasePresenceHandler):
raise SynapseError(400, "Invalid presence state")
# If presence is disabled, no-op
- if not self._presence_enabled:
+ if not self._track_presence:
return
user_id = target_user.to_string()
@@ -1499,9 +1509,9 @@ class PresenceHandler(BasePresenceHandler):
# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
- deltas_by_room: Dict[str, List[JsonDict]] = {}
+ deltas_by_room: Dict[str, List[StateDelta]] = {}
for delta in deltas:
- deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+ deltas_by_room.setdefault(delta.room_id, []).append(delta)
for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)
@@ -1513,7 +1523,7 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
- async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+ async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
@@ -1524,31 +1534,30 @@ class PresenceHandler(BasePresenceHandler):
newly_joined_users = set()
for delta in deltas:
- assert room_id == delta["room_id"]
+ assert room_id == delta.room_id
- typ = delta["type"]
- state_key = delta["state_key"]
- event_id = delta["event_id"]
- prev_event_id = delta["prev_event_id"]
-
- logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+ logger.debug(
+ "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
+ )
# Drop any event that isn't a membership join
- if typ != EventTypes.Member:
+ if delta.event_type != EventTypes.Member:
continue
- if event_id is None:
+ if delta.event_id is None:
# state has been deleted, so this is not a join. We only care about
# joins.
continue
- event = await self.store.get_event(event_id, allow_none=True)
+ event = await self.store.get_event(delta.event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
- if prev_event_id:
- prev_event = await self.store.get_event(prev_event_id, allow_none=True)
+ if delta.prev_event_id:
+ prev_event = await self.store.get_event(
+ delta.prev_event_id, allow_none=True
+ )
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
@@ -1556,7 +1565,7 @@ class PresenceHandler(BasePresenceHandler):
# Ignore changes to join events.
continue
- newly_joined_users.add(state_key)
+ newly_joined_users.add(delta.state_key)
if not newly_joined_users:
# If nobody has joined then there's nothing to do.
@@ -2118,6 +2127,7 @@ def handle_update(
is_mine: bool,
wheel_timer: WheelTimer,
now: int,
+ persist: bool,
) -> Tuple[UserPresenceState, bool, bool]:
"""Given a presence update:
1. Add any appropriate timers.
@@ -2129,6 +2139,8 @@ def handle_update(
is_mine: Whether the user is ours
wheel_timer
now: Time now in ms
+ persist: True if this state should persist until another update occurs.
+ Skips insertion into wheel timers.
Returns:
3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
@@ -2146,14 +2158,15 @@ def handle_update(
if is_mine:
if new_state.state == PresenceState.ONLINE:
# Idle timer
- wheel_timer.insert(
- now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
- )
+ if not persist:
+ wheel_timer.insert(
+ now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
+ )
active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
new_state = new_state.copy_and_replace(currently_active=active)
- if active:
+ if active and not persist:
wheel_timer.insert(
now=now,
obj=user_id,
@@ -2162,11 +2175,12 @@ def handle_update(
if new_state.state != PresenceState.OFFLINE:
# User has stopped syncing
- wheel_timer.insert(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
- )
+ if not persist:
+ wheel_timer.insert(
+ now=now,
+ obj=user_id,
+ then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
+ )
last_federate = new_state.last_federation_update_ts
if now - last_federate > FEDERATION_PING_INTERVAL:
@@ -2174,7 +2188,7 @@ def handle_update(
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
federation_ping = True
- if new_state.state == PresenceState.BUSY:
+ if new_state.state == PresenceState.BUSY and not persist:
wheel_timer.insert(
now=now,
obj=user_id,
@@ -2182,11 +2196,13 @@ def handle_update(
)
else:
- wheel_timer.insert(
- now=now,
- obj=user_id,
- then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
- )
+ # An update for a remote user was received.
+ if not persist:
+ wheel_timer.insert(
+ now=now,
+ obj=user_id,
+ then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
+ )
# Check whether the change was something worth notifying about
if should_notify(prev_state, new_state, is_mine):
|