diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index be3203ac80..85157a138b 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -234,7 +234,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write_invite(
- self, room_id: str, event: EventBase, state: StateMap[dict]
+ self, room_id: str, event: EventBase, state: StateMap[EventBase]
) -> None:
"""Write an invite for the room, with associated invite state.
@@ -248,7 +248,7 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def write_knock(
- self, room_id: str, event: EventBase, state: StateMap[dict]
+ self, room_id: str, event: EventBase, state: StateMap[EventBase]
) -> None:
"""Write a knock for the room, with associated knock state.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ddc9105ee9..9abdad262b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -188,7 +188,7 @@ class ApplicationServicesHandler:
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Optional[Collection[Union[str, UserID]]] = None,
+ users: Collection[Union[str, UserID]],
) -> None:
"""
This is called by the notifier in the background when an ephemeral event is handled
@@ -203,7 +203,9 @@ class ApplicationServicesHandler:
value for `stream_key` will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
- them.
+ receiving them by setting `push_ephemeral` to true in their registration
+ file. Note that while MSC2409 is experimental, this option is called
+ `de.sorunome.msc2409.push_ephemeral`.
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
@@ -214,6 +216,7 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
+ # Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
@@ -230,18 +233,25 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
+ # Check whether there are any appservices which have registered to receive
+ # ephemeral events.
+ #
+ # Note that whether these events are actually relevant to these appservices
+ # is decided later on.
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services:
+ # Bail out early if none of the target appservices have explicitly registered
+ # to receive these ephemeral events.
return
# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
- services, stream_key, new_token, users or []
+ services, stream_key, new_token, users
)
@wrap_as_background_process("notify_interested_services_ephemeral")
@@ -252,7 +262,7 @@ class ApplicationServicesHandler:
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
- logger.debug("Checking interested services for %s" % (stream_key))
+ logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
@@ -345,6 +355,9 @@ class ApplicationServicesHandler:
Args:
service: The application service to check for which events it should receive.
+ new_token: A receipts event stream token. Purely used to double-check that the
+ from_token we pull from the database isn't greater than or equal to this
+ token. Prevents accidentally duplicating work.
Returns:
A list of JSON dictionaries containing data derived from the read receipts that
@@ -382,6 +395,9 @@ class ApplicationServicesHandler:
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
+ new_token: A presence update stream token. Purely used to double-check that the
+ from_token we pull from the database isn't greater than or equal to this
+ token. Prevents accidentally duplicating work.
Returns:
A list of json dictionaries containing data derived from the presence events
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 60e59d11a0..4b66a9862f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -790,10 +790,10 @@ class AuthHandler:
(
new_refresh_token,
new_refresh_token_id,
- ) = await self.get_refresh_token_for_user_id(
+ ) = await self.create_refresh_token_for_user_id(
user_id=existing_token.user_id, device_id=existing_token.device_id
)
- access_token = await self.get_access_token_for_user_id(
+ access_token = await self.create_access_token_for_user_id(
user_id=existing_token.user_id,
device_id=existing_token.device_id,
valid_until_ms=valid_until_ms,
@@ -832,7 +832,7 @@ class AuthHandler:
return True
- async def get_refresh_token_for_user_id(
+ async def create_refresh_token_for_user_id(
self,
user_id: str,
device_id: str,
@@ -855,7 +855,7 @@ class AuthHandler:
)
return refresh_token, refresh_token_id
- async def get_access_token_for_user_id(
+ async def create_access_token_for_user_id(
self,
user_id: str,
device_id: Optional[str],
@@ -1828,13 +1828,6 @@ def load_single_legacy_password_auth_provider(
logger.error("Error while initializing %r: %s", module, e)
raise
- # The known hooks. If a module implements a method who's name appears in this set
- # we'll want to register it
- password_auth_provider_methods = {
- "check_3pid_auth",
- "on_logged_out",
- }
-
# All methods that the module provides should be async, but this wasn't enforced
# in the old module system, so we wrap them if needed
def async_wrapper(f: Optional[Callable]) -> Optional[Callable[..., Awaitable]]:
@@ -1919,11 +1912,14 @@ def load_single_legacy_password_auth_provider(
return run
- # populate hooks with the implemented methods, wrapped with async_wrapper
- hooks = {
- hook: async_wrapper(getattr(provider, hook, None))
- for hook in password_auth_provider_methods
- }
+ # If the module has these methods implemented, then we pull them out
+ # and register them as hooks.
+ check_3pid_auth_hook: Optional[CHECK_3PID_AUTH_CALLBACK] = async_wrapper(
+ getattr(provider, "check_3pid_auth", None)
+ )
+ on_logged_out_hook: Optional[ON_LOGGED_OUT_CALLBACK] = async_wrapper(
+ getattr(provider, "on_logged_out", None)
+ )
supported_login_types = {}
# call get_supported_login_types and add that to the dict
@@ -1950,7 +1946,11 @@ def load_single_legacy_password_auth_provider(
# need to use a tuple here for ("password",) not a list since lists aren't hashable
auth_checkers[(LoginType.PASSWORD, ("password",))] = check_password
- api.register_password_auth_provider_callbacks(hooks, auth_checkers=auth_checkers)
+ api.register_password_auth_provider_callbacks(
+ check_3pid_auth=check_3pid_auth_hook,
+ on_logged_out=on_logged_out_hook,
+ auth_checkers=auth_checkers,
+ )
CHECK_3PID_AUTH_CALLBACK = Callable[
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index b6a2a34ab7..b582266af9 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -89,6 +89,13 @@ class DeviceMessageHandler:
)
async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
+ """
+ Handle receiving to-device messages from remote homeservers.
+
+ Args:
+ origin: The remote homeserver.
+ content: The JSON dictionary containing the to-device messages.
+ """
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
@@ -135,12 +142,16 @@ class DeviceMessageHandler:
message_type, sender_user_id, by_device
)
- stream_id = await self.store.add_messages_from_remote_to_device_inbox(
+ # Add messages to the database.
+ # Retrieve the stream id of the last-processed to-device message.
+ last_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)
+ # Notify listeners that there are new to-device messages to process,
+ # handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", stream_id, users=local_messages.keys()
+ "to_device_key", last_stream_id, users=local_messages.keys()
)
async def _check_for_unknown_devices(
@@ -195,6 +206,14 @@ class DeviceMessageHandler:
message_type: str,
messages: Dict[str, Dict[str, JsonDict]],
) -> None:
+ """
+ Handle a request from a user to send to-device message(s).
+
+ Args:
+ requester: The user that is sending the to-device messages.
+ message_type: The type of to-device messages that are being sent.
+ messages: A dictionary containing recipients mapped to messages intended for them.
+ """
sender_user_id = requester.user.to_string()
message_id = random_string(16)
@@ -257,12 +276,16 @@ class DeviceMessageHandler:
"org.matrix.opentracing_context": json_encoder.encode(context),
}
- stream_id = await self.store.add_messages_to_device_inbox(
+ # Add messages to the database.
+ # Retrieve the stream id of the last-processed to-device message.
+ last_stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
+ # Notify listeners that there are new to-device messages to process,
+ # handing them the latest stream id.
self.notifier.on_new_event(
- "to_device_key", stream_id, users=local_messages.keys()
+ "to_device_key", last_stream_id, users=local_messages.keys()
)
if self.federation_sender:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8ca5f60b1c..7ee5c47fd9 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -204,6 +204,10 @@ class DirectoryHandler:
)
room_id = await self._delete_association(room_alias)
+ if room_id is None:
+ # It's possible someone else deleted the association after the
+ # checks above, but before we did the deletion.
+ raise NotFoundError("Unknown room alias")
try:
await self._update_canonical_alias(requester, user_id, room_id, room_alias)
@@ -225,7 +229,7 @@ class DirectoryHandler:
)
await self._delete_association(room_alias)
- async def _delete_association(self, room_alias: RoomAlias) -> str:
+ async def _delete_association(self, room_alias: RoomAlias) -> Optional[str]:
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1f64534a8a..b4ff935546 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -124,7 +124,7 @@ class EventStreamHandler:
as_client_event=as_client_event,
# We don't bundle "live" events, as otherwise clients
# will end up double counting annotations.
- bundle_aggregations=False,
+ bundle_relations=False,
)
chunk = {
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 1a1cd93b1a..9917613298 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -981,8 +981,6 @@ class FederationEventHandler:
origin,
event,
context,
- state=state,
- backfilled=backfilled,
)
except AuthError as e:
# FIXME richvdh 2021/10/07 I don't think this is reachable. Let's log it
@@ -1332,8 +1330,6 @@ class FederationEventHandler:
origin: str,
event: EventBase,
context: EventContext,
- state: Optional[Iterable[EventBase]] = None,
- backfilled: bool = False,
) -> EventContext:
"""
Checks whether an event should be rejected (for failing auth checks).
@@ -1344,12 +1340,6 @@ class FederationEventHandler:
context:
The event context.
- state:
- The state events used to check the event for soft-fail. If this is
- not provided the current state events will be used.
-
- backfilled: True if the event was backfilled.
-
Returns:
The updated context object.
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 3dbe611f95..c83eaea359 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -464,15 +464,6 @@ class IdentityHandler:
if next_link:
params["next_link"] = next_link
- if self.hs.config.email.using_identity_server_from_trusted_list:
- # Warn that a deprecated config option is in use
- logger.warning(
- 'The config option "trust_identity_server_for_password_resets" '
- 'has been replaced by "account_threepid_delegate". '
- "Please consult the sample config at docs/sample_config.yaml for "
- "details and update your config file."
- )
-
try:
data = await self.http_client.post_json_get_json(
id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
@@ -517,15 +508,6 @@ class IdentityHandler:
if next_link:
params["next_link"] = next_link
- if self.hs.config.email.using_identity_server_from_trusted_list:
- # Warn that a deprecated config option is in use
- logger.warning(
- 'The config option "trust_identity_server_for_password_resets" '
- 'has been replaced by "account_threepid_delegate". '
- "Please consult the sample config at docs/sample_config.yaml for "
- "details and update your config file."
- )
-
try:
data = await self.http_client.post_json_get_json(
id_server + "/_matrix/identity/api/v1/validate/msisdn/requestToken",
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6a6c468cb7..67e557aeaf 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -252,7 +252,7 @@ class MessageHandler:
now,
# We don't bother bundling aggregations in when asked for state
# events, as clients won't use them.
- bundle_aggregations=False,
+ bundle_relations=False,
)
return events
@@ -1001,13 +1001,52 @@ class EventCreationHandler:
)
self.validator.validate_new(event, self.config)
+ await self._validate_event_relation(event)
+ logger.debug("Created event %s", event.event_id)
+
+ return event, context
+
+ async def _validate_event_relation(self, event: EventBase) -> None:
+ """
+ Ensure the relation data on a new event is not bogus.
+
+ Args:
+ event: The event being created.
+
+ Raises:
+ SynapseError if the event is invalid.
+ """
+
+ relation = event.content.get("m.relates_to")
+ if not relation:
+ return
+
+ relation_type = relation.get("rel_type")
+ if not relation_type:
+ return
+
+ # Ensure the parent is real.
+ relates_to = relation.get("event_id")
+ if not relates_to:
+ return
+
+ parent_event = await self.store.get_event(relates_to, allow_none=True)
+ if parent_event:
+ # And in the same room.
+ if parent_event.room_id != event.room_id:
+ raise SynapseError(400, "Relations must be in the same room")
+
+ else:
+ # There must be some reason that the client knows the event exists,
+ # see if there are existing relations. If so, assume everything is fine.
+ if not await self.store.event_is_target_of_relation(relates_to):
+ # Otherwise, the client can't know about the parent event!
+ raise SynapseError(400, "Can't send relation to unknown event")
# If this event is an annotation then we check that that the sender
# can't annotate the same way twice (e.g. stops users from liking an
# event multiple times).
- relation = event.content.get("m.relates_to", {})
- if relation.get("rel_type") == RelationTypes.ANNOTATION:
- relates_to = relation["event_id"]
+ if relation_type == RelationTypes.ANNOTATION:
aggregation_key = relation["key"]
already_exists = await self.store.has_user_annotated_event(
@@ -1016,9 +1055,12 @@ class EventCreationHandler:
if already_exists:
raise SynapseError(400, "Can't send same reaction twice")
- logger.debug("Created event %s", event.event_id)
-
- return event, context
+ # Don't attempt to start a thread if the parent event is a relation.
+ elif relation_type == RelationTypes.THREAD:
+ if await self.store.event_includes_relation(relates_to):
+ raise SynapseError(
+ 400, "Cannot start threads from an event with a relation"
+ )
@measure_func("handle_new_client_event")
async def handle_new_client_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index abfe7be0e3..cd64142735 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Dict, Optional, Set
+from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Set
import attr
@@ -22,7 +22,7 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
-from synapse.logging.context import run_in_background
+from synapse.handlers.room import ShutdownRoomResponse
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
@@ -56,11 +56,62 @@ class PurgeStatus:
STATUS_FAILED: "failed",
}
+ # Save the error message if an error occurs
+ error: str = ""
+
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
status: int = STATUS_ACTIVE
def asdict(self) -> JsonDict:
- return {"status": PurgeStatus.STATUS_TEXT[self.status]}
+ ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
+ if self.error:
+ ret["error"] = self.error
+ return ret
+
+
+@attr.s(slots=True, auto_attribs=True)
+class DeleteStatus:
+ """Object tracking the status of a delete room request
+
+ This class contains information on the progress of a delete room request, for
+ return by get_delete_status.
+ """
+
+ STATUS_PURGING = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+ STATUS_SHUTTING_DOWN = 3
+
+ STATUS_TEXT = {
+ STATUS_PURGING: "purging",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ STATUS_SHUTTING_DOWN: "shutting_down",
+ }
+
+ # Tracks whether this request has completed.
+ # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
+ status: int = STATUS_PURGING
+
+ # Save the error message if an error occurs
+ error: str = ""
+
+ # Saves the result of an action to give it back to REST API
+ shutdown_room: ShutdownRoomResponse = {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
+
+ def asdict(self) -> JsonDict:
+ ret = {
+ "status": DeleteStatus.STATUS_TEXT[self.status],
+ "shutdown_room": self.shutdown_room,
+ }
+ if self.error:
+ ret["error"] = self.error
+ return ret
class PaginationHandler:
@@ -70,6 +121,9 @@ class PaginationHandler:
paginating during a purge.
"""
+ # when to remove a completed deletion/purge from the results map
+ CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
+
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -78,11 +132,18 @@ class PaginationHandler:
self.state_store = self.storage.state
self.clock = hs.get_clock()
self._server_name = hs.hostname
+ self._room_shutdown_handler = hs.get_room_shutdown_handler()
self.pagination_lock = ReadWriteLock()
+ # IDs of rooms in which there currently an active purge *or delete* operation.
self._purges_in_progress_by_room: Set[str] = set()
# map from purge id to PurgeStatus
self._purges_by_id: Dict[str, PurgeStatus] = {}
+ # map from purge id to DeleteStatus
+ self._delete_by_id: Dict[str, DeleteStatus] = {}
+ # map from room id to delete ids
+ # Dict[`room_id`, List[`delete_id`]]
+ self._delete_by_room: Dict[str, List[str]] = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = (
@@ -265,8 +326,13 @@ class PaginationHandler:
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history, purge_id, room_id, token, delete_local_events
+ run_as_background_process(
+ "purge_history",
+ self._purge_history,
+ purge_id,
+ room_id,
+ token,
+ delete_local_events,
)
return purge_id
@@ -276,7 +342,7 @@ class PaginationHandler:
"""Carry out a history purge on a room.
Args:
- purge_id: The id for this purge
+ purge_id: The ID for this purge.
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
@@ -295,6 +361,7 @@ class PaginationHandler:
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) # type: ignore
)
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ self._purges_by_id[purge_id].error = f.getErrorMessage()
finally:
self._purges_in_progress_by_room.discard(room_id)
@@ -302,7 +369,9 @@ class PaginationHandler:
def clear_purge() -> None:
del self._purges_by_id[purge_id]
- self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+ self.hs.get_reactor().callLater(
+ PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
+ )
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
"""Get the current status of an active purge
@@ -312,8 +381,25 @@ class PaginationHandler:
"""
return self._purges_by_id.get(purge_id)
+ def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+ """Get the current status of an active deleting
+
+ Args:
+ delete_id: delete_id returned by start_shutdown_and_purge_room
+ """
+ return self._delete_by_id.get(delete_id)
+
+ def get_delete_ids_by_room(self, room_id: str) -> Optional[Collection[str]]:
+ """Get all active delete ids by room
+
+ Args:
+ room_id: room_id that is deleted
+ """
+ return self._delete_by_room.get(room_id)
+
async def purge_room(self, room_id: str, force: bool = False) -> None:
"""Purge the given room from the database.
+ This function is part the delete room v1 API.
Args:
room_id: room to be purged
@@ -424,7 +510,7 @@ class PaginationHandler:
if events:
if event_filter:
- events = event_filter.filter(events)
+ events = await event_filter.filter(events)
events = await filter_events_for_client(
self.storage, user_id, events, is_peeking=(member_event_id is None)
@@ -472,3 +558,192 @@ class PaginationHandler:
)
return chunk
+
+ async def _shutdown_and_purge_room(
+ self,
+ delete_id: str,
+ room_id: str,
+ requester_user_id: str,
+ new_room_user_id: Optional[str] = None,
+ new_room_name: Optional[str] = None,
+ message: Optional[str] = None,
+ block: bool = False,
+ purge: bool = True,
+ force_purge: bool = False,
+ ) -> None:
+ """
+ Shuts down and purges a room.
+
+ See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
+
+ Args:
+ delete_id: The ID for this delete.
+ room_id: The ID of the room to shut down.
+ requester_user_id:
+ User who requested the action. Will be recorded as putting the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ also if it fails to remove some users from room.
+
+ Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+ """
+
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with await self.pagination_lock.write(room_id):
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
+ self._delete_by_id[
+ delete_id
+ ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
+ room_id=room_id,
+ requester_user_id=requester_user_id,
+ new_room_user_id=new_room_user_id,
+ new_room_name=new_room_name,
+ message=message,
+ block=block,
+ )
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+
+ if purge:
+ logger.info("starting purge room_id %s", room_id)
+
+ # first check that we have no users in this room
+ if not force_purge:
+ joined = await self.store.is_host_joined(
+ room_id, self._server_name
+ )
+ if joined:
+ raise SynapseError(
+ 400, "Users are still joined to this room"
+ )
+
+ await self.storage.purge_events.purge_room(room_id)
+
+ logger.info("complete")
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
+ except Exception:
+ f = Failure()
+ logger.error(
+ "failed",
+ exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
+ )
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
+ self._delete_by_id[delete_id].error = f.getErrorMessage()
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the delete from the list 24 hours after it completes
+ def clear_delete() -> None:
+ del self._delete_by_id[delete_id]
+ self._delete_by_room[room_id].remove(delete_id)
+ if not self._delete_by_room[room_id]:
+ del self._delete_by_room[room_id]
+
+ self.hs.get_reactor().callLater(
+ PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+ )
+
+ def start_shutdown_and_purge_room(
+ self,
+ room_id: str,
+ requester_user_id: str,
+ new_room_user_id: Optional[str] = None,
+ new_room_name: Optional[str] = None,
+ message: Optional[str] = None,
+ block: bool = False,
+ purge: bool = True,
+ force_purge: bool = False,
+ ) -> str:
+ """Start off shut down and purge on a room.
+
+ Args:
+ room_id: The ID of the room to shut down.
+ requester_user_id:
+ User who requested the action and put the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ also if it fails to remove some users from room.
+
+ Returns:
+ unique ID for this delete transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400, "History purge already in progress for %s" % (room_id,)
+ )
+
+ # This check is double to `RoomShutdownHandler.shutdown_room`
+ # But here the requester get a direct response / error with HTTP request
+ # and do not have to check the purge status
+ if new_room_user_id is not None:
+ if not self.hs.is_mine_id(new_room_user_id):
+ raise SynapseError(
+ 400, "User must be our own: %s" % (new_room_user_id,)
+ )
+
+ delete_id = random_string(16)
+
+ # we log the delete_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info(
+ "starting shutdown room_id %s with delete_id %s",
+ room_id,
+ delete_id,
+ )
+
+ self._delete_by_id[delete_id] = DeleteStatus()
+ self._delete_by_room.setdefault(room_id, []).append(delete_id)
+ run_as_background_process(
+ "shutdown_and_purge_room",
+ self._shutdown_and_purge_room,
+ delete_id,
+ room_id,
+ requester_user_id,
+ new_room_user_id,
+ new_room_name,
+ message,
+ block,
+ purge,
+ force_purge,
+ )
+ return delete_id
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a0e6a01775..448a36108e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -116,7 +116,9 @@ class RegistrationHandler:
self.pusher_pool = hs.get_pusherpool()
self.session_lifetime = hs.config.registration.session_lifetime
- self.access_token_lifetime = hs.config.registration.access_token_lifetime
+ self.refreshable_access_token_lifetime = (
+ hs.config.registration.refreshable_access_token_lifetime
+ )
init_counters_for_auth_provider("")
@@ -813,13 +815,15 @@ class RegistrationHandler:
(
refresh_token,
refresh_token_id,
- ) = await self._auth_handler.get_refresh_token_for_user_id(
+ ) = await self._auth_handler.create_refresh_token_for_user_id(
user_id,
device_id=registered_device_id,
)
- valid_until_ms = self.clock.time_msec() + self.access_token_lifetime
+ valid_until_ms = (
+ self.clock.time_msec() + self.refreshable_access_token_lifetime
+ )
- access_token = await self._auth_handler.get_access_token_for_user_id(
+ access_token = await self._auth_handler.create_access_token_for_user_id(
user_id,
device_id=registered_device_id,
valid_until_ms=valid_until_ms,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 969eb3b9b0..88053f9869 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Contains functions for performing events on rooms."""
-
+"""Contains functions for performing actions on rooms."""
import itertools
import logging
import math
@@ -31,6 +30,8 @@ from typing import (
Tuple,
)
+from typing_extensions import TypedDict
+
from synapse.api.constants import (
EventContentFields,
EventTypes,
@@ -774,8 +775,11 @@ class RoomCreationHandler:
raise SynapseError(403, "Room visibility value not allowed.")
if is_public:
+ room_aliases = []
+ if room_alias:
+ room_aliases.append(room_alias.to_string())
if not self.config.roomdirectory.is_publishing_room_allowed(
- user_id, room_id, room_alias
+ user_id, room_id, room_aliases
):
# Let's just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
@@ -1158,8 +1162,10 @@ class RoomContextHandler:
)
if event_filter:
- results["events_before"] = event_filter.filter(results["events_before"])
- results["events_after"] = event_filter.filter(results["events_after"])
+ results["events_before"] = await event_filter.filter(
+ results["events_before"]
+ )
+ results["events_after"] = await event_filter.filter(results["events_after"])
results["events_before"] = await filter_evts(results["events_before"])
results["events_after"] = await filter_evts(results["events_after"])
@@ -1195,7 +1201,7 @@ class RoomContextHandler:
state_events = list(state[last_event_id].values())
if event_filter:
- state_events = event_filter.filter(state_events)
+ state_events = await event_filter.filter(state_events)
results["state"] = await filter_evts(state_events)
@@ -1275,8 +1281,25 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_room_events_max_id(room_id)
-class RoomShutdownHandler:
+class ShutdownRoomResponse(TypedDict):
+ """
+ Attributes:
+ kicked_users: An array of users (`user_id`) that were kicked.
+ failed_to_kick_users:
+ An array of users (`user_id`) that that were not kicked.
+ local_aliases:
+ An array of strings representing the local aliases that were
+ migrated from the old room to the new.
+ new_room_id: A string representing the room ID of the new room.
+ """
+ kicked_users: List[str]
+ failed_to_kick_users: List[str]
+ local_aliases: List[str]
+ new_room_id: Optional[str]
+
+
+class RoomShutdownHandler:
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
" violation will be blocked."
@@ -1289,7 +1312,6 @@ class RoomShutdownHandler:
self._room_creation_handler = hs.get_room_creation_handler()
self._replication = hs.get_replication_data_handler()
self.event_creation_handler = hs.get_event_creation_handler()
- self.state = hs.get_state_handler()
self.store = hs.get_datastore()
async def shutdown_room(
@@ -1300,7 +1322,7 @@ class RoomShutdownHandler:
new_room_name: Optional[str] = None,
message: Optional[str] = None,
block: bool = False,
- ) -> dict:
+ ) -> ShutdownRoomResponse:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1334,8 +1356,13 @@ class RoomShutdownHandler:
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
+ If set to `True`, users will be prevented from joining the old
+ room. This option can also be used to pre-emptively block a room,
+ even if it's unknown to this homeserver. In this case, the room
+ will be blocked, and no further action will be taken. If `False`,
+ attempting to delete an unknown room is invalid.
+
+ Defaults to `False`.
Returns: a dict containing the following keys:
kicked_users: An array of users (`user_id`) that were kicked.
@@ -1344,7 +1371,9 @@ class RoomShutdownHandler:
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
- new_room_id: A string representing the room ID of the new room.
+ new_room_id:
+ A string representing the room ID of the new room, or None if
+ no such room was created.
"""
if not new_room_name:
@@ -1355,14 +1384,28 @@ class RoomShutdownHandler:
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
- if not await self.store.get_room(room_id):
- raise NotFoundError("Unknown room id %s" % (room_id,))
-
- # This will work even if the room is already blocked, but that is
- # desirable in case the first attempt at blocking the room failed below.
+ # Action the block first (even if the room doesn't exist yet)
if block:
+ # This will work even if the room is already blocked, but that is
+ # desirable in case the first attempt at blocking the room failed below.
await self.store.block_room(room_id, requester_user_id)
+ if not await self.store.get_room(room_id):
+ if block:
+ # We allow you to block an unknown room.
+ return {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
+ else:
+ # But if you don't want to preventatively block another room,
+ # this function can't do anything useful.
+ raise NotFoundError(
+ "Cannot shut down room: unknown room id %s" % (room_id,)
+ )
+
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 0723286383..f880aa93d2 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -221,6 +221,7 @@ class RoomBatchHandler:
action=membership,
content=event_dict["content"],
outlier=True,
+ historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
@@ -240,6 +241,7 @@ class RoomBatchHandler:
),
event_dict,
outlier=True,
+ historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index fa3e9acc74..cac76d0221 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -269,6 +269,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content: Optional[dict] = None,
require_consent: bool = True,
outlier: bool = False,
+ historical: bool = False,
) -> Tuple[str, int]:
"""
Internal membership update function to get an existing event or create
@@ -294,6 +295,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
+ historical: Indicates whether the message is being inserted
+ back in time around some existing events. This is used to skip
+ a few checks and mark the event as backfilled.
Returns:
Tuple of event ID and stream ordering position
@@ -338,6 +342,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
auth_event_ids=auth_event_ids,
require_consent=require_consent,
outlier=outlier,
+ historical=historical,
)
prev_state_ids = await context.get_prev_state_ids()
@@ -434,6 +439,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
+ historical: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
@@ -455,6 +461,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
+ historical: Indicates whether the message is being inserted
+ back in time around some existing events. This is used to skip
+ a few checks and mark the event as backfilled.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
@@ -507,6 +516,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
+ historical=historical,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
)
@@ -527,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
new_room: bool = False,
require_consent: bool = True,
outlier: bool = False,
+ historical: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
@@ -550,6 +561,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
+ historical: Indicates whether the message is being inserted
+ back in time around some existing events. This is used to skip
+ a few checks and mark the event as backfilled.
prev_event_ids: The event IDs to use as the prev events
auth_event_ids:
The event ids to use as the auth_events for the new event.
@@ -677,6 +691,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content=content,
require_consent=require_consent,
outlier=outlier,
+ historical=historical,
)
latest_event_ids = await self.store.get_prev_events_for_room(room_id)
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index fb26ee7ad7..8181cc0b52 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -97,7 +97,7 @@ class RoomSummaryHandler:
# If a user tries to fetch the same page multiple times in quick succession,
# only process the first attempt and return its result to subsequent requests.
self._pagination_response_cache: ResponseCache[
- Tuple[str, bool, Optional[int], Optional[int], Optional[str]]
+ Tuple[str, str, bool, Optional[int], Optional[int], Optional[str]]
] = ResponseCache(
hs.get_clock(),
"get_room_hierarchy",
@@ -282,7 +282,14 @@ class RoomSummaryHandler:
# This is due to the pagination process mutating internal state, attempting
# to process multiple requests for the same page will result in errors.
return await self._pagination_response_cache.wrap(
- (requested_room_id, suggested_only, max_depth, limit, from_token),
+ (
+ requester,
+ requested_room_id,
+ suggested_only,
+ max_depth,
+ limit,
+ from_token,
+ ),
self._get_room_hierarchy,
requester,
requested_room_id,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 6e4dff8056..ab7eaab2fb 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -180,7 +180,7 @@ class SearchHandler:
% (set(group_keys) - {"room_id", "sender"},),
)
- search_filter = Filter(filter_dict)
+ search_filter = Filter(self.hs, filter_dict)
# TODO: Search through left rooms too
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
@@ -242,7 +242,7 @@ class SearchHandler:
rank_map.update({r["event"].event_id: r["rank"] for r in results})
- filtered_events = search_filter.filter([r["event"] for r in results])
+ filtered_events = await search_filter.filter([r["event"] for r in results])
events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
@@ -292,7 +292,9 @@ class SearchHandler:
rank_map.update({r["event"].event_id: r["rank"] for r in results})
- filtered_events = search_filter.filter([r["event"] for r in results])
+ filtered_events = await search_filter.filter(
+ [r["event"] for r in results]
+ )
events = await filter_events_for_client(
self.storage, user.to_string(), filtered_events
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c7c6d63a9..891435c14d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,7 +510,7 @@ class SyncHandler:
log_kv({"limited": limited})
if potential_recents:
- recents = sync_config.filter_collection.filter_room_timeline(
+ recents = await sync_config.filter_collection.filter_room_timeline(
potential_recents
)
log_kv({"recents_after_sync_filtering": len(recents)})
@@ -575,8 +575,8 @@ class SyncHandler:
log_kv({"loaded_recents": len(events)})
- loaded_recents = sync_config.filter_collection.filter_room_timeline(
- events
+ loaded_recents = (
+ await sync_config.filter_collection.filter_room_timeline(events)
)
log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)})
@@ -1015,7 +1015,7 @@ class SyncHandler:
return {
(e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(
+ for e in await sync_config.filter_collection.filter_room_state(
list(state.values())
)
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
@@ -1383,7 +1383,7 @@ class SyncHandler:
sync_config.user
)
- account_data_for_user = sync_config.filter_collection.filter_account_data(
+ account_data_for_user = await sync_config.filter_collection.filter_account_data(
[
{"type": account_data_type, "content": content}
for account_data_type, content in account_data.items()
@@ -1448,7 +1448,7 @@ class SyncHandler:
# Deduplicate the presence entries so that there's at most one per user
presence = list({p.user_id: p for p in presence}.values())
- presence = sync_config.filter_collection.filter_presence(presence)
+ presence = await sync_config.filter_collection.filter_presence(presence)
sync_result_builder.presence = presence
@@ -2021,12 +2021,14 @@ class SyncHandler:
)
account_data_events = (
- sync_config.filter_collection.filter_room_account_data(
+ await sync_config.filter_collection.filter_room_account_data(
account_data_events
)
)
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+ ephemeral = await sync_config.filter_collection.filter_room_ephemeral(
+ ephemeral
+ )
if not (
always_include
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 22c6174821..1676ebd057 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -90,7 +90,7 @@ class FollowerTypingHandler:
self.wheel_timer = WheelTimer(bucket_size=5000)
@wrap_as_background_process("typing._handle_timeouts")
- def _handle_timeouts(self) -> None:
+ async def _handle_timeouts(self) -> None:
logger.debug("Checking for typing timeouts")
now = self.clock.time_msec()
|