diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/experimental.py | 20 | ||||
-rw-r--r-- | synapse/event_auth.py | 23 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 1 | ||||
-rw-r--r-- | synapse/handlers/event_auth.py | 13 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room.py | 112 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 9 | ||||
-rw-r--r-- | synapse/http/client.py | 8 | ||||
-rw-r--r-- | synapse/http/federation/matrix_federation_agent.py | 2 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 34 | ||||
-rw-r--r-- | synapse/rest/admin/server_notice_servlet.py | 34 | ||||
-rw-r--r-- | synapse/rest/client/room.py | 174 | ||||
-rw-r--r-- | synapse/rest/client/sendtodevice.py | 25 | ||||
-rw-r--r-- | synapse/rest/client/transactions.py | 55 | ||||
-rw-r--r-- | synapse/server.py | 29 | ||||
-rw-r--r-- | synapse/storage/controllers/purge_events.py | 22 | ||||
-rw-r--r-- | synapse/storage/database.py | 21 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 11 | ||||
-rw-r--r-- | synapse/storage/databases/state/store.py | 2 |
19 files changed, 343 insertions, 254 deletions
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 489f2601ac..7e05f78f70 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -166,20 +166,9 @@ class ExperimentalConfig(Config): # MSC3391: Removing account data. self.msc3391_enabled = experimental.get("msc3391_enabled", False) - # MSC3873: Disambiguate event_match keys. - self.msc3873_escape_event_match_key = experimental.get( - "msc3873_escape_event_match_key", False - ) - - # MSC3966: exact_event_property_contains push rule condition. - self.msc3966_exact_event_property_contains = experimental.get( - "msc3966_exact_event_property_contains", False - ) - # MSC3952: Intentional mentions, this depends on MSC3966. - self.msc3952_intentional_mentions = ( - experimental.get("msc3952_intentional_mentions", False) - and self.msc3966_exact_event_property_contains + self.msc3952_intentional_mentions = experimental.get( + "msc3952_intentional_mentions", False ) # MSC3959: Do not generate notifications for edits. @@ -187,10 +176,5 @@ class ExperimentalConfig(Config): "msc3958_supress_edit_notifs", False ) - # MSC3966: exact_event_property_contains push rule condition. - self.msc3966_exact_event_property_contains = experimental.get( - "msc3966_exact_event_property_contains", False - ) - # MSC3967: Do not require UIA when first uploading cross signing keys self.msc3967_enabled = experimental.get("msc3967_enabled", False) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 4d6d1b8ebd..af55874b5c 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -168,13 +168,24 @@ async def check_state_independent_auth_rules( return # 2. Reject if event has auth_events that: ... - auth_events = await store.get_events( - event.auth_event_ids(), - redact_behaviour=EventRedactBehaviour.as_is, - allow_rejected=True, - ) if batched_auth_events: - auth_events.update(batched_auth_events) + # Copy the batched auth events to avoid mutating them. + auth_events = dict(batched_auth_events) + needed_auth_event_ids = set(event.auth_event_ids()) - batched_auth_events.keys() + if needed_auth_event_ids: + auth_events.update( + await store.get_events( + needed_auth_event_ids, + redact_behaviour=EventRedactBehaviour.as_is, + allow_rejected=True, + ) + ) + else: + auth_events = await store.get_events( + event.auth_event_ids(), + redact_behaviour=EventRedactBehaviour.as_is, + allow_rejected=True, + ) room_id = event.room_id auth_dict: MutableStateMap[str] = {} diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a91a5d1e3c..c04ad08cbb 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -293,6 +293,7 @@ class EventContext(UnpersistedEventContextBase): Maps a (type, state_key) to the event ID of the state event matching this tuple. """ + assert self.state_group_before_event is not None return await self._storage.state.get_state_ids_for_group( self.state_group_before_event, state_filter diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index c508861b6a..0db0bd7304 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -63,9 +63,18 @@ class EventAuthHandler: self._store, event, batched_auth_events ) auth_event_ids = event.auth_event_ids() - auth_events_by_id = await self._store.get_events(auth_event_ids) + if batched_auth_events: - auth_events_by_id.update(batched_auth_events) + # Copy the batched auth events to avoid mutating them. + auth_events_by_id = dict(batched_auth_events) + needed_auth_event_ids = set(auth_event_ids) - set(batched_auth_events) + if needed_auth_event_ids: + auth_events_by_id.update( + await self._store.get_events(needed_auth_event_ids) + ) + else: + auth_events_by_id = await self._store.get_events(auth_event_ids) + check_state_dependent_auth_rules(event, auth_events_by_id.values()) def compute_auth_events( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 8c79c055ba..63b35c8d62 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -683,7 +683,7 @@ class PaginationHandler: await self._storage_controllers.purge_events.purge_room(room_id) - logger.info("complete") + logger.info("purge complete for room_id %s", room_id) self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE except Exception: f = Failure() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b1784638f4..be120cb12f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -569,7 +569,7 @@ class RoomCreationHandler: new_room_id, # we expect to override all the presets with initial_state, so this is # somewhat arbitrary. - preset_config=RoomCreationPreset.PRIVATE_CHAT, + room_config={"preset": RoomCreationPreset.PRIVATE_CHAT}, invite_list=[], initial_state=initial_state, creation_content=creation_content, @@ -904,13 +904,6 @@ class RoomCreationHandler: check_membership=False, ) - preset_config = config.get( - "preset", - RoomCreationPreset.PRIVATE_CHAT - if visibility == "private" - else RoomCreationPreset.PUBLIC_CHAT, - ) - raw_initial_state = config.get("initial_state", []) initial_state = OrderedDict() @@ -929,7 +922,7 @@ class RoomCreationHandler: ) = await self._send_events_for_new_room( requester, room_id, - preset_config=preset_config, + room_config=config, invite_list=invite_list, initial_state=initial_state, creation_content=creation_content, @@ -938,48 +931,6 @@ class RoomCreationHandler: creator_join_profile=creator_join_profile, ) - if "name" in config: - name = config["name"] - ( - name_event, - last_stream_id, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.Name, - "room_id": room_id, - "sender": user_id, - "state_key": "", - "content": {"name": name}, - }, - ratelimit=False, - prev_event_ids=[last_sent_event_id], - depth=depth, - ) - last_sent_event_id = name_event.event_id - depth += 1 - - if "topic" in config: - topic = config["topic"] - ( - topic_event, - last_stream_id, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.Topic, - "room_id": room_id, - "sender": user_id, - "state_key": "", - "content": {"topic": topic}, - }, - ratelimit=False, - prev_event_ids=[last_sent_event_id], - depth=depth, - ) - last_sent_event_id = topic_event.event_id - depth += 1 - # we avoid dropping the lock between invites, as otherwise joins can # start coming in and making the createRoom slow. # @@ -1047,7 +998,7 @@ class RoomCreationHandler: self, creator: Requester, room_id: str, - preset_config: str, + room_config: JsonDict, invite_list: List[str], initial_state: MutableStateMap, creation_content: JsonDict, @@ -1064,11 +1015,33 @@ class RoomCreationHandler: Rate limiting should already have been applied by this point. + Args: + creator: + the user requesting the room creation + room_id: + room id for the room being created + room_config: + A dict of configuration options. This will be the body of + a /createRoom request; see + https://spec.matrix.org/latest/client-server-api/#post_matrixclientv3createroom + invite_list: + a list of user ids to invite to the room + initial_state: + A list of state events to set in the new room. + creation_content: + Extra keys, such as m.federate, to be added to the content of the m.room.create event. + room_alias: + alias for the room + power_level_content_override: + The power level content to override in the default power level event. + creator_join_profile: + Set to override the displayname and avatar for the creating + user in this room. + Returns: A tuple containing the stream ID, event ID and depth of the last event sent to the room. """ - creator_id = creator.user.to_string() event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} depth = 1 @@ -1079,9 +1052,6 @@ class RoomCreationHandler: # created (but not persisted to the db) to determine state for future created events # (as this info can't be pulled from the db) state_map: MutableStateMap[str] = {} - # current_state_group of last event created. Used for computing event context of - # events to be batched - current_state_group: Optional[int] = None def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict: e = {"type": etype, "content": content} @@ -1123,7 +1093,9 @@ class RoomCreationHandler: event_dict, prev_event_ids=prev_event, depth=depth, - state_map=state_map, + # Take a copy to ensure each event gets a unique copy of + # state_map since it is modified below. + state_map=dict(state_map), for_batch=for_batch, ) @@ -1133,6 +1105,14 @@ class RoomCreationHandler: return new_event, new_unpersisted_context + visibility = room_config.get("visibility", "private") + preset_config = room_config.get( + "preset", + RoomCreationPreset.PRIVATE_CHAT + if visibility == "private" + else RoomCreationPreset.PUBLIC_CHAT, + ) + try: config = self._presets_dict[preset_config] except KeyError: @@ -1284,6 +1264,24 @@ class RoomCreationHandler: ) events_to_send.append((encryption_event, encryption_context)) + if "name" in room_config: + name = room_config["name"] + name_event, name_context = await create_event( + EventTypes.Name, + {"name": name}, + True, + ) + events_to_send.append((name_event, name_context)) + + if "topic" in room_config: + topic = room_config["topic"] + topic_event, topic_context = await create_event( + EventTypes.Topic, + {"topic": topic}, + True, + ) + events_to_send.append((topic_event, topic_context)) + datastore = self.hs.get_datastores().state events_and_context = ( await UnpersistedEventContext.batch_persist_unpersisted_contexts( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index fd6d946c37..9f5b83ed54 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1226,6 +1226,10 @@ class SyncHandler: continue event_with_membership_auth = events_with_membership_auth[member] + is_create = ( + event_with_membership_auth.is_state() + and event_with_membership_auth.type == EventTypes.Create + ) is_join = ( event_with_membership_auth.is_state() and event_with_membership_auth.type == EventTypes.Member @@ -1233,9 +1237,10 @@ class SyncHandler: and event_with_membership_auth.content.get("membership") == Membership.JOIN ) - if not is_join: + if not is_create and not is_join: # The event must include the desired membership as an auth event, unless - # it's the first join event for a given user. + # it's the `m.room.create` event for a room or the first join event for + # a given user. missing_members.add(member) auth_event_ids.update(event_with_membership_auth.auth_event_ids()) diff --git a/synapse/http/client.py b/synapse/http/client.py index ae48e7c3f0..d777d59ccf 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -268,8 +268,8 @@ class BlacklistingAgentWrapper(Agent): def __init__( self, agent: IAgent, + ip_blacklist: IPSet, ip_whitelist: Optional[IPSet] = None, - ip_blacklist: Optional[IPSet] = None, ): """ Args: @@ -291,7 +291,9 @@ class BlacklistingAgentWrapper(Agent): h = urllib.parse.urlparse(uri.decode("ascii")) try: - ip_address = IPAddress(h.hostname) + # h.hostname is Optional[str], None raises an AddrFormatError, so + # this is safe even though IPAddress requires a str. + ip_address = IPAddress(h.hostname) # type: ignore[arg-type] except AddrFormatError: # Not an IP pass @@ -388,8 +390,8 @@ class SimpleHttpClient: # by the DNS resolution. self.agent = BlacklistingAgentWrapper( self.agent, - ip_whitelist=self._ip_whitelist, ip_blacklist=self._ip_blacklist, + ip_whitelist=self._ip_whitelist, ) async def request( diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 0359231e7d..8d7d0a3875 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -87,7 +87,7 @@ class MatrixFederationAgent: reactor: ISynapseReactor, tls_client_options_factory: Optional[FederationPolicyForHTTPS], user_agent: bytes, - ip_whitelist: IPSet, + ip_whitelist: Optional[IPSet], ip_blacklist: IPSet, _srv_resolver: Optional[SrvResolver] = None, _well_known_resolver: Optional[WellKnownResolver] = None, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index ba12b6d79a..199337673f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -273,10 +273,7 @@ class BulkPushRuleEvaluator: related_event_id, allow_none=True ) if related_event is not None: - related_events[relation_type] = _flatten_dict( - related_event, - msc3873_escape_event_match_key=self.hs.config.experimental.msc3873_escape_event_match_key, - ) + related_events[relation_type] = _flatten_dict(related_event) reply_event_id = ( event.content.get("m.relates_to", {}) @@ -291,10 +288,7 @@ class BulkPushRuleEvaluator: ) if related_event is not None: - related_events["m.in_reply_to"] = _flatten_dict( - related_event, - msc3873_escape_event_match_key=self.hs.config.experimental.msc3873_escape_event_match_key, - ) + related_events["m.in_reply_to"] = _flatten_dict(related_event) # indicate that this is from a fallback relation. if relation_type == "m.thread" and event.content.get( @@ -401,10 +395,7 @@ class BulkPushRuleEvaluator: ) evaluator = PushRuleEvaluator( - _flatten_dict( - event, - msc3873_escape_event_match_key=self.hs.config.experimental.msc3873_escape_event_match_key, - ), + _flatten_dict(event), has_mentions, room_member_count, sender_power_level, @@ -413,7 +404,6 @@ class BulkPushRuleEvaluator: self._related_event_match_enabled, event.room_version.msc3931_push_features, self.hs.config.experimental.msc1767_enabled, # MSC3931 flag - self.hs.config.experimental.msc3966_exact_event_property_contains, ) users = rules_by_user.keys() @@ -495,8 +485,6 @@ def _flatten_dict( d: Union[EventBase, Mapping[str, Any]], prefix: Optional[List[str]] = None, result: Optional[Dict[str, JsonValue]] = None, - *, - msc3873_escape_event_match_key: bool = False, ) -> Dict[str, JsonValue]: """ Given a JSON dictionary (or event) which might contain sub dictionaries, @@ -525,11 +513,10 @@ def _flatten_dict( if result is None: result = {} for key, value in d.items(): - if msc3873_escape_event_match_key: - # Escape periods in the key with a backslash (and backslashes with an - # extra backslash). This is since a period is used as a separator between - # nested fields. - key = key.replace("\\", "\\\\").replace(".", "\\.") + # Escape periods in the key with a backslash (and backslashes with an + # extra backslash). This is since a period is used as a separator between + # nested fields. + key = key.replace("\\", "\\\\").replace(".", "\\.") if _is_simple_value(value): result[".".join(prefix + [key])] = value @@ -537,12 +524,7 @@ def _flatten_dict( result[".".join(prefix + [key])] = [v for v in value if _is_simple_value(v)] elif isinstance(value, Mapping): # do not set `room_version` due to recursion considerations below - _flatten_dict( - value, - prefix=(prefix + [key]), - result=result, - msc3873_escape_event_match_key=msc3873_escape_event_match_key, - ) + _flatten_dict(value, prefix=(prefix + [key]), result=result) # `room_version` should only ever be set when looking at the top level of an event if ( diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index 15da9cd881..7dd1c10b91 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from http import HTTPStatus -from typing import TYPE_CHECKING, Awaitable, Optional, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from synapse.api.constants import EventTypes from synapse.api.errors import NotFoundError, SynapseError @@ -23,10 +23,10 @@ from synapse.http.servlet import ( parse_json_object_from_request, ) from synapse.http.site import SynapseRequest -from synapse.rest.admin import assert_requester_is_admin -from synapse.rest.admin._base import admin_patterns +from synapse.logging.opentracing import set_tag +from synapse.rest.admin._base import admin_patterns, assert_user_is_admin from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, Requester, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -70,10 +70,13 @@ class SendServerNoticeServlet(RestServlet): self.__class__.__name__, ) - async def on_POST( - self, request: SynapseRequest, txn_id: Optional[str] = None + async def _do( + self, + request: SynapseRequest, + requester: Requester, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - await assert_requester_is_admin(self.auth, request) + await assert_user_is_admin(self.auth, requester) body = parse_json_object_from_request(request) assert_params_in_dict(body, ("user_id", "content")) event_type = body.get("type", EventTypes.Message) @@ -106,9 +109,18 @@ class SendServerNoticeServlet(RestServlet): return HTTPStatus.OK, {"event_id": event.event_id} - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + return await self._do(request, requester, None) + + async def on_PUT( self, request: SynapseRequest, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, txn_id + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + set_tag("txn_id", txn_id) + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester, txn_id ) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 61e4cf0213..129b6fe6b0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -57,7 +57,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID +from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID from synapse.types.state import StateFilter from synapse.util import json_decoder from synapse.util.cancellation import cancellable @@ -151,15 +151,22 @@ class RoomCreateRestServlet(TransactionRestServlet): PATTERNS = "/createRoom" register_txn_path(self, PATTERNS, http_server) - def on_PUT( + async def on_PUT( self, request: SynapseRequest, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request(request, self.on_POST, request) + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester + ) async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) + return await self._do(request, requester) + async def _do( + self, request: SynapseRequest, requester: Requester + ) -> Tuple[int, JsonDict]: room_id, _, _ = await self._room_creation_handler.create_room( requester, self.get_room_config(request) ) @@ -172,9 +179,9 @@ class RoomCreateRestServlet(TransactionRestServlet): # TODO: Needs unit testing for generic events -class RoomStateEventRestServlet(TransactionRestServlet): +class RoomStateEventRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): - super().__init__(hs) + super().__init__() self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() self.message_handler = hs.get_message_handler() @@ -324,16 +331,16 @@ class RoomSendEventRestServlet(TransactionRestServlet): def register(self, http_server: HttpServer) -> None: # /rooms/$roomid/send/$event_type[/$txn_id] PATTERNS = "/rooms/(?P<room_id>[^/]*)/send/(?P<event_type>[^/]*)" - register_txn_path(self, PATTERNS, http_server, with_get=True) + register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_id: str, event_type: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) event_dict: JsonDict = { @@ -362,18 +369,30 @@ class RoomSendEventRestServlet(TransactionRestServlet): set_tag("event_id", event_id) return 200, {"event_id": event_id} - def on_GET( - self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str - ) -> Tuple[int, str]: - return 200, "Not implemented" + async def on_POST( + self, + request: SynapseRequest, + room_id: str, + event_type: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + return await self._do(request, requester, room_id, event_type, None) - def on_PUT( + async def on_PUT( self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, event_type, txn_id + return await self.txns.fetch_or_execute_request( + request, + requester, + self._do, + request, + requester, + room_id, + event_type, + txn_id, ) @@ -389,14 +408,13 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet): PATTERNS = "/join/(?P<room_identifier>[^/]*)" register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_identifier: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - content = parse_json_object_from_request(request, allow_empty_body=True) # twisted.web.server.Request.args is incorrectly defined as Optional[Any] @@ -420,22 +438,31 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet): return 200, {"room_id": room_id} - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + room_identifier: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + return await self._do(request, requester, room_identifier, None) + + async def on_PUT( self, request: SynapseRequest, room_identifier: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_identifier, txn_id + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester, room_identifier, txn_id ) # TODO: Needs unit testing -class PublicRoomListRestServlet(TransactionRestServlet): +class PublicRoomListRestServlet(RestServlet): PATTERNS = client_patterns("/publicRooms$", v1=True) def __init__(self, hs: "HomeServer"): - super().__init__(hs) + super().__init__() self.hs = hs self.auth = hs.get_auth() @@ -907,22 +934,25 @@ class RoomForgetRestServlet(TransactionRestServlet): PATTERNS = "/rooms/(?P<room_id>[^/]*)/forget" register_txn_path(self, PATTERNS, http_server) - async def on_POST( - self, request: SynapseRequest, room_id: str, txn_id: Optional[str] = None - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=False) - + async def _do(self, requester: Requester, room_id: str) -> Tuple[int, JsonDict]: await self.room_member_handler.forget(user=requester.user, room_id=room_id) return 200, {} - def on_PUT( + async def on_POST( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=False) + return await self._do(requester, room_id) + + async def on_PUT( self, request: SynapseRequest, room_id: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=False) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, txn_id + return await self.txns.fetch_or_execute_request( + request, requester, self._do, requester, room_id ) @@ -941,15 +971,14 @@ class RoomMembershipRestServlet(TransactionRestServlet): ) register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_id: str, membership_action: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - if requester.is_guest and membership_action not in { Membership.JOIN, Membership.LEAVE, @@ -1014,13 +1043,30 @@ class RoomMembershipRestServlet(TransactionRestServlet): return 200, return_value - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + room_id: str, + membership_action: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + return await self._do(request, requester, room_id, membership_action, None) + + async def on_PUT( self, request: SynapseRequest, room_id: str, membership_action: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, membership_action, txn_id + return await self.txns.fetch_or_execute_request( + request, + requester, + self._do, + request, + requester, + room_id, + membership_action, + txn_id, ) @@ -1036,14 +1082,14 @@ class RoomRedactEventRestServlet(TransactionRestServlet): PATTERNS = "/rooms/(?P<room_id>[^/]*)/redact/(?P<event_id>[^/]*)" register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_id: str, event_id: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) try: @@ -1094,13 +1140,23 @@ class RoomRedactEventRestServlet(TransactionRestServlet): set_tag("event_id", event_id) return 200, {"event_id": event_id} - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + room_id: str, + event_id: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + return await self._do(request, requester, room_id, event_id, None) + + async def on_PUT( self, request: SynapseRequest, room_id: str, event_id: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, event_id, txn_id + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester, room_id, event_id, txn_id ) @@ -1224,7 +1280,6 @@ def register_txn_path( servlet: RestServlet, regex_string: str, http_server: HttpServer, - with_get: bool = False, ) -> None: """Registers a transaction-based path. @@ -1236,7 +1291,6 @@ def register_txn_path( regex_string: The regex string to register. Must NOT have a trailing $ as this string will be appended to. http_server: The http_server to register paths with. - with_get: True to also register respective GET paths for the PUTs. """ on_POST = getattr(servlet, "on_POST", None) on_PUT = getattr(servlet, "on_PUT", None) @@ -1254,18 +1308,6 @@ def register_txn_path( on_PUT, servlet.__class__.__name__, ) - on_GET = getattr(servlet, "on_GET", None) - if with_get: - if on_GET is None: - raise RuntimeError( - "register_txn_path called with with_get = True, but no on_GET method exists" - ) - http_server.register_paths( - "GET", - client_patterns(regex_string + "/(?P<txn_id>[^/]*)$", v1=True), - on_GET, - servlet.__class__.__name__, - ) class TimestampLookupRestServlet(RestServlet): diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py index 55d52f0b28..110af6df47 100644 --- a/synapse/rest/client/sendtodevice.py +++ b/synapse/rest/client/sendtodevice.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Awaitable, Tuple +from typing import TYPE_CHECKING, Tuple from synapse.http import servlet from synapse.http.server import HttpServer @@ -21,7 +21,7 @@ from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_r from synapse.http.site import SynapseRequest from synapse.logging.opentracing import set_tag from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester from ._base import client_patterns @@ -43,19 +43,26 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - def on_PUT( + async def on_PUT( self, request: SynapseRequest, message_type: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self._put, request, message_type, txn_id + return await self.txns.fetch_or_execute_request( + request, + requester, + self._put, + request, + requester, + message_type, ) async def _put( - self, request: SynapseRequest, message_type: str, txn_id: str + self, + request: SynapseRequest, + requester: Requester, + message_type: str, ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - content = parse_json_object_from_request(request) assert_params_in_dict(content, ("messages",)) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 3f40f1874a..f2aaab6227 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -15,16 +15,16 @@ """This module contains logic for storing HTTP PUT transactions. This is used to ensure idempotency when performing PUTs using the REST API.""" import logging -from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Tuple +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Hashable, Tuple from typing_extensions import ParamSpec from twisted.internet.defer import Deferred from twisted.python.failure import Failure -from twisted.web.server import Request +from twisted.web.iweb import IRequest from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester from synapse.util.async_helpers import ObservableDeferred if TYPE_CHECKING: @@ -41,53 +41,47 @@ P = ParamSpec("P") class HttpTransactionCache: def __init__(self, hs: "HomeServer"): self.hs = hs - self.auth = self.hs.get_auth() self.clock = self.hs.get_clock() # $txn_key: (ObservableDeferred<(res_code, res_json_body)>, timestamp) self.transactions: Dict[ - str, Tuple[ObservableDeferred[Tuple[int, JsonDict]], int] + Hashable, Tuple[ObservableDeferred[Tuple[int, JsonDict]], int] ] = {} # Try to clean entries every 30 mins. This means entries will exist # for at *LEAST* 30 mins, and at *MOST* 60 mins. self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS) - def _get_transaction_key(self, request: Request) -> str: + def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable: """A helper function which returns a transaction key that can be used with TransactionCache for idempotent requests. Idempotency is based on the returned key being the same for separate requests to the same endpoint. The key is formed from the HTTP request - path and the access_token for the requesting user. + path and attributes from the requester: the access_token_id for regular users, + the user ID for guest users, and the appservice ID for appservice users. Args: - request: The incoming request. Must contain an access_token. + request: The incoming request. + requester: The requester doing the request. Returns: A transaction key """ assert request.path is not None - token = self.auth.get_access_token_from_request(request) - return request.path.decode("utf8") + "/" + token + path: str = request.path.decode("utf8") + if requester.is_guest: + assert requester.user is not None, "Guest requester must have a user ID set" + return (path, "guest", requester.user) + elif requester.app_service is not None: + return (path, "appservice", requester.app_service.id) + else: + assert ( + requester.access_token_id is not None + ), "Requester must have an access_token_id" + return (path, "user", requester.access_token_id) def fetch_or_execute_request( self, - request: Request, - fn: Callable[P, Awaitable[Tuple[int, JsonDict]]], - *args: P.args, - **kwargs: P.kwargs, - ) -> Awaitable[Tuple[int, JsonDict]]: - """A helper function for fetch_or_execute which extracts - a transaction key from the given request. - - See: - fetch_or_execute - """ - return self.fetch_or_execute( - self._get_transaction_key(request), fn, *args, **kwargs - ) - - def fetch_or_execute( - self, - txn_key: str, + request: IRequest, + requester: Requester, fn: Callable[P, Awaitable[Tuple[int, JsonDict]]], *args: P.args, **kwargs: P.kwargs, @@ -96,14 +90,15 @@ class HttpTransactionCache: to produce a response for this transaction. Args: - txn_key: A key to ensure idempotency should fetch_or_execute be - called again at a later point in time. + request: + requester: fn: A function which returns a tuple of (response_code, response_dict). *args: Arguments to pass to fn. **kwargs: Keyword arguments to pass to fn. Returns: Deferred which resolves to a tuple of (response_code, response_dict). """ + txn_key = self._get_transaction_key(request, requester) if txn_key in self.transactions: observable = self.transactions[txn_key][0] else: diff --git a/synapse/server.py b/synapse/server.py index df80fc1beb..8078463530 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -23,6 +23,8 @@ import functools import logging from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypeVar, cast +from typing_extensions import TypeAlias + from twisted.internet.interfaces import IOpenSSLContextFactory from twisted.internet.tcp import Port from twisted.web.iweb import IPolicyForHTTPS @@ -142,10 +144,31 @@ if TYPE_CHECKING: from synapse.handlers.saml import SamlHandler -T = TypeVar("T") +# The annotation for `cache_in_self` used to be +# def (builder: Callable[["HomeServer"],T]) -> Callable[["HomeServer"],T] +# which mypy was happy with. +# +# But PyCharm was confused by this. If `foo` was decorated by `@cache_in_self`, then +# an expression like `hs.foo()` +# +# - would erroneously warn that we hadn't provided a `hs` argument to foo (PyCharm +# confused about boundmethods and unbound methods?), and +# - would be considered to have type `Any`, making for a poor autocomplete and +# cross-referencing experience. +# +# Instead, use a typevar `F` to express that `@cache_in_self` returns exactly the +# same type it receives. This isn't strictly true [*], but it's more than good +# enough to keep PyCharm and mypy happy. +# +# [*]: (e.g. `builder` could be an object with a __call__ attribute rather than a +# types.FunctionType instance, whereas the return value is always a +# types.FunctionType instance.) + +T: TypeAlias = object +F = TypeVar("F", bound=Callable[["HomeServer"], T]) -def cache_in_self(builder: Callable[["HomeServer"], T]) -> Callable[["HomeServer"], T]: +def cache_in_self(builder: F) -> F: """Wraps a function called e.g. `get_foo`, checking if `self.foo` exists and returning if so. If not, calls the given function and sets `self.foo` to it. @@ -183,7 +206,7 @@ def cache_in_self(builder: Callable[["HomeServer"], T]) -> Callable[["HomeServer return dep - return _get + return cast(F, _get) class HomeServer(metaclass=abc.ABCMeta): diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 9ca50d6a09..c599397b86 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -16,6 +16,7 @@ import itertools import logging from typing import TYPE_CHECKING, Set +from synapse.logging.context import nested_logging_context from synapse.storage.databases import Databases if TYPE_CHECKING: @@ -33,8 +34,9 @@ class PurgeEventsStorageController: async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" - state_groups_to_delete = await self.stores.main.purge_room(room_id) - await self.stores.state.purge_room_state(room_id, state_groups_to_delete) + with nested_logging_context(room_id): + state_groups_to_delete = await self.stores.main.purge_room(room_id) + await self.stores.state.purge_room_state(room_id, state_groups_to_delete) async def purge_history( self, room_id: str, token: str, delete_local_events: bool @@ -51,15 +53,17 @@ class PurgeEventsStorageController: (instead of just marking them as outliers and deleting their state groups). """ - state_groups = await self.stores.main.purge_history( - room_id, token, delete_local_events - ) - - logger.info("[purge] finding state groups that can be deleted") + with nested_logging_context(room_id): + state_groups = await self.stores.main.purge_history( + room_id, token, delete_local_events + ) - sg_to_delete = await self._find_unreferenced_groups(state_groups) + logger.info("[purge] finding state groups that can be deleted") + sg_to_delete = await self._find_unreferenced_groups(state_groups) - await self.stores.state.purge_unreferenced_state_groups(room_id, sg_to_delete) + await self.stores.state.purge_unreferenced_state_groups( + room_id, sg_to_delete + ) async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]: """Used when purging history to figure out which state groups can be diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 5efe31aa19..fec4ae5b97 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -34,6 +34,7 @@ from typing import ( Tuple, Type, TypeVar, + Union, cast, overload, ) @@ -100,6 +101,15 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { } +class _PoolConnection(Connection): + """ + A Connection from twisted.enterprise.adbapi.Connection. + """ + + def reconnect(self) -> None: + ... + + def make_pool( reactor: IReactorCore, db_config: DatabaseConnectionConfig, @@ -856,7 +866,8 @@ class DatabasePool: try: with opentracing.start_active_span(f"db.{desc}"): result = await self.runWithConnection( - self.new_transaction, + # mypy seems to have an issue with this, maybe a bug? + self.new_transaction, # type: ignore[arg-type] desc, after_callbacks, async_after_callbacks, @@ -892,7 +903,7 @@ class DatabasePool: async def runWithConnection( self, - func: Callable[..., R], + func: Callable[Concatenate[LoggingDatabaseConnection, P], R], *args: Any, db_autocommit: bool = False, isolation_level: Optional[int] = None, @@ -926,7 +937,7 @@ class DatabasePool: start_time = monotonic_time() - def inner_func(conn, *args, **kwargs): + def inner_func(conn: _PoolConnection, *args: P.args, **kwargs: P.kwargs) -> R: # We shouldn't be in a transaction. If we are then something # somewhere hasn't committed after doing work. (This is likely only # possible during startup, as `run*` will ensure changes are @@ -1019,7 +1030,7 @@ class DatabasePool: decoder: Optional[Callable[[Cursor], R]], query: str, *args: Any, - ) -> R: + ) -> Union[List[Tuple[Any, ...]], R]: """Runs a single query for a result set. Args: @@ -1032,7 +1043,7 @@ class DatabasePool: The result of decoder(results) """ - def interaction(txn): + def interaction(txn: LoggingTransaction) -> Union[List[Tuple[Any, ...]], R]: txn.execute(query, args) if decoder: return decoder(txn) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 9c41d01e13..7a7c0d9c75 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -325,6 +325,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # We then run the same purge a second time without this isolation level to # purge any of those rows which were added during the first. + logger.info("[purge] Starting initial main purge of [1/2]") state_groups_to_delete = await self.db_pool.runInteraction( "purge_room", self._purge_room_txn, @@ -332,6 +333,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): isolation_level=IsolationLevel.READ_COMMITTED, ) + logger.info("[purge] Starting secondary main purge of [2/2]") state_groups_to_delete.extend( await self.db_pool.runInteraction( "purge_room", @@ -339,6 +341,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): room_id=room_id, ), ) + logger.info("[purge] Done with main purge") return state_groups_to_delete @@ -376,7 +379,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): ) referenced_chain_id_tuples = list(txn) - logger.info("[purge] removing events from event_auth_chain_links") + logger.info("[purge] removing from event_auth_chain_links") txn.executemany( """ DELETE FROM event_auth_chain_links WHERE @@ -399,7 +402,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "rejections", "state_events", ): - logger.info("[purge] removing %s from %s", room_id, table) + logger.info("[purge] removing from %s", table) txn.execute( """ @@ -454,7 +457,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # happy "rooms", ): - logger.info("[purge] removing %s from %s", room_id, table) + logger.info("[purge] removing from %s", table) txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,)) # Other tables we do NOT need to clear out: @@ -486,6 +489,4 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # that already exist. self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,)) - logger.info("[purge] done") - return state_groups diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index bf4cdfdf29..29ff64e876 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -805,12 +805,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): state_groups_to_delete: State groups to delete """ + logger.info("[purge] Starting state purge") await self.db_pool.runInteraction( "purge_room_state", self._purge_room_state_txn, room_id, state_groups_to_delete, ) + logger.info("[purge] Done with state purge") def _purge_room_state_txn( self, |