diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 0e812a6d8b..2f0e5f3b0a 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -76,6 +76,7 @@ class AdminHandler:
"consent_ts",
"user_type",
"is_guest",
+ "last_seen_ts",
}
if self._msc3866_enabled:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 59ecafa6a0..2b0c505130 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -218,19 +218,17 @@ class AuthHandler:
self._failed_uia_attempts_ratelimiter = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=self.hs.config.ratelimiting.rc_login_failed_attempts.per_second,
- burst_count=self.hs.config.ratelimiting.rc_login_failed_attempts.burst_count,
+ cfg=self.hs.config.ratelimiting.rc_login_failed_attempts,
)
# The number of seconds to keep a UI auth session active.
self._ui_auth_session_timeout = hs.config.auth.ui_auth_session_timeout
- # Ratelimitier for failed /login attempts
+ # Ratelimiter for failed /login attempts
self._failed_login_attempts_ratelimiter = Ratelimiter(
store=self.store,
clock=hs.get_clock(),
- rate_hz=self.hs.config.ratelimiting.rc_login_failed_attempts.per_second,
- burst_count=self.hs.config.ratelimiting.rc_login_failed_attempts.burst_count,
+ cfg=self.hs.config.ratelimiting.rc_login_failed_attempts,
)
self._clock = self.hs.get_clock()
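The recurring change in this patch replaces the unpacked `rate_hz`/`burst_count` arguments with a single `cfg` object. A minimal standalone sketch of the shape this implies follows; `RatelimitSettings` here is a stand-in that models only the fields visible in this diff (`per_second`, `burst_count`, and a key, per the room_summary.py hunk further down), and the real classes live in `synapse.config.ratelimiting` and `synapse.api.ratelimiting`.

```python
from dataclasses import dataclass


# Stand-in for synapse.config.ratelimiting.RatelimitSettings; only the
# fields visible in this diff are modelled, everything else is assumed.
@dataclass
class RatelimitSettings:
    key: str
    per_second: float
    burst_count: int


class Ratelimiter:
    """Sketch of the new-style constructor: one settings object in,
    instead of two separately unpacked numbers."""

    def __init__(self, store: object, clock: object, cfg: RatelimitSettings) -> None:
        self.store = store
        self.clock = clock
        # The limiter still works in terms of a rate and a burst size; it
        # just reads them off the config object itself now.
        self.rate_hz = cfg.per_second
        self.burst_count = cfg.burst_count


# Callers pass the settings object straight through, as the hunks above do:
limiter = Ratelimiter(
    store=None,
    clock=None,
    cfg=RatelimitSettings("<room summary>", per_second=5, burst_count=10),
)
```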
diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py
index 5c71637038..a850545453 100644
--- a/synapse/handlers/cas.py
+++ b/synapse/handlers/cas.py
@@ -67,6 +67,7 @@ class CasHandler:
self._cas_server_url = hs.config.cas.cas_server_url
self._cas_service_url = hs.config.cas.cas_service_url
+ self._cas_protocol_version = hs.config.cas.cas_protocol_version
self._cas_displayname_attribute = hs.config.cas.cas_displayname_attribute
self._cas_required_attributes = hs.config.cas.cas_required_attributes
@@ -121,7 +122,10 @@ class CasHandler:
Returns:
The parsed CAS response.
"""
- uri = self._cas_server_url + "/proxyValidate"
+ if self._cas_protocol_version == 3:
+ uri = self._cas_server_url + "/p3/proxyValidate"
+ else:
+ uri = self._cas_server_url + "/proxyValidate"
args = {
"ticket": ticket,
"service": self._build_service_param(service_args),
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5ae427d52c..763f56dfc1 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler):
user_id,
hosts,
)
- for host in hosts:
- self.federation_sender.send_device_messages(
- host, immediate=False
- )
- # TODO: when called, this isn't in a logging context.
- # This leads to log spam, sentry event spam, and massive
- # memory usage.
- # See https://github.com/matrix-org/synapse/issues/12552.
- # log_kv(
- # {"message": "sent device update to host", "host": host}
- # )
+ await self.federation_sender.send_device_messages(
+ hosts, immediate=False
+ )
+ # TODO: when called, this isn't in a logging context.
+ # This leads to log spam, sentry event spam, and massive
+ # memory usage.
+ # See https://github.com/matrix-org/synapse/issues/12552.
+ # log_kv(
+ # {"message": "sent device update to host", "host": host}
+ # )
if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
@@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler):
# Notify things that device lists need to be sent out.
self.notifier.notify_replication()
- for host in potentially_changed_hosts:
- self.federation_sender.send_device_messages(host, immediate=False)
+ await self.federation_sender.send_device_messages(
+ potentially_changed_hosts, immediate=False
+ )
def _update_device_from_client_ips(
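Both device-list hunks make the same call-shape change: `send_device_messages` is now awaited once with the whole destination collection rather than fired per host. A runnable sketch of the before/after using a stub sender (the stub is ours; only the signatures follow the diff):

```python
import asyncio
from typing import Collection


class SenderStub:
    """Stub federation sender, just to show the call-shape change."""

    def send_device_messages_old(self, destination: str, immediate: bool = True) -> None:
        print(f"old, per-host: {destination}")

    async def send_device_messages(
        self, destinations: Collection[str], immediate: bool = True
    ) -> None:
        print(f"new, batched: {sorted(destinations)}")


async def main() -> None:
    sender = SenderStub()
    hosts = {"a.example.org", "b.example.org"}

    # Before: one fire-and-forget call per host.
    for host in hosts:
        sender.send_device_messages_old(host, immediate=False)

    # After: a single awaited call; the sender can filter and batch.
    await sender.send_device_messages(hosts, immediate=False)


asyncio.run(main())
```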
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 17ff8821d9..1c79f7a61e 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -90,8 +90,7 @@ class DeviceMessageHandler:
self._ratelimiter = Ratelimiter(
store=self.store,
clock=hs.get_clock(),
- rate_hz=hs.config.ratelimiting.rc_key_requests.per_second,
- burst_count=hs.config.ratelimiting.rc_key_requests.burst_count,
+ cfg=hs.config.ratelimiting.rc_key_requests,
)
async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
@@ -303,10 +302,9 @@ class DeviceMessageHandler:
)
if self.federation_sender:
- for destination in remote_messages.keys():
- # Enqueue a new federation transaction to send the new
- # device messages to each remote destination.
- self.federation_sender.send_device_messages(destination)
+ # Enqueue a new federation transaction to send the new
+ # device messages to each remote destination.
+ await self.federation_sender.send_device_messages(remote_messages.keys())
async def get_events_for_dehydrated_device(
self,
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 33359f6ed7..d12803bf0f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -67,6 +67,7 @@ class EventStreamHandler:
context = await presence_handler.user_syncing(
requester.user.to_string(),
+ requester.device_id,
affect_presence=affect_presence,
presence_state=PresenceState.ONLINE,
)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 3031384d25..472879c964 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -66,14 +66,12 @@ class IdentityHandler:
self._3pid_validation_ratelimiter_ip = Ratelimiter(
store=self.store,
clock=hs.get_clock(),
- rate_hz=hs.config.ratelimiting.rc_3pid_validation.per_second,
- burst_count=hs.config.ratelimiting.rc_3pid_validation.burst_count,
+ cfg=hs.config.ratelimiting.rc_3pid_validation,
)
self._3pid_validation_ratelimiter_address = Ratelimiter(
store=self.store,
clock=hs.get_clock(),
- rate_hz=hs.config.ratelimiting.rc_3pid_validation.per_second,
- burst_count=hs.config.ratelimiting.rc_3pid_validation.burst_count,
+ cfg=hs.config.ratelimiting.rc_3pid_validation,
)
async def ratelimit_request_token_requests(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a74db1dccf..d6be18cdef 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -379,7 +379,7 @@ class MessageHandler:
"""
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
- if type(expiry_ts) is not int or event.is_state():
+ if type(expiry_ts) is not int or event.is_state(): # noqa: E721
return
# _schedule_expiry_for_event won't actually schedule anything if there's already
@@ -908,19 +908,6 @@ class EventCreationHandler:
if existing_event_id:
return existing_event_id
- # Some requsters don't have device IDs (appservice, guests, and access
- # tokens minted with the admin API), fallback to checking the access token
- # ID, which should be close enough.
- if requester.access_token_id:
- existing_event_id = (
- await self.store.get_event_id_from_transaction_id_and_token_id(
- room_id,
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
- )
- )
-
return existing_event_id
async def get_event_from_transaction(
@@ -1474,23 +1461,23 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
- event, context = events_and_context[0]
+ #
+ # Note: mypy gets confused if we inline dl and check with twisted#11770.
+ # Some kind of bug in mypy's deduction?
+ deferreds = (
+ run_in_background(
+ self._persist_events,
+ requester=requester,
+ events_and_context=events_and_context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ ),
+ run_in_background(
+ self.cache_joined_hosts_for_events, events_and_context
+ ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+ )
result, _ = await make_deferred_yieldable(
- gather_results(
- (
- run_in_background(
- self._persist_events,
- requester=requester,
- events_and_context=events_and_context,
- ratelimit=ratelimit,
- extra_users=extra_users,
- ),
- run_in_background(
- self.cache_joined_hosts_for_events, events_and_context
- ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
- ),
- consumeErrors=True,
- )
+ gather_results(deferreds, consumeErrors=True)
).addErrback(unwrapFirstError)
return result
@@ -1921,7 +1908,10 @@ class EventCreationHandler:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_as_background_process(
- "bump_presence_active_time", self._bump_active_time, requester.user
+ "bump_presence_active_time",
+ self._bump_active_time,
+ requester.user,
+ requester.device_id,
)
async def _notify() -> None:
@@ -1958,10 +1948,10 @@ class EventCreationHandler:
logger.info("maybe_kick_guest_users %r", current_state)
await self.hs.get_room_member_handler().kick_guest_users(current_state)
- async def _bump_active_time(self, user: UserID) -> None:
+ async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None:
try:
presence = self.hs.get_presence_handler()
- await presence.bump_presence_active_time(user)
+ await presence.bump_presence_active_time(user, device_id)
except Exception:
logger.exception("Error bumping presence active time")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e8e9db4b91..f31e18328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -23,6 +23,7 @@ The methods that define policy are:
"""
import abc
import contextlib
+import itertools
import logging
from bisect import bisect
from contextlib import contextmanager
@@ -151,15 +152,13 @@ class BasePresenceHandler(abc.ABC):
self._federation_queue = PresenceFederationQueue(hs, self)
- self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
-
self.VALID_PRESENCE: Tuple[str, ...] = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)
- if self._busy_presence_enabled:
+ if hs.config.experimental.msc3026_enabled:
self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info()
@@ -167,7 +166,11 @@ class BasePresenceHandler(abc.ABC):
@abc.abstractmethod
async def user_syncing(
- self, user_id: str, affect_presence: bool, presence_state: str
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ affect_presence: bool,
+ presence_state: str,
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.
@@ -178,6 +181,7 @@ class BasePresenceHandler(abc.ABC):
Args:
user_id: the user that is starting a sync
+ device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
@@ -185,15 +189,17 @@ class BasePresenceHandler(abc.ABC):
"""
@abc.abstractmethod
- def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
- """Get an iterable of syncing users on this worker, to send to the presence handler
+ def get_currently_syncing_users_for_replication(
+ self,
+ ) -> Iterable[Tuple[str, Optional[str]]]:
+ """Get an iterable of syncing users and devices on this worker, to send to the presence handler
This is called when a replication connection is established. It should return
- a list of user ids, which are then sent as USER_SYNC commands to inform the
- process handling presence about those users.
+ a list of tuples of user ID & device ID, which are then sent as USER_SYNC commands
+ to inform the process handling presence about those users/devices.
Returns:
- An iterable of user_id strings.
+ An iterable of tuples of user ID and device ID.
"""
async def get_state(self, target_user: UserID) -> UserPresenceState:
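Most of the presence changes below follow from one bookkeeping switch: in-flight syncs are counted per `(user_id, device_id)` pair instead of per user. A condensed, standalone sketch of that counter and the moments it reports to replication (class and method names are ours):

```python
from typing import Dict, Iterable, Optional, Tuple


class DeviceSyncCounter:
    def __init__(self) -> None:
        self._counts: Dict[Tuple[str, Optional[str]], int] = {}

    def sync_started(self, user_id: str, device_id: Optional[str]) -> bool:
        """Returns True on the first in-flight sync for this device, i.e.
        the moment a worker should send a USER_SYNC to the writer."""
        key = (user_id, device_id)
        self._counts[key] = self._counts.get(key, 0) + 1
        return self._counts[key] == 1

    def sync_ended(self, user_id: str, device_id: Optional[str]) -> bool:
        """Returns True when the last in-flight sync for the device ends."""
        key = (user_id, device_id)
        if key in self._counts:
            self._counts[key] -= 1
            return self._counts[key] == 0
        return False

    def currently_syncing(self) -> Iterable[Tuple[str, Optional[str]]]:
        return [key for key, count in self._counts.items() if count > 0]


counter = DeviceSyncCounter()
assert counter.sync_started("@alice:example.org", "PHONE")      # first sync
assert not counter.sync_started("@alice:example.org", "PHONE")  # second, same device
assert counter.sync_started("@alice:example.org", "LAPTOP")     # first for this device
```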
@@ -254,28 +260,39 @@ class BasePresenceHandler(abc.ABC):
async def set_state(
self,
target_user: UserID,
+ device_id: Optional[str],
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
+ device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
@abc.abstractmethod
- async def bump_presence_active_time(self, user: UserID) -> None:
+ async def bump_presence_active_time(
+ self, user: UserID, device_id: Optional[str]
+ ) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
async def update_external_syncs_row( # noqa: B027 (no-op by design)
- self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+ self,
+ process_id: str,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.
@@ -286,6 +303,7 @@ class BasePresenceHandler(abc.ABC):
syncing against. This allows synapse to process updates
as users start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
+ device_id: The user's device that has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
@@ -336,7 +354,9 @@ class BasePresenceHandler(abc.ABC):
)
for destination, host_states in hosts_to_states.items():
- self._federation.send_presence_to_destinations(host_states, [destination])
+ await self._federation.send_presence_to_destinations(
+ host_states, [destination]
+ )
async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
"""
@@ -381,7 +401,9 @@ class BasePresenceHandler(abc.ABC):
# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
- await self.set_state(UserID.from_string(user_id), state, force_notify=True)
+ await self.set_state(
+ UserID.from_string(user_id), None, state, force_notify=True
+ )
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
raise NotImplementedError(
@@ -414,16 +436,18 @@ class WorkerPresenceHandler(BasePresenceHandler):
hs.config.worker.writers.presence,
)
- # The number of ongoing syncs on this process, by user id.
+ # The number of ongoing syncs on this process, by (user ID, device ID).
# Empty if _presence_enabled is false.
- self._user_to_num_current_syncs: Dict[str, int] = {}
+ self._user_device_to_num_current_syncs: Dict[
+ Tuple[str, Optional[str]], int
+ ] = {}
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
- # user_id -> last_sync_ms. Lists the users that have stopped syncing but
- # we haven't notified the presence writer of that yet
- self.users_going_offline: Dict[str, int] = {}
+ # (user_id, device_id) -> last_sync_ms. Lists the devices that have stopped
+ # syncing but we haven't notified the presence writer of that yet
+ self._user_devices_going_offline: Dict[Tuple[str, Optional[str]], int] = {}
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
@@ -446,42 +470,54 @@ class WorkerPresenceHandler(BasePresenceHandler):
ClearUserSyncsCommand(self.instance_id)
)
- def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
+ def send_user_sync(
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ last_sync_ms: int,
+ ) -> None:
if self._presence_enabled:
self.hs.get_replication_command_handler().send_user_sync(
- self.instance_id, user_id, is_syncing, last_sync_ms
+ self.instance_id, user_id, device_id, is_syncing, last_sync_ms
)
- def mark_as_coming_online(self, user_id: str) -> None:
+ def mark_as_coming_online(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
"""
- going_offline = self.users_going_offline.pop(user_id, None)
+ going_offline = self._user_devices_going_offline.pop((user_id, device_id), None)
if not going_offline:
# Safe to skip because we haven't yet told the presence writer they
# were offline
- self.send_user_sync(user_id, True, self.clock.time_msec())
+ self.send_user_sync(user_id, device_id, True, self.clock.time_msec())
- def mark_as_going_offline(self, user_id: str) -> None:
+ def mark_as_going_offline(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has stopped syncing. We wait before notifying the presence
writer as it's likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
"""
- self.users_going_offline[user_id] = self.clock.time_msec()
+ self._user_devices_going_offline[(user_id, device_id)] = self.clock.time_msec()
def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
now = self.clock.time_msec()
- for user_id, last_sync_ms in list(self.users_going_offline.items()):
+ for (user_id, device_id), last_sync_ms in list(
+ self._user_devices_going_offline.items()
+ ):
if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
- self.users_going_offline.pop(user_id, None)
- self.send_user_sync(user_id, False, last_sync_ms)
+ self._user_devices_going_offline.pop((user_id, device_id), None)
+ self.send_user_sync(user_id, device_id, False, last_sync_ms)
async def user_syncing(
- self, user_id: str, affect_presence: bool, presence_state: str
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ affect_presence: bool,
+ presence_state: str,
) -> ContextManager[None]:
"""Record that a user is syncing.
@@ -491,36 +527,32 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not affect_presence or not self._presence_enabled:
return _NullContextManager()
- prev_state = await self.current_state_for_user(user_id)
- if prev_state.state != PresenceState.BUSY:
- # We set state here but pass ignore_status_msg = True as we don't want to
- # cause the status message to be cleared.
- # Note that this causes last_active_ts to be incremented which is not
- # what the spec wants: see comment in the BasePresenceHandler version
- # of this function.
- await self.set_state(
- UserID.from_string(user_id),
- {"presence": presence_state},
- ignore_status_msg=True,
- )
+ # Note that this causes last_active_ts to be incremented which is not
+ # what the spec wants.
+ await self.set_state(
+ UserID.from_string(user_id),
+ device_id,
+ state={"presence": presence_state},
+ is_sync=True,
+ )
- curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
- self._user_to_num_current_syncs[user_id] = curr_sync + 1
+ curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
+ self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1
- # If we went from no in flight sync to some, notify replication
- if self._user_to_num_current_syncs[user_id] == 1:
- self.mark_as_coming_online(user_id)
+ # If this is the first in-flight sync, notify replication
+ if self._user_device_to_num_current_syncs[(user_id, device_id)] == 1:
+ self.mark_as_coming_online(user_id, device_id)
def _end() -> None:
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
- if user_id in self._user_to_num_current_syncs:
- self._user_to_num_current_syncs[user_id] -= 1
+ if (user_id, device_id) in self._user_device_to_num_current_syncs:
+ self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1
- # If we went from one in flight sync to non, notify replication
- if self._user_to_num_current_syncs[user_id] == 0:
- self.mark_as_going_offline(user_id)
+ # If there are no more in-flight syncs, notify replication
+ if self._user_device_to_num_current_syncs[(user_id, device_id)] == 0:
+ self.mark_as_going_offline(user_id, device_id)
@contextlib.contextmanager
def _user_syncing() -> Generator[None, None, None]:
@@ -587,28 +619,34 @@ class WorkerPresenceHandler(BasePresenceHandler):
# If this is a federation sender, notify about presence updates.
await self.maybe_send_presence_to_interested_destinations(state_to_notify)
- def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
+ def get_currently_syncing_users_for_replication(
+ self,
+ ) -> Iterable[Tuple[str, Optional[str]]]:
return [
- user_id
- for user_id, count in self._user_to_num_current_syncs.items()
+ user_id_device_id
+ for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count > 0
]
async def set_state(
self,
target_user: UserID,
+ device_id: Optional[str],
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
+ device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
presence = state["presence"]
@@ -625,12 +663,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
await self._set_state_client(
instance_name=self._presence_writer_instance,
user_id=user_id,
+ device_id=device_id,
state=state,
- ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
+ is_sync=is_sync,
)
- async def bump_presence_active_time(self, user: UserID) -> None:
+ async def bump_presence_active_time(
+ self, user: UserID, device_id: Optional[str]
+ ) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -641,7 +682,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
# Proxy request to instance that writes presence
user_id = user.to_string()
await self._bump_active_client(
- instance_name=self._presence_writer_instance, user_id=user_id
+ instance_name=self._presence_writer_instance,
+ user_id=user_id,
+ device_id=device_id,
)
@@ -703,17 +746,23 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
- self.user_to_num_current_syncs: Dict[str, int] = {}
+ self._user_device_to_num_current_syncs: Dict[
+ Tuple[str, Optional[str]], int
+ ] = {}
# Keeps track of the number of *ongoing* syncs on other processes.
+ #
# While any sync is ongoing on another process the user will never
# go offline.
+ #
# Each process has a unique identifier and an update frequency. If
# no update is received from that process within the update period then
# we assume that all the sync requests on that process have stopped.
- # Stored as a dict from process_id to set of user_id, and a dict of
- # process_id to millisecond timestamp last updated.
- self.external_process_to_current_syncs: Dict[str, Set[str]] = {}
+ # Stored as a dict from process_id to set of (user_id, device_id), and
+ # a dict of process_id to millisecond timestamp last updated.
+ self.external_process_to_current_syncs: Dict[
+ str, Set[Tuple[str, Optional[str]]]
+ ] = {}
self.external_process_last_updated_ms: Dict[str, int] = {}
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
@@ -889,7 +938,7 @@ class PresenceHandler(BasePresenceHandler):
)
for destination, states in hosts_to_states.items():
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
states, [destination]
)
@@ -918,7 +967,10 @@ class PresenceHandler(BasePresenceHandler):
# that were syncing on that process to see if they need to be timed
# out.
users_to_check.update(
- self.external_process_to_current_syncs.pop(process_id, ())
+ user_id
+ for user_id, device_id in self.external_process_to_current_syncs.pop(
+ process_id, ()
+ )
)
self.external_process_last_updated_ms.pop(process_id)
@@ -931,11 +983,15 @@ class PresenceHandler(BasePresenceHandler):
syncing_user_ids = {
user_id
- for user_id, count in self.user_to_num_current_syncs.items()
+ for (user_id, _), count in self._user_device_to_num_current_syncs.items()
if count
}
- for user_ids in self.external_process_to_current_syncs.values():
- syncing_user_ids.update(user_ids)
+ syncing_user_ids.update(
+ user_id
+ for user_id, _ in itertools.chain(
+ *self.external_process_to_current_syncs.values()
+ )
+ )
changes = handle_timeouts(
states,
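The new `itertools.chain` expression (hence the added import at the top of the file) flattens the per-process sets of `(user_id, device_id)` pairs before projecting out the user IDs. Worked example:

```python
import itertools

external_process_to_current_syncs = {
    "proc1": {("@a:example.org", "DEV1"), ("@b:example.org", None)},
    "proc2": {("@a:example.org", "DEV2")},
}

syncing_user_ids = {
    user_id
    for user_id, _ in itertools.chain(
        *external_process_to_current_syncs.values()
    )
}

assert syncing_user_ids == {"@a:example.org", "@b:example.org"}
```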
@@ -946,7 +1002,9 @@ class PresenceHandler(BasePresenceHandler):
return await self._update_states(changes)
- async def bump_presence_active_time(self, user: UserID) -> None:
+ async def bump_presence_active_time(
+ self, user: UserID, device_id: Optional[str]
+ ) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@@ -969,6 +1027,7 @@ class PresenceHandler(BasePresenceHandler):
async def user_syncing(
self,
user_id: str,
+ device_id: Optional[str],
affect_presence: bool = True,
presence_state: str = PresenceState.ONLINE,
) -> ContextManager[None]:
@@ -980,7 +1039,8 @@ class PresenceHandler(BasePresenceHandler):
when users disconnect/reconnect.
Args:
- user_id
+ user_id: the user that is starting a sync
+ device_id: the user's device that is starting a sync
affect_presence: If false this function will be a no-op.
Useful for streams that are not associated with an actual
client that is being used by a user.
@@ -989,52 +1049,21 @@ class PresenceHandler(BasePresenceHandler):
if not affect_presence or not self._presence_enabled:
return _NullContextManager()
- curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
- self.user_to_num_current_syncs[user_id] = curr_sync + 1
+ curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
+ self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1
- prev_state = await self.current_state_for_user(user_id)
-
- # If they're busy then they don't stop being busy just by syncing,
- # so just update the last sync time.
- if prev_state.state != PresenceState.BUSY:
- # XXX: We set_state separately here and just update the last_active_ts above
- # This keeps the logic as similar as possible between the worker and single
- # process modes. Using set_state will actually cause last_active_ts to be
- # updated always, which is not what the spec calls for, but synapse has done
- # this for... forever, I think.
- await self.set_state(
- UserID.from_string(user_id),
- {"presence": presence_state},
- ignore_status_msg=True,
- )
- # Retrieve the new state for the logic below. This should come from the
- # in-memory cache.
- prev_state = await self.current_state_for_user(user_id)
-
- # To keep the single process behaviour consistent with worker mode, run the
- # same logic as `update_external_syncs_row`, even though it looks weird.
- if prev_state.state == PresenceState.OFFLINE:
- await self._update_states(
- [
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=self.clock.time_msec(),
- last_user_sync_ts=self.clock.time_msec(),
- )
- ]
- )
- # otherwise, set the new presence state & update the last sync time,
- # but don't update last_active_ts as this isn't an indication that
- # they've been active (even though it's probably been updated by
- # set_state above)
- else:
- await self._update_states(
- [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
- )
+ # Note that this causes last_active_ts to be incremented which is not
+ # what the spec wants.
+ await self.set_state(
+ UserID.from_string(user_id),
+ device_id,
+ state={"presence": presence_state},
+ is_sync=True,
+ )
async def _end() -> None:
try:
- self.user_to_num_current_syncs[user_id] -= 1
+ self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1
prev_state = await self.current_state_for_user(user_id)
await self._update_states(
@@ -1056,12 +1085,19 @@ class PresenceHandler(BasePresenceHandler):
return _user_syncing()
- def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
+ def get_currently_syncing_users_for_replication(
+ self,
+ ) -> Iterable[Tuple[str, Optional[str]]]:
# since we are the process handling presence, there is nothing to do here.
return []
async def update_external_syncs_row(
- self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+ self,
+ process_id: str,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.
@@ -1070,6 +1106,7 @@ class PresenceHandler(BasePresenceHandler):
syncing against. This allows synapse to process updates
as users start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
+ device_id: The user's device that has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
@@ -1080,31 +1117,27 @@ class PresenceHandler(BasePresenceHandler):
process_id, set()
)
- updates = []
- if is_syncing and user_id not in process_presence:
- if prev_state.state == PresenceState.OFFLINE:
- updates.append(
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=sync_time_msec,
- last_user_sync_ts=sync_time_msec,
- )
- )
- else:
- updates.append(
- prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
- )
- process_presence.add(user_id)
- elif user_id in process_presence:
- updates.append(
- prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+ # USER_SYNC is sent when a user's device starts or stops syncing on
+ # a remote *process*. (But only for the initial and last sync for that
+ # device.)
+ #
+ # When a device *starts* syncing it also calls set_state(...) which
+ # will update the state, last_active_ts, and last_user_sync_ts.
+ # Simply ensure the user & device is tracked as syncing in this case.
+ #
+ # When a device *stops* syncing, update the last_user_sync_ts and mark
+ # them as no longer syncing. Note this doesn't quite match the
+ # monolith behaviour, which updates last_user_sync_ts at the end of
+ # every sync, not just the last in-flight sync.
+ if is_syncing and (user_id, device_id) not in process_presence:
+ process_presence.add((user_id, device_id))
+ elif not is_syncing and (user_id, device_id) in process_presence:
+ new_state = prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec
)
+ await self._update_states([new_state])
- if not is_syncing:
- process_presence.discard(user_id)
-
- if updates:
- await self._update_states(updates)
+ process_presence.discard((user_id, device_id))
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
@@ -1118,7 +1151,9 @@ class PresenceHandler(BasePresenceHandler):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
- prev_states = await self.current_state_for_users(process_presence)
+ prev_states = await self.current_state_for_users(
+ {user_id for user_id, device_id in process_presence}
+ )
time_now_ms = self.clock.time_msec()
await self._update_states(
@@ -1203,18 +1238,22 @@ class PresenceHandler(BasePresenceHandler):
async def set_state(
self,
target_user: UserID,
+ device_id: Optional[str],
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
+ device_id: the device that the user is setting the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]
@@ -1227,18 +1266,27 @@ class PresenceHandler(BasePresenceHandler):
return
user_id = target_user.to_string()
+ now = self.clock.time_msec()
prev_state = await self.current_state_for_user(user_id)
+ # Syncs do not override a previous presence of busy.
+ #
+ # TODO: This is a hack for lack of multi-device support. Unfortunately
+ # removing this requires coordination with clients.
+ if prev_state.state == PresenceState.BUSY and is_sync:
+ presence = PresenceState.BUSY
+
new_fields = {"state": presence}
- if not ignore_status_msg:
- new_fields["status_msg"] = status_msg
+ if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
+ new_fields["last_active_ts"] = now
- if presence == PresenceState.ONLINE or (
- presence == PresenceState.BUSY and self._busy_presence_enabled
- ):
- new_fields["last_active_ts"] = self.clock.time_msec()
+ if is_sync:
+ new_fields["last_user_sync_ts"] = now
+ else:
+ # Syncs do not override the status message.
+ new_fields["status_msg"] = status_msg
await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
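The tail of `set_state` is worth restating, since it now encodes three sync-specific rules at once: a sync never downgrades BUSY, bumps `last_user_sync_ts`, and leaves `status_msg` alone. A standalone sketch of the field selection (presence states written as string literals for brevity):

```python
from typing import Dict, Optional


def build_new_fields(
    prev_presence: str,
    presence: str,
    status_msg: Optional[str],
    is_sync: bool,
    now: int,
) -> Dict[str, object]:
    # Syncs do not override a previous BUSY (the multi-device TODO above).
    if prev_presence == "busy" and is_sync:
        presence = "busy"

    new_fields: Dict[str, object] = {"state": presence}
    if presence in ("online", "busy"):
        new_fields["last_active_ts"] = now
    if is_sync:
        new_fields["last_user_sync_ts"] = now
    else:
        # Only explicit presence updates may change the status message.
        new_fields["status_msg"] = status_msg
    return new_fields


fields = build_new_fields("busy", "online", None, is_sync=True, now=1_700_000_000_000)
assert fields["state"] == "busy" and "status_msg" not in fields
```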
@@ -1462,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler):
or state.status_msg is not None
]
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
@@ -1473,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler):
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
@@ -2136,7 +2184,7 @@ class PresenceFederationQueue:
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: StrCollection
) -> None:
"""Send the presence states to the given destinations.
@@ -2156,7 +2204,7 @@ class PresenceFederationQueue:
return
if self._federation:
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
@@ -2279,7 +2327,7 @@ class PresenceFederationQueue:
for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index bb409f97b7..da146b0d45 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -112,8 +112,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._join_rate_limiter_local = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
- burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
+ cfg=hs.config.ratelimiting.rc_joins_local,
)
# Tracks joins from local users to rooms this server isn't a member of.
# I.e. joins this server makes by requesting /make_join /send_join from
@@ -121,8 +120,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._join_rate_limiter_remote = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
- burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
+ cfg=hs.config.ratelimiting.rc_joins_remote,
)
# TODO: find a better place to keep this Ratelimiter.
# It needs to be
@@ -135,8 +133,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._join_rate_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
- burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
+ cfg=hs.config.ratelimiting.rc_joins_per_room,
)
# Ratelimiter for invites, keyed by room (across all issuers, all
@@ -144,8 +141,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._invites_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_invites_per_room.per_second,
- burst_count=hs.config.ratelimiting.rc_invites_per_room.burst_count,
+ cfg=hs.config.ratelimiting.rc_invites_per_room,
)
# Ratelimiter for invites, keyed by recipient (across all rooms, all
@@ -153,8 +149,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._invites_per_recipient_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_invites_per_user.per_second,
- burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count,
+ cfg=hs.config.ratelimiting.rc_invites_per_user,
)
# Ratelimiter for invites, keyed by issuer (across all rooms, all
@@ -162,15 +157,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._invites_per_issuer_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_invites_per_issuer.per_second,
- burst_count=hs.config.ratelimiting.rc_invites_per_issuer.burst_count,
+ cfg=hs.config.ratelimiting.rc_invites_per_issuer,
)
self._third_party_invite_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
- rate_hz=hs.config.ratelimiting.rc_third_party_invite.per_second,
- burst_count=hs.config.ratelimiting.rc_third_party_invite.burst_count,
+ cfg=hs.config.ratelimiting.rc_third_party_invite,
)
self.request_ratelimiter = hs.get_request_ratelimiter()
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index dad3e23470..dd559b4c45 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
UnsupportedRoomVersionError,
)
from synapse.api.ratelimiting import Ratelimiter
+from synapse.config.ratelimiting import RatelimitSettings
from synapse.events import EventBase
from synapse.types import JsonDict, Requester, StrCollection
from synapse.util.caches.response_cache import ResponseCache
@@ -94,7 +95,9 @@ class RoomSummaryHandler:
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
self._ratelimiter = Ratelimiter(
- store=self._store, clock=hs.get_clock(), rate_hz=5, burst_count=10
+ store=self._store,
+ clock=hs.get_clock(),
+ cfg=RatelimitSettings("<room summary>", per_second=5, burst_count=10),
)
# If a user tries to fetch the same page multiple times in quick succession,
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py
index 804cc6e81e..05e21509de 100644
--- a/synapse/handlers/send_email.py
+++ b/synapse/handlers/send_email.py
@@ -23,9 +23,11 @@ from pkg_resources import parse_version
import twisted
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IOpenSSLContextFactory
+from twisted.internet.endpoints import HostnameEndpoint
+from twisted.internet.interfaces import IOpenSSLContextFactory, IProtocolFactory
from twisted.internet.ssl import optionsForClientTLS
from twisted.mail.smtp import ESMTPSender, ESMTPSenderFactory
+from twisted.protocols.tls import TLSMemoryBIOFactory
from synapse.logging.context import make_deferred_yieldable
from synapse.types import ISynapseReactor
@@ -97,6 +99,7 @@ async def _sendmail(
**kwargs,
)
+ factory: IProtocolFactory
if _is_old_twisted:
# before twisted 21.2, we have to override the ESMTPSender protocol to disable
# TLS
@@ -110,22 +113,13 @@ async def _sendmail(
factory = build_sender_factory(hostname=smtphost if enable_tls else None)
if force_tls:
- reactor.connectSSL(
- smtphost,
- smtpport,
- factory,
- optionsForClientTLS(smtphost),
- timeout=30,
- bindAddress=None,
- )
- else:
- reactor.connectTCP(
- smtphost,
- smtpport,
- factory,
- timeout=30,
- bindAddress=None,
- )
+ factory = TLSMemoryBIOFactory(optionsForClientTLS(smtphost), True, factory)
+
+ endpoint = HostnameEndpoint(
+ reactor, smtphost, smtpport, timeout=30, bindAddress=None
+ )
+
+ await make_deferred_yieldable(endpoint.connect(factory))
await make_deferred_yieldable(d)
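The SMTP hunk swaps `reactor.connectSSL`/`connectTCP` for the endpoint API: when TLS is forced, the protocol factory is wrapped in `TLSMemoryBIOFactory` and the connection always goes through a `HostnameEndpoint`. Generic form of the pattern (the placeholder factory and the `connect` function are ours; the Twisted calls match the diff):

```python
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import HostnameEndpoint
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.ssl import optionsForClientTLS
from twisted.protocols.tls import TLSMemoryBIOFactory


def connect(host: str, port: int, force_tls: bool) -> Deferred:
    factory = Factory.forProtocol(Protocol)  # placeholder client factory
    if force_tls:
        # TLSMemoryBIOFactory(contextFactory, isClient, wrappedFactory):
        # wraps the plain factory so the endpoint speaks TLS from byte one.
        factory = TLSMemoryBIOFactory(optionsForClientTLS(host), True, factory)
    endpoint = HostnameEndpoint(reactor, host, port, timeout=30, bindAddress=None)
    # connect() returns a Deferred that fires with the connected protocol,
    # which is what the handler now awaits via make_deferred_yieldable.
    return endpoint.connect(factory)
```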
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 7aeae5319c..4b4227003d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
from synapse.util.wheel_timer import WheelTimer
if TYPE_CHECKING:
@@ -150,8 +151,15 @@ class FollowerTypingHandler:
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
)
- hosts = await self._storage_controllers.state.get_current_hosts_in_room(
- member.room_id
+ hosts: StrCollection = (
+ await self._storage_controllers.state.get_current_hosts_in_room(
+ member.room_id
+ )
+ )
+ hosts = await filter_destinations_by_retry_limiter(
+ hosts,
+ clock=self.clock,
+ store=self.store,
)
for domain in hosts:
if not self.is_mine_server_name(domain):
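The typing.py hunk filters the candidate hosts through `filter_destinations_by_retry_limiter` before sending typing EDUs, so traffic is no longer queued for servers already in backoff. A standalone analogue of what such a filter does; the real helper in `synapse.util.retryutils` is async and reads retry timings from the store, which is faked here with a dict:

```python
from typing import Dict, List


def filter_destinations(
    destinations: List[str],
    retry_after_ms: Dict[str, int],
    now_ms: int,
) -> List[str]:
    """Keep only hosts whose backoff window has already elapsed."""
    return [d for d in destinations if retry_after_ms.get(d, 0) <= now_ms]


hosts = ["up.example.org", "down.example.org"]
backoff = {"down.example.org": 2_000_000_000_000}  # retry time far in the future
assert filter_destinations(hosts, backoff, now_ms=1_700_000_000_000) == ["up.example.org"]
```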
|