diff --git a/synapse/handlers/account.py b/synapse/handlers/account.py
index c05a14304c..fa043cca86 100644
--- a/synapse/handlers/account.py
+++ b/synapse/handlers/account.py
@@ -102,7 +102,7 @@ class AccountHandler:
"""
status = {"exists": False}
- userinfo = await self._main_store.get_userinfo_by_id(user_id.to_string())
+ userinfo = await self._main_store.get_user_by_id(user_id.to_string())
if userinfo is not None:
status = {
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 2f0e5f3b0a..ba9704a065 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -14,11 +14,11 @@
import abc
import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Set
+from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
from synapse.api.constants import Direction, Membership
from synapse.events import EventBase
-from synapse.types import JsonDict, RoomStreamToken, StateMap, UserID
+from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -35,7 +35,7 @@ class AdminHandler:
self._state_storage_controller = self._storage_controllers.state
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
- async def get_whois(self, user: UserID) -> JsonDict:
+ async def get_whois(self, user: UserID) -> JsonMapping:
connections = []
sessions = await self._store.get_user_ip_and_agents(user)
@@ -55,40 +55,32 @@ class AdminHandler:
return ret
- async def get_user(self, user: UserID) -> Optional[JsonDict]:
+ async def get_user(self, user: UserID) -> Optional[JsonMapping]:
"""Function to get user details"""
- user_info_dict = await self._store.get_user_by_id(user.to_string())
- if user_info_dict is None:
+ user_info: Optional[UserInfo] = await self._store.get_user_by_id(
+ user.to_string()
+ )
+ if user_info is None:
return None
- # Restrict returned information to a known set of fields. This prevents additional
- # fields added to get_user_by_id from modifying Synapse's external API surface.
- user_info_to_return = {
- "name",
- "admin",
- "deactivated",
- "locked",
- "shadow_banned",
- "creation_ts",
- "appservice_id",
- "consent_server_notice_sent",
- "consent_version",
- "consent_ts",
- "user_type",
- "is_guest",
- "last_seen_ts",
+ user_info_dict = {
+ "name": user.to_string(),
+ "admin": user_info.is_admin,
+ "deactivated": user_info.is_deactivated,
+ "locked": user_info.locked,
+ "shadow_banned": user_info.is_shadow_banned,
+ "creation_ts": user_info.creation_ts,
+ "appservice_id": user_info.appservice_id,
+ "consent_server_notice_sent": user_info.consent_server_notice_sent,
+ "consent_version": user_info.consent_version,
+ "consent_ts": user_info.consent_ts,
+ "user_type": user_info.user_type,
+ "is_guest": user_info.is_guest,
}
if self._msc3866_enabled:
# Only include the approved flag if support for MSC3866 is enabled.
- user_info_to_return.add("approved")
-
- # Restrict returned keys to a known set.
- user_info_dict = {
- key: value
- for key, value in user_info_dict.items()
- if key in user_info_to_return
- }
+ user_info_dict["approved"] = user_info.approved
# Add additional user metadata
profile = await self._store.get_profileinfo(user)
@@ -105,6 +97,9 @@ class AdminHandler:
user_info_dict["external_ids"] = external_ids
user_info_dict["erased"] = await self._store.is_user_erased(user.to_string())
+ last_seen_ts = await self._store.get_last_seen_for_user_id(user.to_string())
+ user_info_dict["last_seen_ts"] = last_seen_ts
+
return user_info_dict
async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any:
@@ -349,7 +344,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def write_profile(self, profile: JsonDict) -> None:
+ def write_profile(self, profile: JsonMapping) -> None:
"""Write the profile of a user.
Args:
@@ -358,7 +353,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def write_devices(self, devices: List[JsonDict]) -> None:
+ def write_devices(self, devices: Sequence[JsonMapping]) -> None:
"""Write the devices of a user.
Args:
@@ -367,7 +362,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def write_connections(self, connections: List[JsonDict]) -> None:
+ def write_connections(self, connections: Sequence[JsonMapping]) -> None:
"""Write the connections of a user.
Args:
@@ -377,7 +372,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write_account_data(
- self, file_name: str, account_data: Mapping[str, JsonDict]
+ self, file_name: str, account_data: Mapping[str, JsonMapping]
) -> None:
"""Write the account data of a user.
@@ -388,7 +383,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def write_media_id(self, media_id: str, media_metadata: JsonDict) -> None:
+ def write_media_id(self, media_id: str, media_metadata: JsonMapping) -> None:
"""Write the media's metadata of a user.
Exports only the metadata, as this can be fetched from the database via
read only. In order to access the files, a connection to the correct
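
The `get_user` rewrite above reads typed attributes off the `UserInfo` object returned by `get_user_by_id` instead of filtering a raw dict. For orientation, a minimal sketch of the shape it relies on (only the fields actually accessed above are listed; the decorator and anything else are assumptions, the real definition lives in `synapse.types`):

    # Sketch only -- mirrors the attribute accesses in AdminHandler.get_user above.
    import attr
    from typing import Optional

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class UserInfo:
        is_admin: bool
        is_deactivated: bool
        locked: bool
        is_shadow_banned: bool
        is_guest: bool
        creation_ts: int
        appservice_id: Optional[str]
        consent_server_notice_sent: Optional[str]
        consent_version: Optional[str]
        consent_ts: Optional[int]
        user_type: Optional[str]
        approved: bool  # surfaced only when MSC3866 is enabled
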
diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py
index a850545453..b5b8b9bd35 100644
--- a/synapse/handlers/cas.py
+++ b/synapse/handlers/cas.py
@@ -70,6 +70,7 @@ class CasHandler:
self._cas_protocol_version = hs.config.cas.cas_protocol_version
self._cas_displayname_attribute = hs.config.cas.cas_displayname_attribute
self._cas_required_attributes = hs.config.cas.cas_required_attributes
+ self._cas_enable_registration = hs.config.cas.cas_enable_registration
self._http_client = hs.get_proxied_http_client()
@@ -395,4 +396,5 @@ class CasHandler:
client_redirect_url,
cas_response_to_user_attributes,
grandfather_existing_users,
+ registration_enabled=self._cas_enable_registration,
)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 763f56dfc1..86ad96d030 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.types import (
JsonDict,
+ JsonMapping,
+ ScheduledTask,
StrCollection,
StreamKeyType,
StreamToken,
+ TaskStatus,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -55,13 +58,17 @@ from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.cancellation import cancellable
from synapse.util.metrics import measure_func
-from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.retryutils import (
+ NotRetryingDestination,
+ filter_destinations_by_retry_limiter,
+)
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
+DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
MAX_DEVICE_DISPLAY_NAME_LEN = 100
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
@@ -78,14 +85,20 @@ class DeviceWorkerHandler:
self._appservice_handler = hs.get_application_service_handler()
self._state_storage = hs.get_storage_controllers().state
self._auth_handler = hs.get_auth_handler()
+ self._event_sources = hs.get_event_sources()
self.server_name = hs.hostname
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
self._query_appservices_for_keys = (
hs.config.experimental.msc3984_appservice_key_query
)
+ self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListWorkerUpdater(hs)
+ self._task_scheduler.register_action(
+ self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
+ )
+
@trace
async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
"""
@@ -375,6 +388,33 @@ class DeviceWorkerHandler:
"Trying handling device list state for partial join: not supported on workers."
)
+ DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
+ DEVICE_MSGS_DELETE_SLEEP_MS = 1000
+
+ async def _delete_device_messages(
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
+ assert task.params is not None
+ user_id = task.params["user_id"]
+ device_id = task.params["device_id"]
+ up_to_stream_id = task.params["up_to_stream_id"]
+
+ # Delete the messages in batches to avoid too much DB load.
+ while True:
+ res = await self.store.delete_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ up_to_stream_id=up_to_stream_id,
+ limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+ )
+
+ if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+ return TaskStatus.COMPLETE, None, None
+
+ await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
+
class DeviceHandler(DeviceWorkerHandler):
device_list_updater: "DeviceListUpdater"
@@ -530,6 +570,7 @@ class DeviceHandler(DeviceWorkerHandler):
user_id: The user to delete devices from.
device_ids: The list of device IDs to delete
"""
+ to_device_stream_id = self._event_sources.get_current_token().to_device_key
try:
await self.store.delete_devices(user_id, device_ids)
@@ -559,6 +600,17 @@ class DeviceHandler(DeviceWorkerHandler):
f"org.matrix.msc3890.local_notification_settings.{device_id}",
)
+ # Delete device messages asynchronously and in batches using the task scheduler
+ await self._task_scheduler.schedule_task(
+ DELETE_DEVICE_MSGS_TASK_NAME,
+ resource_id=device_id,
+ params={
+ "user_id": user_id,
+ "device_id": device_id,
+ "up_to_stream_id": to_device_stream_id,
+ },
+ )
+
# Pushers are deleted after `delete_access_tokens_for_user` is called so that
# modules using `on_logged_out` hook can use them if needed.
await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
@@ -707,12 +759,13 @@ class DeviceHandler(DeviceWorkerHandler):
# If the dehydrated device was successfully deleted (the device ID
# matched the stored dehydrated device), then modify the access
- # token to use the dehydrated device's ID and copy the old device
- # display name to the dehydrated device, and destroy the old device
- # ID
+ # token and refresh token to use the dehydrated device's ID and
+ # copy the old device display name to the dehydrated device,
+ # and destroy the old device ID
old_device_id = await self.store.set_device_for_access_token(
access_token, device_id
)
+ await self.store.set_device_for_refresh_token(user_id, old_device_id, device_id)
old_device = await self.store.get_device(user_id, old_device_id)
if old_device is None:
raise errors.NotFoundError()
@@ -982,7 +1035,7 @@ class DeviceListWorkerUpdater:
async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
- ) -> Dict[str, Optional[JsonDict]]:
+ ) -> Dict[str, Optional[JsonMapping]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
@@ -1011,6 +1064,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
self._notifier = hs.get_notifier()
self._remote_edu_linearizer = Linearizer(name="remote_device_list")
+ self._resync_linearizer = Linearizer(name="remote_device_resync")
# user_id -> list of updates waiting to be handled.
self._pending_updates: Dict[
@@ -1220,8 +1274,18 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
self._resync_retry_in_progress = True
# Get all of the users that need resyncing.
need_resync = await self.store.get_user_ids_requiring_device_list_resync()
+
+ # Filter out users whose host is marked as "down" up front.
+ hosts = await filter_destinations_by_retry_limiter(
+ {get_domain_from_id(u) for u in need_resync}, self.clock, self.store
+ )
+ hosts = set(hosts)
+
# Iterate over the set of user IDs.
for user_id in need_resync:
+ if get_domain_from_id(user_id) not in hosts:
+ continue
+
try:
# Try to resync the current user's devices list.
result = (await self.multi_user_device_resync([user_id], False))[
@@ -1253,7 +1317,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
async def multi_user_device_resync(
self, user_ids: List[str], mark_failed_as_stale: bool = True
- ) -> Dict[str, Optional[JsonDict]]:
+ ) -> Dict[str, Optional[JsonMapping]]:
"""
Like `user_device_resync` but operates on multiple users **from the same origin**
at once.
@@ -1273,9 +1337,11 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
failed = set()
# TODO(Perf): Actually batch these up
for user_id in user_ids:
- user_result, user_failed = await self._user_device_resync_returning_failed(
- user_id
- )
+ async with self._resync_linearizer.queue(user_id):
+ (
+ user_result,
+ user_failed,
+ ) = await self._user_device_resync_returning_failed(user_id)
result[user_id] = user_result
if user_failed:
failed.add(user_id)
@@ -1287,7 +1353,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
async def _user_device_resync_returning_failed(
self, user_id: str
- ) -> Tuple[Optional[JsonDict], bool]:
+ ) -> Tuple[Optional[JsonMapping], bool]:
"""Fetches all devices for a user and updates the device cache with them.
Args:
@@ -1300,6 +1366,12 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
e.g. due to a connection problem.
- True iff the resync failed and the device list should be marked as stale.
"""
+ # Check that we haven't gone and fetched the devices since we last
+ # checked if we needed to resync these device lists.
+ if await self.store.get_users_whose_devices_are_cached([user_id]):
+ cached = await self.store.get_cached_devices_for_user(user_id)
+ return cached, False
+
logger.debug("Attempting to resync the device list for %s", user_id)
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
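
For orientation, the device-message deletion above is driven through the task scheduler: the action is registered once in `__init__`, and `delete_devices` schedules one task per deleted device. A rough sketch of that call with hypothetical IDs (the action name and `schedule_task` arguments are the ones used above; everything else is illustrative):

    # Sketch only: scheduling a deletion task as DeviceHandler.delete_devices does above.
    async def schedule_device_msgs_deletion(handler: "DeviceHandler") -> str:
        # Returns the scheduled task's ID; the scheduler then runs
        # _delete_device_messages in the background, deleting in batches
        # and sleeping between batches.
        return await handler._task_scheduler.schedule_task(
            DELETE_DEVICE_MSGS_TASK_NAME,
            resource_id="ABCDEFGH",  # hypothetical device ID
            params={
                "user_id": "@alice:example.org",  # hypothetical user
                "device_id": "ABCDEFGH",
                "up_to_stream_id": 42,  # to-device stream position captured before deletion
            },
        )
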
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index d32d224d56..eedde97ab0 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -723,12 +723,11 @@ class FederationEventHandler:
if not prevs - seen:
return
- latest_list = await self._store.get_latest_event_ids_in_room(room_id)
+ latest_frozen = await self._store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
- latest = set(latest_list)
- latest |= seen
+ latest = seen | latest_frozen
logger.info(
"Requesting missing events between %s and %s",
@@ -1976,8 +1975,7 @@ class FederationEventHandler:
# partial and full state and may not be accurate.
return
- extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
- extrem_ids = set(extrem_ids_list)
+ extrem_ids = await self._store.get_latest_event_ids_in_room(event.room_id)
prev_event_ids = set(event.prev_event_ids())
if extrem_ids == prev_event_ids:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index b3be7a86f0..5dc76ef588 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,7 +13,7 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, List, Optional, Tuple
from synapse.api.constants import (
AccountDataTypes,
@@ -23,7 +23,6 @@ from synapse.api.constants import (
Membership,
)
from synapse.api.errors import SynapseError
-from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
@@ -35,7 +34,6 @@ from synapse.types import (
JsonDict,
Requester,
RoomStreamToken,
- StateMap,
StreamKeyType,
StreamToken,
UserID,
@@ -199,9 +197,7 @@ class InitialSyncHandler:
deferred_room_state = run_in_background(
self._state_storage_controller.get_state_for_events,
[event.event_id],
- ).addCallback(
- lambda states: cast(StateMap[EventBase], states[event.event_id])
- )
+ ).addCallback(lambda states: states[event.event_id])
(messages, token), current_state = await make_deferred_yieldable(
gather_results(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d6be18cdef..c036578a3d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -828,13 +828,13 @@ class EventCreationHandler:
u = await self.store.get_user_by_id(user_id)
assert u is not None
- if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
+ if u.user_type in (UserTypes.SUPPORT, UserTypes.BOT):
# support and bot users are not required to consent
return
- if u["appservice_id"] is not None:
+ if u.appservice_id is not None:
# users registered by an appservice are exempt
return
- if u["consent_version"] == self.config.consent.user_consent_version:
+ if u.consent_version == self.config.consent.user_consent_version:
return
consent_uri = self._consent_uri_builder.build_user_consent_uri(user.localpart)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e5ac9096cc..878f267a4e 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Set
-
-import attr
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast
from twisted.python.failure import Failure
@@ -23,16 +21,22 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
+from synapse.types import (
+ JsonDict,
+ JsonMapping,
+ Requester,
+ ScheduledTask,
+ StreamKeyType,
+ TaskStatus,
+)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
-from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -53,80 +57,11 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"
-@attr.s(slots=True, auto_attribs=True)
-class PurgeStatus:
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
- """
-
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- # Save the error message if an error occurs
- error: str = ""
-
- # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
- status: int = STATUS_ACTIVE
-
- def asdict(self) -> JsonDict:
- ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
- if self.error:
- ret["error"] = self.error
- return ret
-
-
-@attr.s(slots=True, auto_attribs=True)
-class DeleteStatus:
- """Object tracking the status of a delete room request
+PURGE_HISTORY_ACTION_NAME = "purge_history"
- This class contains information on the progress of a delete room request, for
- return by get_delete_status.
- """
+PURGE_ROOM_ACTION_NAME = "purge_room"
- STATUS_PURGING = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
- STATUS_SHUTTING_DOWN = 3
-
- STATUS_TEXT = {
- STATUS_PURGING: "purging",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- STATUS_SHUTTING_DOWN: "shutting_down",
- }
-
- # Tracks whether this request has completed.
- # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
- status: int = STATUS_PURGING
-
- # Save the error message if an error occurs
- error: str = ""
-
- # Saves the result of an action to give it back to REST API
- shutdown_room: ShutdownRoomResponse = {
- "kicked_users": [],
- "failed_to_kick_users": [],
- "local_aliases": [],
- "new_room_id": None,
- }
-
- def asdict(self) -> JsonDict:
- ret = {
- "status": DeleteStatus.STATUS_TEXT[self.status],
- "shutdown_room": self.shutdown_room,
- }
- if self.error:
- ret["error"] = self.error
- return ret
+SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"
class PaginationHandler:
@@ -136,9 +71,6 @@ class PaginationHandler:
paginating during a purge.
"""
- # when to remove a completed deletion/purge from the results map
- CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
-
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -150,17 +82,11 @@ class PaginationHandler:
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
self._worker_locks = hs.get_worker_locks_handler()
+ self._task_scheduler = hs.get_task_scheduler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
self._purges_in_progress_by_room: Set[str] = set()
- # map from purge id to PurgeStatus
- self._purges_by_id: Dict[str, PurgeStatus] = {}
- # map from purge id to DeleteStatus
- self._delete_by_id: Dict[str, DeleteStatus] = {}
- # map from room id to delete ids
- # Dict[`room_id`, List[`delete_id`]]
- self._delete_by_room: Dict[str, List[str]] = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = (
@@ -173,6 +99,9 @@ class PaginationHandler:
self._retention_allowed_lifetime_max = (
hs.config.retention.retention_allowed_lifetime_max
)
+ self._forgotten_room_retention_period = (
+ hs.config.server.forgotten_room_retention_period
+ )
self._is_master = hs.config.worker.worker_app is None
if hs.config.retention.retention_enabled and self._is_master:
@@ -189,6 +118,14 @@ class PaginationHandler:
job.longest_max_lifetime,
)
+ self._task_scheduler.register_action(
+ self._purge_history, PURGE_HISTORY_ACTION_NAME
+ )
+ self._task_scheduler.register_action(self._purge_room, PURGE_ROOM_ACTION_NAME)
+ self._task_scheduler.register_action(
+ self._shutdown_and_purge_room, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME
+ )
+
async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
) -> None:
@@ -224,7 +161,7 @@ class PaginationHandler:
include_null = False
logger.info(
- "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
+ "[purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s)",
min_ms,
max_ms,
include_null,
@@ -239,10 +176,10 @@ class PaginationHandler:
for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
- if room_id in self._purges_in_progress_by_room:
+ if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
logger.warning(
- "[purge] not purging room %s as there's an ongoing purge running"
- " for this room",
+ "[purge] not purging room %s for retention as there's an ongoing purge"
+ " running for this room",
room_id,
)
continue
@@ -295,27 +232,20 @@ class PaginationHandler:
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
- purge_id = random_string(16)
-
- self._purges_by_id[purge_id] = PurgeStatus()
-
- logger.info(
- "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
- )
+ logger.info("Starting purging events in room %s", room_id)
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
- "_purge_history",
- self._purge_history,
- purge_id,
+ PURGE_HISTORY_ACTION_NAME,
+ self.purge_history,
room_id,
token,
True,
)
- def start_purge_history(
+ async def start_purge_history(
self, room_id: str, token: str, delete_local_events: bool = False
) -> str:
"""Start off a history purge on a room.
@@ -329,40 +259,58 @@ class PaginationHandler:
Returns:
unique ID for this purge transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
-
- purge_id = random_string(16)
+ purge_id = await self._task_scheduler.schedule_task(
+ PURGE_HISTORY_ACTION_NAME,
+ resource_id=room_id,
+ params={"token": token, "delete_local_events": delete_local_events},
+ )
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
- self._purges_by_id[purge_id] = PurgeStatus()
- run_as_background_process(
- "purge_history",
- self._purge_history,
- purge_id,
- room_id,
- token,
- delete_local_events,
- )
return purge_id
async def _purge_history(
- self, purge_id: str, room_id: str, token: str, delete_local_events: bool
- ) -> None:
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """
+ Scheduler action to purge some history of a room.
+ """
+ if (
+ task.resource_id is None
+ or task.params is None
+ or "token" not in task.params
+ or "delete_local_events" not in task.params
+ ):
+ return (
+ TaskStatus.FAILED,
+ None,
+ "Not enough parameters passed to _purge_history",
+ )
+ err = await self.purge_history(
+ task.resource_id,
+ task.params["token"],
+ task.params["delete_local_events"],
+ )
+ if err is not None:
+ return TaskStatus.FAILED, None, err
+ return TaskStatus.COMPLETE, None, None
+
+ async def purge_history(
+ self,
+ room_id: str,
+ token: str,
+ delete_local_events: bool,
+ ) -> Optional[str]:
"""Carry out a history purge on a room.
Args:
- purge_id: The ID for this purge.
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
"""
- self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
@@ -371,57 +319,68 @@ class PaginationHandler:
room_id, token, delete_local_events
)
logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ return None
except Exception:
f = Failure()
logger.error(
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- self._purges_by_id[purge_id].error = f.getErrorMessage()
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge() -> None:
- del self._purges_by_id[purge_id]
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
- )
-
- def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
- """Get the current status of an active purge
+ return f.getErrorMessage()
- Args:
- purge_id: purge_id returned by start_purge_history
- """
- return self._purges_by_id.get(purge_id)
-
- def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+ async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active deleting
Args:
delete_id: delete_id returned by start_shutdown_and_purge_room
+ or start_purge_history.
"""
- return self._delete_by_id.get(delete_id)
+ return await self._task_scheduler.get_task(delete_id)
- def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
- """Get all active delete ids by room
+ async def get_delete_tasks_by_room(
+ self, room_id: str, only_active: Optional[bool] = False
+ ) -> List[ScheduledTask]:
+ """Get complete, failed or active delete tasks by room
Args:
room_id: room_id that is deleted
+            only_active: if True, completed and failed tasks will be omitted
+ """
+ statuses = [TaskStatus.ACTIVE]
+ if not only_active:
+ statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED]
+
+ return await self._task_scheduler.get_tasks(
+ actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME],
+ resource_id=room_id,
+ statuses=statuses,
+ )
+
+ async def _purge_room(
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """
+ Scheduler action to purge a room.
"""
- return self._delete_by_room.get(room_id)
+ if not task.resource_id:
+ raise Exception("No room id passed to purge_room task")
+ params = task.params if task.params else {}
+ await self.purge_room(task.resource_id, params.get("force", False))
+ return TaskStatus.COMPLETE, None, None
- async def purge_room(self, room_id: str, force: bool = False) -> None:
+ async def purge_room(
+ self,
+ room_id: str,
+ force: bool,
+ ) -> None:
"""Purge the given room from the database.
- This function is part the delete room v1 API.
Args:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
+ logger.info("starting purge room_id=%s force=%s", room_id, force)
+
async with self._worker_locks.acquire_multi_read_write_lock(
[
(PURGE_PAGINATION_LOCK_NAME, room_id),
@@ -430,13 +389,20 @@ class PaginationHandler:
write=True,
):
# first check that we have no users in this room
- if not force:
- joined = await self.store.is_host_joined(room_id, self._server_name)
- if joined:
+ joined = await self.store.is_host_joined(room_id, self._server_name)
+ if joined:
+ if force:
+ logger.info(
+ "force-purging room %s with some local users still joined",
+ room_id,
+ )
+ else:
raise SynapseError(400, "Users are still joined to this room")
await self._storage_controllers.purge_events.purge_room(room_id)
+ logger.info("purge complete for room_id %s", room_id)
+
@trace
async def get_messages(
self,
@@ -711,169 +677,72 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
self,
- delete_id: str,
- room_id: str,
- requester_user_id: str,
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- purge: bool = True,
- force_purge: bool = False,
- ) -> None:
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""
- Shuts down and purges a room.
-
- See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
-
- Args:
- delete_id: The ID for this delete.
- room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action. Will be recorded as putting the room on the
- blocking list.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- also if it fails to remove some users from room.
-
- Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+ Scheduler action to shutdown and purge a room.
"""
+ if task.resource_id is None or task.params is None:
+ raise Exception(
+ "No room id and/or no parameters passed to shutdown_and_purge_room task"
+ )
- self._purges_in_progress_by_room.add(room_id)
- try:
- async with self._worker_locks.acquire_read_write_lock(
- PURGE_PAGINATION_LOCK_NAME, room_id, write=True
- ):
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
- self._delete_by_id[
- delete_id
- ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
- room_id=room_id,
- requester_user_id=requester_user_id,
- new_room_user_id=new_room_user_id,
- new_room_name=new_room_name,
- message=message,
- block=block,
- )
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+ room_id = task.resource_id
- if purge:
- logger.info("starting purge room_id %s", room_id)
+ async def update_result(result: Optional[JsonMapping]) -> None:
+ await self._task_scheduler.update_task(task.id, result=result)
- # first check that we have no users in this room
- if not force_purge:
- joined = await self.store.is_host_joined(
- room_id, self._server_name
- )
- if joined:
- raise SynapseError(
- 400, "Users are still joined to this room"
- )
+ shutdown_result = (
+ cast(ShutdownRoomResponse, task.result) if task.result else None
+ )
- await self._storage_controllers.purge_events.purge_room(room_id)
+ shutdown_result = await self._room_shutdown_handler.shutdown_room(
+ room_id,
+ cast(ShutdownRoomParams, task.params),
+ shutdown_result,
+ update_result,
+ )
- logger.info("purge complete for room_id %s", room_id)
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
- except Exception:
- f = Failure()
- logger.error(
- "failed",
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
- self._delete_by_id[delete_id].error = f.getErrorMessage()
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the delete from the list 24 hours after it completes
- def clear_delete() -> None:
- del self._delete_by_id[delete_id]
- self._delete_by_room[room_id].remove(delete_id)
- if not self._delete_by_room[room_id]:
- del self._delete_by_room[room_id]
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+ if task.params.get("purge", False):
+ await self.purge_room(
+ room_id,
+ task.params.get("force_purge", False),
)
- def start_shutdown_and_purge_room(
+ return (TaskStatus.COMPLETE, shutdown_result, None)
+
+ async def start_shutdown_and_purge_room(
self,
room_id: str,
- requester_user_id: str,
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- purge: bool = True,
- force_purge: bool = False,
+ shutdown_params: ShutdownRoomParams,
) -> str:
"""Start off shut down and purge on a room.
Args:
room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action and put the room on the
- blocking list.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- also if it fails to remove some users from room.
+ shutdown_params: parameters for the shutdown
Returns:
unique ID for this delete transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
+ if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
+ raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
# This check is double to `RoomShutdownHandler.shutdown_room`
# But here the requester get a direct response / error with HTTP request
# and do not have to check the purge status
+ new_room_user_id = shutdown_params["new_room_user_id"]
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
)
- delete_id = random_string(16)
+ delete_id = await self._task_scheduler.schedule_task(
+ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ params=shutdown_params,
+ )
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@@ -883,19 +752,4 @@ class PaginationHandler:
delete_id,
)
- self._delete_by_id[delete_id] = DeleteStatus()
- self._delete_by_room.setdefault(room_id, []).append(delete_id)
- run_as_background_process(
- "shutdown_and_purge_room",
- self._shutdown_and_purge_room,
- delete_id,
- room_id,
- requester_user_id,
- new_room_user_id,
- new_room_name,
- message,
- block,
- purge,
- force_purge,
- )
return delete_id
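
With `PurgeStatus`/`DeleteStatus` gone, purge and delete progress now lives in persisted `ScheduledTask`s, so callers poll the scheduler rather than an in-memory map. A hedged sketch of what that polling can look like (the `TaskStatus` values and `get_delete_task` are used above; the string mapping and the `error` attribute are assumptions for illustration):

    # Sketch only: translating a scheduled purge/delete task back into a coarse status.
    async def poll_delete(handler: "PaginationHandler", delete_id: str) -> str:
        task = await handler.get_delete_task(delete_id)
        if task is None:
            return "unknown delete_id"
        if task.status == TaskStatus.COMPLETE:
            return "complete"
        if task.status == TaskStatus.FAILED:
            return f"failed: {task.error}"  # assumes the error message is kept on the task
        return "active"
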
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index f31e18328b..375c7d0901 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -13,13 +13,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""This module is responsible for keeping track of presence status of local
+"""
+This module is responsible for keeping track of presence status of local
and remote users.
The methods that define policy are:
- PresenceHandler._update_states
- PresenceHandler._handle_timeouts
- should_notify
+
+# Tracking local presence
+
+For local users, presence is tracked on a per-device basis. When a user has multiple
+devices the user presence state is derived by coalescing the presence from each
+device:
+
+ BUSY > ONLINE > UNAVAILABLE > OFFLINE
+
+The time that each device was last active and last synced is tracked in order to
+automatically downgrade a device's presence state:
+
+ A device may move from ONLINE -> UNAVAILABLE, if it has not been active for
+ a period of time.
+
+ A device may go from any state -> OFFLINE, if it is not active and has not
+ synced for a period of time.
+
+The timeouts are handled using a wheel timer, which has coarse buckets. Timings
+do not need to be exact.
+
+Generally a device's presence state is updated whenever a user syncs (via the
+set_presence parameter), when the presence API is called, or if "pro-active"
+events occur, including:
+
+* Sending an event, receipt, read marker.
+* Updating typing status.
+
+The busy state is special: it is not downgraded by a call to sync with a lower
+priority state *and* it takes a long period of time to transition to offline.
+
+# Persisting (and restoring) presence
+
+For all users, presence is persisted on a per-user basis. Data is kept in-memory
+and persisted periodically. When Synapse starts, each worker loads the current
+presence state and then tracks the presence stream to keep itself up-to-date.
+
+When restoring presence for local users a pseudo-device is created to match the
+user state; this device follows the normal timeout logic (see above) and will
+automatically be replaced with any information from currently available devices.
+
"""
import abc
import contextlib
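
The per-device downgrade rules described in the module docstring above map onto the timeout constants introduced below (IDLE_TIMER, SYNC_ONLINE_TIMEOUT, BUSY_ONLINE_TIMEOUT). As a rough sketch of how a single idle device decays, ignoring ongoing syncs (the real logic is in `handle_timeout` further down; this helper is purely illustrative):

    # Sketch only: how one device's presence decays when it stops being active/syncing.
    def decayed_device_state(state: str, last_active_ts: int, last_sync_ts: int, now: int) -> str:
        if state == PresenceState.ONLINE and now - last_active_ts > IDLE_TIMER:
            state = PresenceState.UNAVAILABLE  # online but idle for too long
        # Busy devices get a much longer grace period before being dropped to offline.
        online_timeout = (
            BUSY_ONLINE_TIMEOUT if state == PresenceState.BUSY else SYNC_ONLINE_TIMEOUT
        )
        if now - max(last_sync_ts, last_active_ts) > online_timeout:
            state = PresenceState.OFFLINE  # neither syncing nor active for too long
        return state
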
@@ -30,6 +73,7 @@ from contextlib import contextmanager
from types import TracebackType
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
Callable,
Collection,
@@ -49,7 +93,7 @@ from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
@@ -111,6 +155,8 @@ LAST_ACTIVE_GRANULARITY = 60 * 1000
# How long to wait until a new /events or /sync request before assuming
# the client has gone.
SYNC_ONLINE_TIMEOUT = 30 * 1000
+# Busy status waits longer, but does eventually go offline.
+BUSY_ONLINE_TIMEOUT = 60 * 60 * 1000
# How long to wait before marking the user as idle. Compared against last active
IDLE_TIMER = 5 * 60 * 1000
@@ -137,6 +183,7 @@ class BasePresenceHandler(abc.ABC):
writer"""
def __init__(self, hs: "HomeServer"):
+ self.hs = hs
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
@@ -162,6 +209,7 @@ class BasePresenceHandler(abc.ABC):
self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info()
+ # The combined status across all user devices.
self.user_to_current_state = {state.user_id: state for state in active_presence}
@abc.abstractmethod
@@ -426,8 +474,6 @@ class _NullContextManager(ContextManager[None]):
class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- self.hs = hs
-
self._presence_writer_instance = hs.config.worker.writers.presence[0]
# Route presence EDUs to the right worker
@@ -691,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- self.hs = hs
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
@@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler):
lambda: len(self.user_to_current_state),
)
+ # The per-device presence state, maps user to devices to per-device presence state.
+ self._user_to_device_to_current_state: Dict[
+ str, Dict[Optional[str], UserDevicePresenceState]
+ ] = {}
+
now = self.clock.time_msec()
if self._presence_enabled:
for state in self.user_to_current_state.values():
+            # Create a pseudo-device to properly handle timeouts. This will
+ # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
+ pseudo_device_id = None
+ self._user_to_device_to_current_state[state.user_id] = {
+ pseudo_device_id: UserDevicePresenceState(
+ user_id=state.user_id,
+ device_id=pseudo_device_id,
+ state=state.state,
+ last_active_ts=state.last_active_ts,
+ last_sync_ts=state.last_user_sync_ts,
+ )
+ }
+
self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
@@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on other processes.
#
- # While any sync is ongoing on another process the user will never
+ # While any sync is ongoing on another process the user's device will never
# go offline.
#
# Each process has a unique identifier and an update frequency. If
@@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler):
timers_fired_counter.inc(len(states))
- syncing_user_ids = {
- user_id
- for (user_id, _), count in self._user_device_to_num_current_syncs.items()
+ # Set of user ID & device IDs which are currently syncing.
+ syncing_user_devices = {
+ user_id_device_id
+ for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count
}
- syncing_user_ids.update(
- user_id
- for user_id, _ in itertools.chain(
- *self.external_process_to_current_syncs.values()
- )
+ syncing_user_devices.update(
+ itertools.chain(*self.external_process_to_current_syncs.values())
)
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
- syncing_user_ids=syncing_user_ids,
+ syncing_user_devices=syncing_user_devices,
+ user_to_devices=self._user_to_device_to_current_state,
now=now,
)
@@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler):
bump_active_time_counter.inc()
- prev_state = await self.current_state_for_user(user_id)
+ now = self.clock.time_msec()
- new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
- if prev_state.state == PresenceState.UNAVAILABLE:
- new_fields["state"] = PresenceState.ONLINE
+ # Update the device information & mark the device as online if it was
+ # unavailable.
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+ device_state = devices.setdefault(
+ device_id,
+ UserDevicePresenceState.default(user_id, device_id),
+ )
+ device_state.last_active_ts = now
+ if device_state.state == PresenceState.UNAVAILABLE:
+ device_state.state = PresenceState.ONLINE
+
+ # Update the user state, this will always update last_active_ts and
+ # might update the presence state.
+ prev_state = await self.current_state_for_user(user_id)
+ new_fields: Dict[str, Any] = {
+ "last_active_ts": now,
+ "state": _combine_device_states(devices.values()),
+ }
await self._update_states([prev_state.copy_and_replace(**new_fields)])
@@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler):
if is_syncing and (user_id, device_id) not in process_presence:
process_presence.add((user_id, device_id))
elif not is_syncing and (user_id, device_id) in process_presence:
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+ device_state = devices.setdefault(
+ device_id, UserDevicePresenceState.default(user_id, device_id)
+ )
+ device_state.last_sync_ts = sync_time_msec
+
new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec
)
@@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
- prev_states = await self.current_state_for_users(
- {user_id for user_id, device_id in process_presence}
- )
+
time_now_ms = self.clock.time_msec()
+ # Mark each device as having a last sync time.
+ updated_users = set()
+ for user_id, device_id in process_presence:
+ device_state = self._user_to_device_to_current_state.setdefault(
+ user_id, {}
+ ).setdefault(
+ device_id, UserDevicePresenceState.default(user_id, device_id)
+ )
+
+ device_state.last_sync_ts = time_now_ms
+ updated_users.add(user_id)
+
+ # Update each user (and insert into the appropriate timers to check if
+ # they've gone offline).
+ prev_states = await self.current_state_for_users(updated_users)
await self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
@@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler):
if prev_state.state == PresenceState.BUSY and is_sync:
presence = PresenceState.BUSY
+ # Update the device specific information.
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+ device_state = devices.setdefault(
+ device_id,
+ UserDevicePresenceState.default(user_id, device_id),
+ )
+ device_state.state = presence
+ device_state.last_active_ts = now
+ if is_sync:
+ device_state.last_sync_ts = now
+
+        # Based on the state of each of the user's devices, calculate the new presence state.
+ presence = _combine_device_states(devices.values())
+
new_fields = {"state": presence}
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
@@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
- syncing_user_ids: Set[str],
+ syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
+ user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as
@@ -1882,7 +1993,8 @@ def handle_timeouts(
Args:
user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours
- syncing_user_ids: Set of user_ids with active syncs.
+        syncing_user_devices: A set of (user ID, device ID) tuples with active syncs.
+ user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
@@ -1891,9 +2003,16 @@ def handle_timeouts(
changes = {} # Actual changes we need to notify people about
for state in user_states:
- is_mine = is_mine_fn(state.user_id)
-
- new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
+ user_id = state.user_id
+ is_mine = is_mine_fn(user_id)
+
+ new_state = handle_timeout(
+ state,
+ is_mine,
+ syncing_user_devices,
+ user_to_devices.get(user_id, {}),
+ now,
+ )
if new_state:
changes[state.user_id] = new_state
@@ -1901,14 +2020,19 @@ def handle_timeouts(
def handle_timeout(
- state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
+ state: UserPresenceState,
+ is_mine: bool,
+ syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
+ user_devices: Dict[Optional[str], UserDevicePresenceState],
+ now: int,
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
- state
+ state: UserPresenceState to check.
is_mine: Whether the user is ours
- syncing_user_ids: Set of user_ids with active syncs.
+        syncing_device_ids: A set of (user ID, device ID) tuples with active syncs.
+ user_devices: A map of device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
@@ -1919,34 +2043,63 @@ def handle_timeout(
return None
changed = False
- user_id = state.user_id
if is_mine:
- if state.state == PresenceState.ONLINE:
- if now - state.last_active_ts > IDLE_TIMER:
- # Currently online, but last activity ages ago so auto
- # idle
- state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
- changed = True
- elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
- # So that we send down a notification that we've
- # stopped updating.
+ # Check per-device whether the device should be considered idle or offline
+ # due to timeouts.
+ device_changed = False
+ offline_devices = []
+ for device_id, device_state in user_devices.items():
+ if device_state.state == PresenceState.ONLINE:
+ if now - device_state.last_active_ts > IDLE_TIMER:
+ # Currently online, but last activity ages ago so auto
+ # idle
+ device_state.state = PresenceState.UNAVAILABLE
+ device_changed = True
+
+            # If there have been no syncs for a while (and none ongoing),
+ # set presence to offline.
+ if (state.user_id, device_id) not in syncing_device_ids:
+ # If the user has done something recently but hasn't synced,
+ # don't set them as offline.
+ sync_or_active = max(
+ device_state.last_sync_ts, device_state.last_active_ts
+ )
+
+                    # Implementations aren't meant to time out a device with a busy
+                    # state, but it needs to time out *eventually* or else the user
+ # will be stuck in that state.
+ online_timeout = (
+ BUSY_ONLINE_TIMEOUT
+ if device_state.state == PresenceState.BUSY
+ else SYNC_ONLINE_TIMEOUT
+ )
+ if now - sync_or_active > online_timeout:
+ # Mark the device as going offline.
+ offline_devices.append(device_id)
+ device_changed = True
+
+ # Offline devices are not needed and do not add information.
+ for device_id in offline_devices:
+ user_devices.pop(device_id)
+
+ # If the presence state of the devices changed, then (maybe) update
+ # the user's overall presence state.
+ if device_changed:
+ new_presence = _combine_device_states(user_devices.values())
+ if new_presence != state.state:
+ state = state.copy_and_replace(state=new_presence)
changed = True
+ if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+ # So that we send down a notification that we've
+ # stopped updating.
+ changed = True
+
if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
# Need to send ping to other servers to ensure they don't
# timeout and set us to offline
changed = True
-
- # If there are have been no sync for a while (and none ongoing),
- # set presence to offline
- if user_id not in syncing_user_ids:
- # If the user has done something recently but hasn't synced,
- # don't set them as offline.
- sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
- if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
- state = state.copy_and_replace(state=PresenceState.OFFLINE)
- changed = True
else:
# We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
@@ -2021,6 +2174,13 @@ def handle_update(
new_state = new_state.copy_and_replace(last_federation_update_ts=now)
federation_ping = True
+ if new_state.state == PresenceState.BUSY:
+ wheel_timer.insert(
+ now=now,
+ obj=user_id,
+ then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT,
+ )
+
else:
wheel_timer.insert(
now=now,
@@ -2036,6 +2196,46 @@ def handle_update(
return new_state, persist_and_notify, federation_ping
+PRESENCE_BY_PRIORITY = {
+ PresenceState.BUSY: 4,
+ PresenceState.ONLINE: 3,
+ PresenceState.UNAVAILABLE: 2,
+ PresenceState.OFFLINE: 1,
+}
+
+
+def _combine_device_states(
+ device_states: Iterable[UserDevicePresenceState],
+) -> str:
+ """
+ Find the device to use presence information from.
+
+ Orders devices by priority, then last_active_ts.
+
+ Args:
+ device_states: An iterable of device presence states
+
+    Returns:
+ The combined presence state.
+ """
+
+ # Based on (all) the user's devices calculate the new presence state.
+ presence = PresenceState.OFFLINE
+ last_active_ts = -1
+
+ # Find the device to use the presence state of based on the presence priority,
+ # but tie-break with how recently the device has been seen.
+ for device_state in device_states:
+ if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
+ PRESENCE_BY_PRIORITY[presence],
+ last_active_ts,
+ ):
+ presence = device_state.state
+ last_active_ts = device_state.last_active_ts
+
+ return presence
+
+
async def get_interested_parties(
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 2bacdebfb5..c7edada353 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -37,6 +37,8 @@ class ReceiptsHandler:
self.server_name = hs.config.server.server_name
self.store = hs.get_datastores().main
self.event_auth_handler = hs.get_event_auth_handler()
+ self.event_handler = hs.get_event_handler()
+ self._storage_controllers = hs.get_storage_controllers()
self.hs = hs
@@ -81,6 +83,20 @@ class ReceiptsHandler:
)
continue
+ # Let's check that the origin server is in the room before accepting the receipt.
+ # We don't want to block waiting on a partial state so take an
+ # approximation if needed.
+ domains = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
+ room_id
+ )
+ if origin not in domains:
+ logger.info(
+ "Ignoring receipt for room %r from server %s as they're not in the room",
+ room_id,
+ origin,
+ )
+ continue
+
for receipt_type, users in room_values.items():
for user_id, user_values in users.items():
if get_domain_from_id(user_id) != origin:
@@ -158,17 +174,23 @@ class ReceiptsHandler:
self,
room_id: str,
receipt_type: str,
- user_id: str,
+ user_id: UserID,
event_id: str,
thread_id: Optional[str],
) -> None:
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
+
+ # Ensure the room/event exists, this will raise an error if the user
+ # cannot view the event.
+ if not await self.event_handler.get_event(user_id, room_id, event_id):
+ return
+
receipt = ReadReceipt(
room_id=room_id,
receipt_type=receipt_type,
- user_id=user_id,
+ user_id=user_id.to_string(),
event_ids=[event_id],
thread_id=thread_id,
data={"ts": int(self.clock.time_msec())},
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0513e28aab..a0c3b16819 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -20,7 +20,7 @@ import random
import string
from collections import OrderedDict
from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
import attr
from typing_extensions import TypedDict
@@ -54,11 +54,11 @@ from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext
from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
-from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
+ JsonMapping,
MutableStateMap,
Requester,
RoomAlias,
@@ -454,7 +454,7 @@ class RoomCreationHandler:
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
user_id
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"You are not permitted to create rooms",
@@ -768,7 +768,7 @@ class RoomCreationHandler:
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
user_id
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"You are not permitted to create rooms",
@@ -1750,6 +1750,45 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_current_room_stream_token_for_room_id(room_id)
+class ShutdownRoomParams(TypedDict):
+ """
+ Attributes:
+ requester_user_id:
+ User who requested the action. Will be recorded as putting the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ even if there are still users joined to the room.
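+
+    A typical set of parameters might look like the following (the user IDs here
+    are purely illustrative)::
+
+        {
+            "requester_user_id": "@admin:example.com",
+            "new_room_user_id": "@abuse:example.com",
+            "new_room_name": None,
+            "message": None,
+            "block": True,
+            "purge": True,
+            "force_purge": False,
+        }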
+ """
+
+ requester_user_id: Optional[str]
+ new_room_user_id: Optional[str]
+ new_room_name: Optional[str]
+ message: Optional[str]
+ block: bool
+ purge: bool
+ force_purge: bool
+
+
class ShutdownRoomResponse(TypedDict):
"""
Attributes:
@@ -1787,12 +1826,12 @@ class RoomShutdownHandler:
async def shutdown_room(
self,
room_id: str,
- requester_user_id: str,
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- ) -> ShutdownRoomResponse:
+ params: ShutdownRoomParams,
+ result: Optional[ShutdownRoomResponse] = None,
+ update_result_fct: Optional[
+ Callable[[Optional[JsonMapping]], Awaitable[None]]
+ ] = None,
+ ) -> Optional[ShutdownRoomResponse]:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1808,48 +1847,23 @@ class RoomShutdownHandler:
Args:
room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action and put the room on the
- blocking list.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `True`, users will be prevented from joining the old
- room. This option can also be used to pre-emptively block a room,
- even if it's unknown to this homeserver. In this case, the room
- will be blocked, and no further action will be taken. If `False`,
- attempting to delete an unknown room is invalid.
-
- Defaults to `False`.
-
- Returns: a dict containing the following keys:
- kicked_users: An array of users (`user_id`) that were kicked.
- failed_to_kick_users:
- An array of users (`user_id`) that that were not kicked.
- local_aliases:
- An array of strings representing the local aliases that were
- migrated from the old room to the new.
- new_room_id:
- A string representing the room ID of the new room, or None if
- no such room was created.
- """
+ params: parameters for the shutdown, cf `ShutdownRoomParams`
+ result: current status of the shutdown, if it was interrupted
+ update_result_fct: function called when `result` is updated locally
- if not new_room_name:
- new_room_name = self.DEFAULT_ROOM_NAME
- if not message:
- message = self.DEFAULT_MESSAGE
+ Returns: a dict matching `ShutdownRoomResponse`.
+ """
+ requester_user_id = params["requester_user_id"]
+ new_room_user_id = params["new_room_user_id"]
+ block = params["block"]
+
+ new_room_name = (
+ params["new_room_name"]
+ if params["new_room_name"]
+ else self.DEFAULT_ROOM_NAME
+ )
+ message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@@ -1861,22 +1875,33 @@ class RoomShutdownHandler:
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
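+        # Start from an empty result unless we are resuming an interrupted shutdown.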
+ result = (
+ result
+ if result
+ else {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
+ )
+
# Action the block first (even if the room doesn't exist yet)
if block:
+ if requester_user_id is None:
+ raise ValueError(
+ "shutdown_room: block=True not allowed when requester_user_id is None."
+ )
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
if not await self.store.get_room(room_id):
# if we don't know about the room, there is nothing left to do.
- return {
- "kicked_users": [],
- "failed_to_kick_users": [],
- "local_aliases": [],
- "new_room_id": None,
- }
+ return result
- if new_room_user_id is not None:
+ new_room_id = result.get("new_room_id")
+ if new_room_user_id is not None and new_room_id is None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
@@ -1896,6 +1921,10 @@ class RoomShutdownHandler:
ratelimit=False,
)
+ result["new_room_id"] = new_room_id
+ if update_result_fct:
+ await update_result_fct(result)
+
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
)
@@ -1909,12 +1938,9 @@ class RoomShutdownHandler:
stream_id,
)
else:
- new_room_id = None
logger.info("Shutting down room %r", room_id)
users = await self.store.get_users_in_room(room_id)
- kicked_users = []
- failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
@@ -1943,7 +1969,9 @@ class RoomShutdownHandler:
stream_id,
)
- await self.room_member_handler.forget(target_requester.user, room_id)
+ await self.room_member_handler.forget(
+ target_requester.user, room_id, do_not_schedule_purge=True
+ )
# Join users to new room
if new_room_user_id:
@@ -1958,15 +1986,23 @@ class RoomShutdownHandler:
require_consent=False,
)
- kicked_users.append(user_id)
+ result["kicked_users"].append(user_id)
+ if update_result_fct:
+ await update_result_fct(result)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
- failed_to_kick_users.append(user_id)
+ result["failed_to_kick_users"].append(user_id)
+ if update_result_fct:
+ await update_result_fct(result)
# Send message in new room and move aliases
if new_room_user_id:
+ room_creator_requester = create_requester(
+ new_room_user_id, authenticated_entity=requester_user_id
+ )
+
await self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
@@ -1978,18 +2014,15 @@ class RoomShutdownHandler:
ratelimit=False,
)
- aliases_for_room = await self.store.get_aliases_for_room(room_id)
+ result["local_aliases"] = list(
+ await self.store.get_aliases_for_room(room_id)
+ )
assert new_room_id is not None
await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
else:
- aliases_for_room = []
+ result["local_aliases"] = []
- return {
- "kicked_users": kicked_users,
- "failed_to_kick_users": failed_to_kick_users,
- "local_aliases": list(aliases_for_room),
- "new_room_id": new_room_id,
- }
+ return result
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index de0f04e3fe..90343c2306 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -37,13 +37,13 @@ from synapse.api.ratelimiting import Ratelimiter
from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.handlers.pagination import PURGE_ROOM_ACTION_NAME
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.module_api import NOT_SPAM
from synapse.types import (
JsonDict,
Requester,
@@ -169,6 +169,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
+ self._forgotten_room_retention_period = (
+ hs.config.server.forgotten_room_retention_period
+ )
+
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
@@ -278,7 +282,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
- async def forget(self, user: UserID, room_id: str) -> None:
+ async def forget(
+ self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
+ ) -> None:
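+        """Forget the given room for this user.
+
+        Unless `do_not_schedule_purge` is set, this may also schedule an automatic
+        purge of the room once no local users remember it (see below).
+        """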
user_id = user.to_string()
member = await self._storage_controllers.state.get_current_state_event(
@@ -298,6 +304,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# the table `current_state_events` and `get_current_state_events` is `None`.
await self.store.forget(user_id, room_id)
+        # If everyone locally has left the room, there is no reason for us to keep it
+        # around, so schedule the room to be purged automatically once the configured
+        # forgotten-room retention period has elapsed.
+ if (
+ not do_not_schedule_purge
+ and self._forgotten_room_retention_period
+ and await self.store.is_locally_forgotten_room(room_id)
+ ):
+ await self.hs.get_task_scheduler().schedule_task(
+ PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ timestamp=self.clock.time_msec()
+ + self._forgotten_room_retention_period,
+ )
+
async def ratelimit_multiple_invites(
self,
requester: Optional[Requester],
@@ -804,7 +824,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
spam_check = await self._spam_checker_module_callbacks.user_may_invite(
requester.user.to_string(), target_id, room_id
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking invite due to spam checker")
block_invite_result = spam_check
@@ -939,7 +959,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
target.to_string(), room_id, is_invited=inviter is not None
)
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"Not allowed to join this room",
@@ -1557,7 +1577,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id=room_id,
)
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"Cannot send threepid invite",
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py
index 05e21509de..657d9b3559 100644
--- a/synapse/handlers/send_email.py
+++ b/synapse/handlers/send_email.py
@@ -17,7 +17,7 @@ import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from io import BytesIO
-from typing import TYPE_CHECKING, Any, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional
from pkg_resources import parse_version
@@ -151,6 +151,7 @@ class SendEmailHandler:
app_name: str,
html: str,
text: str,
+ additional_headers: Optional[Dict[str, str]] = None,
) -> None:
"""Send a multipart email with the given information.
@@ -160,6 +161,7 @@ class SendEmailHandler:
app_name: The app name to include in the From header.
html: The HTML content to include in the email.
text: The plain text content to include in the email.
+ additional_headers: A map of additional headers to include.
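+                For example: {"List-Unsubscribe": "<mailto:unsubscribe@example.com>"}.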
"""
try:
from_string = self._from % {"app": app_name}
@@ -172,8 +174,8 @@ class SendEmailHandler:
if raw_to == "":
raise RuntimeError("Invalid 'to' address")
- html_part = MIMEText(html, "html", "utf8")
- text_part = MIMEText(text, "plain", "utf8")
+ html_part = MIMEText(html, "html", "utf-8")
+ text_part = MIMEText(text, "plain", "utf-8")
multipart_msg = MIMEMultipart("alternative")
multipart_msg["Subject"] = subject
@@ -181,6 +183,7 @@ class SendEmailHandler:
multipart_msg["To"] = email_address
multipart_msg["Date"] = email.utils.formatdate()
multipart_msg["Message-ID"] = email.utils.make_msgid()
+
# Discourage automatic responses to Synapse's emails.
# Per RFC 3834, automatic responses should not be sent if the "Auto-Submitted"
# header is present with any value other than "no". See
@@ -194,6 +197,11 @@ class SendEmailHandler:
# https://stackoverflow.com/a/25324691/5252017
# https://stackoverflow.com/a/61646381/5252017
multipart_msg["X-Auto-Response-Suppress"] = "All"
+
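+        # Add any caller-supplied headers last. Note that assigning to an existing
+        # header name adds another instance of that header rather than replacing it.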
+ if additional_headers:
+ for header, value in additional_headers.items():
+ multipart_msg[header] = value
+
multipart_msg.attach(text_part)
multipart_msg.attach(html_part)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60a9f341b5..1a4d394eda 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
+from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
@@ -56,6 +57,7 @@ from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
JsonDict,
+ JsonMapping,
MutableStateMap,
Requester,
RoomStreamToken,
@@ -268,6 +270,7 @@ class SyncHandler:
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._device_handler = hs.get_device_handler()
+ self._task_scheduler = hs.get_task_scheduler()
self.should_calculate_push_rules = hs.config.push.enable_push
@@ -360,13 +363,36 @@ class SyncHandler:
# (since we now know that the device has received them)
if since_token is not None:
since_stream_id = since_token.to_device_key
+ # Fast path: delete a limited number of to-device messages up front.
+ # We do this to avoid the overhead of scheduling a task for every
+ # sync.
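+            # Anything beyond this limit is cleaned up by a background task
+            # scheduled below.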
+ device_deletion_limit = 100
deleted = await self.store.delete_messages_for_device(
- sync_config.user.to_string(), sync_config.device_id, since_stream_id
+ sync_config.user.to_string(),
+ sync_config.device_id,
+ since_stream_id,
+ limit=device_deletion_limit,
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
)
+ # If we hit the limit, schedule a background task to delete the rest.
+ if deleted >= device_deletion_limit:
+ await self._task_scheduler.schedule_task(
+ DELETE_DEVICE_MSGS_TASK_NAME,
+ resource_id=sync_config.device_id,
+ params={
+ "user_id": sync_config.user.to_string(),
+ "device_id": sync_config.device_id,
+ "up_to_stream_id": since_stream_id,
+ },
+ )
+ logger.debug(
+ "Deletion of to-device messages up to %d scheduled",
+ since_stream_id,
+ )
+
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
@@ -1768,19 +1794,23 @@ class SyncHandler:
)
if push_rules_changed:
- global_account_data = dict(global_account_data)
- global_account_data[
- AccountDataTypes.PUSH_RULES
- ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
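+            # Build a new dict rather than mutating global_account_data in place,
+            # as it may be an immutable mapping.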
+ global_account_data = {
+ AccountDataTypes.PUSH_RULES: await self._push_rules_handler.push_rules_for_user(
+ sync_config.user
+ ),
+ **global_account_data,
+ }
else:
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)
- global_account_data = dict(all_global_account_data)
- global_account_data[
- AccountDataTypes.PUSH_RULES
- ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
+ global_account_data = {
+ AccountDataTypes.PUSH_RULES: await self._push_rules_handler.push_rules_for_user(
+ sync_config.user
+ ),
+ **all_global_account_data,
+ }
account_data_for_user = (
await sync_config.filter_collection.filter_global_account_data(
@@ -1884,7 +1914,7 @@ class SyncHandler:
blocks_all_rooms
or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
):
- account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
+ account_data_by_room: Mapping[str, Mapping[str, JsonMapping]] = {}
elif since_token and not sync_result_builder.full_state:
account_data_by_room = (
await self.store.get_updated_room_account_data_for_user(
@@ -2324,8 +2354,8 @@ class SyncHandler:
sync_result_builder: "SyncResultBuilder",
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
- tags: Optional[Mapping[str, Mapping[str, Any]]],
- account_data: Mapping[str, JsonDict],
+ tags: Optional[Mapping[str, JsonMapping]],
+ account_data: Mapping[str, JsonMapping],
always_include: bool = False,
) -> None:
"""Populates the `joined` and `archived` section of `sync_result_builder`
|