summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/9639.bugfix1
-rw-r--r--changelog.d/9653.feature1
-rw-r--r--changelog.d/9674.misc1
-rw-r--r--changelog.d/9675.misc1
-rw-r--r--changelog.d/9676.misc1
-rw-r--r--changelog.d/9678.misc1
-rw-r--r--changelog.d/9679.doc1
-rw-r--r--docs/sample_config.yaml4
-rw-r--r--docs/workers.md3
-rw-r--r--mypy.ini5
-rwxr-xr-xsetup.py2
-rw-r--r--synapse/config/ratelimiting.py8
-rw-r--r--synapse/events/third_party_rules.py15
-rw-r--r--synapse/federation/federation_client.py180
-rw-r--r--synapse/federation/transport/client.py32
-rw-r--r--synapse/handlers/space_summary.py135
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/rest/client/v2_alpha/sync.py12
-rw-r--r--synapse/secrets.py8
-rw-r--r--synapse/server.py4
-rw-r--r--synapse/server_notices/consent_server_notices.py18
-rw-r--r--synapse/server_notices/resource_limits_server_notices.py11
-rw-r--r--synapse/server_notices/server_notices_manager.py2
-rw-r--r--synapse/server_notices/server_notices_sender.py18
-rw-r--r--synapse/server_notices/worker_server_notices_sender.py11
-rw-r--r--synapse/storage/databases/main/deviceinbox.py6
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py4
-rw-r--r--synapse/storage/databases/main/transactions.py45
-rw-r--r--synapse/storage/state.py4
-rw-r--r--synapse/visibility.py78
30 files changed, 452 insertions, 166 deletions
diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix
new file mode 100644

index 0000000000..51b3746707 --- /dev/null +++ b/changelog.d/9639.bugfix
@@ -0,0 +1 @@ +Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind. diff --git a/changelog.d/9653.feature b/changelog.d/9653.feature new file mode 100644
index 0000000000..2f7ccedcfb --- /dev/null +++ b/changelog.d/9653.feature
@@ -0,0 +1 @@ +Add initial experimental support for a "space summary" API. diff --git a/changelog.d/9674.misc b/changelog.d/9674.misc new file mode 100644
index 0000000000..c82fde61b2 --- /dev/null +++ b/changelog.d/9674.misc
@@ -0,0 +1 @@ +Increase default join ratelimiting burst rate. diff --git a/changelog.d/9675.misc b/changelog.d/9675.misc new file mode 100644
index 0000000000..35338cd332 --- /dev/null +++ b/changelog.d/9675.misc
@@ -0,0 +1 @@ +Add additional type hints to the Homeserver object. diff --git a/changelog.d/9676.misc b/changelog.d/9676.misc new file mode 100644
index 0000000000..829e38b938 --- /dev/null +++ b/changelog.d/9676.misc
@@ -0,0 +1 @@ +Add type hints to third party event rules and visibility modules. diff --git a/changelog.d/9678.misc b/changelog.d/9678.misc new file mode 100644
index 0000000000..77a2b2d439 --- /dev/null +++ b/changelog.d/9678.misc
@@ -0,0 +1 @@ +Bump mypy-zope to 0.2.13 to fix "Cannot determine consistent method resolution order (MRO)" errors when running mypy a second time. diff --git a/changelog.d/9679.doc b/changelog.d/9679.doc new file mode 100644
index 0000000000..34f87490d6 --- /dev/null +++ b/changelog.d/9679.doc
@@ -0,0 +1 @@ +Improve worker documentation for fallback/web auth endpoints. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 7ce7764d43..3ca4025d9e 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml
@@ -943,10 +943,10 @@ log_config: "CONFDIR/SERVERNAME.log.config" #rc_joins: # local: # per_second: 0.1 -# burst_count: 3 +# burst_count: 10 # remote: # per_second: 0.01 -# burst_count: 3 +# burst_count: 10 # #rc_3pid_validation: # per_second: 0.003 diff --git a/docs/workers.md b/docs/workers.md
index e7bf9b8ce4..c6282165b0 100644 --- a/docs/workers.md +++ b/docs/workers.md
@@ -232,7 +232,6 @@ expressions: # Registration/login requests ^/_matrix/client/(api/v1|r0|unstable)/login$ ^/_matrix/client/(r0|unstable)/register$ - ^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$ # Event sending requests ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact @@ -276,7 +275,7 @@ using): Ensure that all SSO logins go to a single process. For multiple workers not handling the SSO endpoints properly, see -[#7530](https://github.com/matrix-org/synapse/issues/7530) and +[#7530](https://github.com/matrix-org/synapse/issues/7530) and [#9427](https://github.com/matrix-org/synapse/issues/9427). Note that a HTTP listener with `client` and `federation` resources must be diff --git a/mypy.ini b/mypy.ini
index e0685e097c..709a8d07a5 100644 --- a/mypy.ini +++ b/mypy.ini
@@ -20,8 +20,9 @@ files = synapse/crypto, synapse/event_auth.py, synapse/events/builder.py, - synapse/events/validator.py, synapse/events/spamcheck.py, + synapse/events/third_party_rules.py, + synapse/events/validator.py, synapse/federation, synapse/groups, synapse/handlers, @@ -38,6 +39,7 @@ files = synapse/push, synapse/replication, synapse/rest, + synapse/secrets.py, synapse/server.py, synapse/server_notices, synapse/spam_checker_api, @@ -71,6 +73,7 @@ files = synapse/util/metrics.py, synapse/util/macaroons.py, synapse/util/stringutils.py, + synapse/visibility.py, tests/replication, tests/test_utils, tests/handlers/test_password_providers.py, diff --git a/setup.py b/setup.py
index b834e4e55b..1939a7b86b 100755 --- a/setup.py +++ b/setup.py
@@ -103,7 +103,7 @@ CONDITIONAL_REQUIREMENTS["lint"] = [ "flake8", ] -CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.11"] +CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.13"] # Dependencies which are exclusively required by unit test code. This is # NOT a list of all modules that are necessary to run the unit tests. diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 944051422e..19322372a9 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py
@@ -98,11 +98,11 @@ class RatelimitConfig(Config): self.rc_joins_local = RateLimitConfig( config.get("rc_joins", {}).get("local", {}), - defaults={"per_second": 0.1, "burst_count": 3}, + defaults={"per_second": 0.1, "burst_count": 10}, ) self.rc_joins_remote = RateLimitConfig( config.get("rc_joins", {}).get("remote", {}), - defaults={"per_second": 0.01, "burst_count": 3}, + defaults={"per_second": 0.01, "burst_count": 10}, ) # Ratelimit cross-user key requests: @@ -196,10 +196,10 @@ class RatelimitConfig(Config): #rc_joins: # local: # per_second: 0.1 - # burst_count: 3 + # burst_count: 10 # remote: # per_second: 0.01 - # burst_count: 3 + # burst_count: 10 # #rc_3pid_validation: # per_second: 0.003 diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
index 02bce8b5c9..9767d23940 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py
@@ -13,12 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Union +from typing import TYPE_CHECKING, Union from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.types import Requester, StateMap +if TYPE_CHECKING: + from synapse.server import HomeServer + class ThirdPartyEventRules: """Allows server admins to provide a Python module implementing an extra @@ -28,7 +31,7 @@ class ThirdPartyEventRules: behaviours. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.third_party_rules = None self.store = hs.get_datastore() @@ -95,10 +98,9 @@ class ThirdPartyEventRules: if self.third_party_rules is None: return True - ret = await self.third_party_rules.on_create_room( + return await self.third_party_rules.on_create_room( requester, config, is_requester_admin ) - return ret async def check_threepid_can_be_invited( self, medium: str, address: str, room_id: str @@ -119,10 +121,9 @@ class ThirdPartyEventRules: state_events = await self._get_state_map_for_room(room_id) - ret = await self.third_party_rules.check_threepid_can_be_invited( + return await self.third_party_rules.check_threepid_can_be_invited( medium, address, state_events ) - return ret async def check_visibility_can_be_modified( self, room_id: str, new_visibility: str @@ -143,7 +144,7 @@ class ThirdPartyEventRules: check_func = getattr( self.third_party_rules, "check_visibility_can_be_modified", None ) - if not check_func or not isinstance(check_func, Callable): + if not check_func or not callable(check_func): return True state_events = await self._get_state_map_for_room(room_id) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3c864d216c..184096d165 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -29,11 +29,13 @@ from typing import ( List, Mapping, Optional, + Sequence, Tuple, TypeVar, Union, ) +import attr from prometheus_client import Counter from twisted.internet import defer @@ -457,6 +459,7 @@ class FederationClient(FederationBase): description: str, destinations: Iterable[str], callback: Callable[[str], Awaitable[T]], + failover_on_unknown_endpoint: bool = False, ) -> T: """Try an operation on a series of servers, until it succeeds @@ -476,6 +479,10 @@ class FederationClient(FederationBase): next server tried. Normally the stacktrace is logged but this is suppressed if the exception is an InvalidResponseError. + failover_on_unknown_endpoint: if True, we will try other servers if it looks + like a server doesn't support the endpoint. This is typically useful + if the endpoint in question is new or experimental. + Returns: The result of callback, if it succeeds @@ -495,16 +502,31 @@ class FederationClient(FederationBase): except UnsupportedRoomVersionError: raise except HttpResponseException as e: - if not 500 <= e.code < 600: - raise e.to_synapse_error() - else: - logger.warning( - "Failed to %s via %s: %i %s", - description, - destination, - e.code, - e.args[0], - ) + synapse_error = e.to_synapse_error() + failover = False + + if 500 <= e.code < 600: + failover = True + + elif failover_on_unknown_endpoint: + # there is no good way to detect an "unknown" endpoint. Dendrite + # returns a 404 (with no body); synapse returns a 400 + # with M_UNRECOGNISED. + if e.code == 404 or ( + e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED + ): + failover = True + + if not failover: + raise synapse_error from e + + logger.warning( + "Failed to %s via %s: %i %s", + description, + destination, + e.code, + e.args[0], + ) except Exception: logger.warning( "Failed to %s via %s", description, destination, exc_info=True @@ -1100,3 +1122,141 @@ class FederationClient(FederationBase): # If we don't manage to find it, return None. It's not an error if a # server doesn't give it to us. return None + + async def get_space_summary( + self, + destinations: Iterable[str], + room_id: str, + suggested_only: bool, + max_rooms_per_space: Optional[int], + exclude_rooms: List[str], + ) -> "FederationSpaceSummaryResult": + """ + Call other servers to get a summary of the given space + + + Args: + destinations: The remote servers. We will try them in turn, omitting any + that have been blacklisted. + + room_id: ID of the space to be queried + + suggested_only: If true, ask the remote server to only return children + with the "suggested" flag set + + max_rooms_per_space: A limit on the number of children to return for each + space + + exclude_rooms: A list of room IDs to tell the remote server to skip + + Returns: + a parsed FederationSpaceSummaryResult + + Raises: + SynapseError if we were unable to get a valid summary from any of the + remote servers + """ + + async def send_request(destination: str) -> FederationSpaceSummaryResult: + res = await self.transport_layer.get_space_summary( + destination=destination, + room_id=room_id, + suggested_only=suggested_only, + max_rooms_per_space=max_rooms_per_space, + exclude_rooms=exclude_rooms, + ) + + try: + return FederationSpaceSummaryResult.from_json_dict(res) + except ValueError as e: + raise InvalidResponseError(str(e)) + + return await self._try_destination_list( + "fetch space summary", + destinations, + send_request, + failover_on_unknown_endpoint=True, + ) + + +@attr.s(frozen=True, slots=True) +class FederationSpaceSummaryEventResult: + """Represents a single event in the result of a successful get_space_summary call. + + It's essentially just a serialised event object, but we do a bit of parsing and + validation in `from_json_dict` and store some of the validated properties in + object attributes. + """ + + event_type = attr.ib(type=str) + state_key = attr.ib(type=str) + via = attr.ib(type=Sequence[str]) + + # the raw data, including the above keys + data = attr.ib(type=JsonDict) + + @classmethod + def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult": + """Parse an event within the result of a /spaces/ request + + Args: + d: json object to be parsed + + Raises: + ValueError if d is not a valid event + """ + + event_type = d.get("type") + if not isinstance(event_type, str): + raise ValueError("Invalid event: 'event_type' must be a str") + + state_key = d.get("state_key") + if not isinstance(state_key, str): + raise ValueError("Invalid event: 'state_key' must be a str") + + content = d.get("content") + if not isinstance(content, dict): + raise ValueError("Invalid event: 'content' must be a dict") + + via = content.get("via") + if not isinstance(via, Sequence): + raise ValueError("Invalid event: 'via' must be a list") + if any(not isinstance(v, str) for v in via): + raise ValueError("Invalid event: 'via' must be a list of strings") + + return cls(event_type, state_key, via, d) + + +@attr.s(frozen=True, slots=True) +class FederationSpaceSummaryResult: + """Represents the data returned by a successful get_space_summary call.""" + + rooms = attr.ib(type=Sequence[JsonDict]) + events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult]) + + @classmethod + def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult": + """Parse the result of a /spaces/ request + + Args: + d: json object to be parsed + + Raises: + ValueError if d is not a valid /spaces/ response + """ + rooms = d.get("rooms") + if not isinstance(rooms, Sequence): + raise ValueError("'rooms' must be a list") + if any(not isinstance(r, dict) for r in rooms): + raise ValueError("Invalid room in 'rooms' list") + + events = d.get("events") + if not isinstance(events, Sequence): + raise ValueError("'events' must be a list") + if any(not isinstance(e, dict) for e in events): + raise ValueError("Invalid event in 'events' list") + parsed_events = [ + FederationSpaceSummaryEventResult.from_json_dict(e) for e in events + ] + + return cls(rooms, parsed_events) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 3f2b2bf7d5..df7e9fbbc2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -1031,6 +1031,38 @@ class TransportLayerClient: return self.client.get_json(destination=destination, path=path) + async def get_space_summary( + self, + destination: str, + room_id: str, + suggested_only: bool, + max_rooms_per_space: Optional[int], + exclude_rooms: List[str], + ) -> JsonDict: + """ + Args: + destination: The remote server + room_id: The room ID to ask about. + suggested_only: if True, only suggested rooms will be returned + max_rooms_per_space: an optional limit to the number of children to be + returned per space + exclude_rooms: a list of any rooms we can skip + """ + path = _create_path( + FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id + ) + + params = { + "suggested_only": suggested_only, + "exclude_rooms": exclude_rooms, + } + if max_rooms_per_space is not None: + params["max_rooms_per_space"] = max_rooms_per_space + + return await self.client.post_json( + destination=destination, path=path, data=params + ) + def get_info_of_users(self, destination: str, user_ids: List[str]): """ Args: diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index f5ead9447f..5d9418969d 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py
@@ -16,7 +16,7 @@ import itertools import logging from collections import deque -from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple +from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast import attr @@ -38,6 +38,9 @@ MAX_ROOMS = 50 # max number of events to return per room. MAX_ROOMS_PER_SPACE = 50 +# max number of federation servers to hit per room +MAX_SERVERS_PER_SPACE = 3 + class SpaceSummaryHandler: def __init__(self, hs: "HomeServer"): @@ -47,6 +50,8 @@ class SpaceSummaryHandler: self._state_handler = hs.get_state_handler() self._store = hs.get_datastore() self._event_serializer = hs.get_event_client_serializer() + self._server_name = hs.hostname + self._federation_client = hs.get_federation_client() async def get_space_summary( self, @@ -78,35 +83,81 @@ class SpaceSummaryHandler: await self._auth.check_user_in_room_or_world_readable(room_id, requester) # the queue of rooms to process - room_queue = deque((_RoomQueueEntry(room_id),)) + room_queue = deque((_RoomQueueEntry(room_id, ()),)) + # rooms we have already processed processed_rooms = set() # type: Set[str] + # events we have already processed. We don't necessarily have their event ids, + # so instead we key on (room id, state key) + processed_events = set() # type: Set[Tuple[str, str]] + rooms_result = [] # type: List[JsonDict] events_result = [] # type: List[JsonDict] while room_queue and len(rooms_result) < MAX_ROOMS: queue_entry = room_queue.popleft() room_id = queue_entry.room_id + if room_id in processed_rooms: + # already done this room + continue + logger.debug("Processing room %s", room_id) - processed_rooms.add(room_id) + + is_in_room = await self._store.is_host_joined(room_id, self._server_name) # The client-specified max_rooms_per_space limit doesn't apply to the # room_id specified in the request, so we ignore it if this is the # first room we are processing. max_children = max_rooms_per_space if processed_rooms else None - rooms, events = await self._summarize_local_room( - requester, room_id, suggested_only, max_children + if is_in_room: + rooms, events = await self._summarize_local_room( + requester, room_id, suggested_only, max_children + ) + else: + rooms, events = await self._summarize_remote_room( + queue_entry, + suggested_only, + max_children, + exclude_rooms=processed_rooms, + ) + + logger.debug( + "Query of %s returned rooms %s, events %s", + queue_entry.room_id, + [room.get("room_id") for room in rooms], + ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events], ) rooms_result.extend(rooms) - events_result.extend(events) - # add any children that we haven't already processed to the queue - for edge_event in events: - if edge_event["state_key"] not in processed_rooms: - room_queue.append(_RoomQueueEntry(edge_event["state_key"])) + # any rooms returned don't need visiting again + processed_rooms.update(cast(str, room.get("room_id")) for room in rooms) + + # the room we queried may or may not have been returned, but don't process + # it again, anyway. + processed_rooms.add(room_id) + + # XXX: is it ok that we blindly iterate through any events returned by + # a remote server, whether or not they actually link to any rooms in our + # tree? + for ev in events: + # remote servers might return events we have already processed + # (eg, Dendrite returns inward pointers as well as outward ones), so + # we need to filter them out, to avoid returning duplicate links to the + # client. + ev_key = (ev["room_id"], ev["state_key"]) + if ev_key in processed_events: + continue + events_result.append(ev) + + # add the child to the queue. we have already validated + # that the vias are a list of server names. + room_queue.append( + _RoomQueueEntry(ev["state_key"], ev["content"]["via"]) + ) + processed_events.add(ev_key) return {"rooms": rooms_result, "events": events_result} @@ -149,20 +200,23 @@ class SpaceSummaryHandler: while room_queue and len(rooms_result) < MAX_ROOMS: room_id = room_queue.popleft() + if room_id in processed_rooms: + # already done this room + continue + logger.debug("Processing room %s", room_id) - processed_rooms.add(room_id) rooms, events = await self._summarize_local_room( None, room_id, suggested_only, max_rooms_per_space ) + processed_rooms.add(room_id) + rooms_result.extend(rooms) events_result.extend(events) - # add any children that we haven't already processed to the queue - for edge_event in events: - if edge_event["state_key"] not in processed_rooms: - room_queue.append(edge_event["state_key"]) + # add any children to the queue + room_queue.extend(edge_event["state_key"] for edge_event in events) return {"rooms": rooms_result, "events": events_result} @@ -200,6 +254,43 @@ class SpaceSummaryHandler: ) return (room_entry,), events_result + async def _summarize_remote_room( + self, + room: "_RoomQueueEntry", + suggested_only: bool, + max_children: Optional[int], + exclude_rooms: Iterable[str], + ) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]: + room_id = room.room_id + logger.info("Requesting summary for %s via %s", room_id, room.via) + + # we need to make the exclusion list json-serialisable + exclude_rooms = list(exclude_rooms) + + via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE) + try: + res = await self._federation_client.get_space_summary( + via, + room_id, + suggested_only=suggested_only, + max_rooms_per_space=max_children, + exclude_rooms=exclude_rooms, + ) + except Exception as e: + logger.warning( + "Unable to get summary of %s via federation: %s", + room_id, + e, + exc_info=logger.isEnabledFor(logging.DEBUG), + ) + return (), () + + return res.rooms, tuple( + ev.data + for ev in res.events + if ev.event_type == EventTypes.MSC1772_SPACE_CHILD + ) + async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool: # if we have an authenticated requesting user, first check if they are in the # room @@ -276,12 +367,24 @@ class SpaceSummaryHandler: ) # filter out any events without a "via" (which implies it has been redacted) - return (e for e in events if e.content.get("via")) + return (e for e in events if _has_valid_via(e)) @attr.s(frozen=True, slots=True) class _RoomQueueEntry: room_id = attr.ib(type=str) + via = attr.ib(type=Sequence[str]) + + +def _has_valid_via(e: EventBase) -> bool: + via = e.content.get("via") + if not via or not isinstance(via, Sequence): + return False + for v in via: + if not isinstance(v, str): + logger.debug("Ignoring edge event %s with invalid via entry", e.event_id) + return False + return True def _is_suggested_child_event(edge_event: EventBase) -> bool: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5d700a48e9..17277619ad 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -80,7 +80,7 @@ class SyncConfig: filter_collection = attr.ib(type=FilterCollection) is_guest = attr.ib(type=bool) request_key = attr.ib(type=Tuple[Any, ...]) - device_id = attr.ib(type=str) + device_id = attr.ib(type=Optional[str]) @attr.s(slots=True, frozen=True) @@ -737,7 +737,9 @@ class SyncHandler: return summary - def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache: + def get_lazy_loaded_members_cache( + self, cache_key: Tuple[str, Optional[str]] + ) -> LruCache: cache = self.lazy_loaded_members_cache.get(cache_key) if cache is None: logger.debug("creating LruCache for %r", cache_key) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 582c999abd..9452e7ca9f 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py
@@ -14,7 +14,7 @@ # limitations under the License. import itertools import logging -from typing import Any, Callable, Dict, List +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple from synapse.api.constants import Membership, PresenceState from synapse.api.errors import Codes, StoreError, SynapseError @@ -26,11 +26,15 @@ from synapse.events.utils import ( from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import KnockedSyncResult, SyncConfig from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string -from synapse.types import StreamToken +from synapse.http.site import SynapseRequest +from synapse.types import JsonDict, StreamToken from synapse.util import json_decoder from ._base import client_patterns, set_timeline_upper_limit +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -73,7 +77,7 @@ class SyncRestServlet(RestServlet): PATTERNS = client_patterns("/sync$") ALLOWED_PRESENCE = {"online", "offline", "unavailable"} - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__() self.hs = hs self.auth = hs.get_auth() @@ -85,7 +89,7 @@ class SyncRestServlet(RestServlet): self._server_notices_sender = hs.get_server_notices_sender() self._event_serializer = hs.get_event_client_serializer() - async def on_GET(self, request): + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: if b"from" in request.args: # /events used to use 'from', but /sync uses 'since'. # Lets be helpful and whine if we see a 'from'. diff --git a/synapse/secrets.py b/synapse/secrets.py
index fb6d90a3b7..7939db75e7 100644 --- a/synapse/secrets.py +++ b/synapse/secrets.py
@@ -26,10 +26,10 @@ if sys.version_info[0:2] >= (3, 6): import secrets class Secrets: - def token_bytes(self, nbytes=32): + def token_bytes(self, nbytes: int = 32) -> bytes: return secrets.token_bytes(nbytes) - def token_hex(self, nbytes=32): + def token_hex(self, nbytes: int = 32) -> str: return secrets.token_hex(nbytes) @@ -38,8 +38,8 @@ else: import os class Secrets: - def token_bytes(self, nbytes=32): + def token_bytes(self, nbytes: int = 32) -> bytes: return os.urandom(nbytes) - def token_hex(self, nbytes=32): + def token_hex(self, nbytes: int = 32) -> str: return binascii.hexlify(self.token_bytes(nbytes)).decode("ascii") diff --git a/synapse/server.py b/synapse/server.py
index 98822d8e2f..5e787e2281 100644 --- a/synapse/server.py +++ b/synapse/server.py
@@ -650,13 +650,13 @@ class HomeServer(metaclass=abc.ABCMeta): return FederationHandlerRegistry(self) @cache_in_self - def get_server_notices_manager(self): + def get_server_notices_manager(self) -> ServerNoticesManager: if self.config.worker_app: raise Exception("Workers cannot send server notices") return ServerNoticesManager(self) @cache_in_self - def get_server_notices_sender(self): + def get_server_notices_sender(self) -> WorkerServerNoticesSender: if self.config.worker_app: return WorkerServerNoticesSender(self) return ServerNoticesSender(self) diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 9137c4edb1..a9349bf9a1 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py
@@ -13,13 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any +from typing import TYPE_CHECKING, Any, Set from synapse.api.errors import SynapseError from synapse.api.urls import ConsentURIBuilder from synapse.config import ConfigError from synapse.types import get_localpart_from_id +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -28,16 +31,11 @@ class ConsentServerNotices: privacy policy consent, and sends one if we do. """ - def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): self._server_notices_manager = hs.get_server_notices_manager() self._store = hs.get_datastore() - self._users_in_progress = set() + self._users_in_progress = set() # type: Set[str] self._current_consent_version = hs.config.user_consent_version self._server_notice_content = hs.config.user_consent_server_notice_content @@ -73,6 +71,10 @@ class ConsentServerNotices: try: u = await self._store.get_user_by_id(user_id) + # The user doesn't exist. + if u is None: + return + if u["is_guest"] and not self._send_to_guests: # don't send to guests return diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
index 6652451346..a18a2e76c9 100644 --- a/synapse/server_notices/resource_limits_server_notices.py +++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List, Tuple +from typing import TYPE_CHECKING, List, Tuple from synapse.api.constants import ( EventTypes, @@ -24,6 +24,9 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, ResourceLimitError, SynapseError from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -32,11 +35,7 @@ class ResourceLimitsServerNotices: ensures that the client is kept up to date. """ - def __init__(self, hs): - """ - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): self._server_notices_manager = hs.get_server_notices_manager() self._store = hs.get_datastore() self._auth = hs.get_auth() diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index c46b2f047d..144e1da78e 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py
@@ -58,7 +58,7 @@ class ServerNoticesManager: user_id: str, event_content: dict, type: str = EventTypes.Message, - state_key: Optional[bool] = None, + state_key: Optional[str] = None, ) -> EventBase: """Send a notice to the given user diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py
index 6870b67ca0..965c645889 100644 --- a/synapse/server_notices/server_notices_sender.py +++ b/synapse/server_notices/server_notices_sender.py
@@ -12,25 +12,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterable, Union +from typing import TYPE_CHECKING, Iterable, Union from synapse.server_notices.consent_server_notices import ConsentServerNotices from synapse.server_notices.resource_limits_server_notices import ( ResourceLimitsServerNotices, ) +from synapse.server_notices.worker_server_notices_sender import ( + WorkerServerNoticesSender, +) + +if TYPE_CHECKING: + from synapse.server import HomeServer -class ServerNoticesSender: +class ServerNoticesSender(WorkerServerNoticesSender): """A centralised place which sends server notices automatically when Certain Events take place """ - def __init__(self, hs): - """ - - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): + super().__init__(hs) self._server_notices = ( ConsentServerNotices(hs), ResourceLimitsServerNotices(hs), diff --git a/synapse/server_notices/worker_server_notices_sender.py b/synapse/server_notices/worker_server_notices_sender.py
index 9273e61895..c76bd57460 100644 --- a/synapse/server_notices/worker_server_notices_sender.py +++ b/synapse/server_notices/worker_server_notices_sender.py
@@ -12,16 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from synapse.server import HomeServer class WorkerServerNoticesSender: """Stub impl of ServerNoticesSender which does nothing""" - def __init__(self, hs): - """ - Args: - hs (synapse.server.HomeServer): - """ + def __init__(self, hs: "HomeServer"): + pass async def on_user_syncing(self, user_id: str) -> None: """Called when the user performs a sync operation. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 45ca6620a8..691080ce74 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import List, Tuple +from typing import List, Optional, Tuple from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.replication.tcp.streams import ToDeviceStream @@ -115,7 +115,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): async def get_new_messages_for_device( self, user_id: str, - device_id: str, + device_id: Optional[str], last_stream_id: int, current_stream_id: int, limit: int = 100, @@ -163,7 +163,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): @trace async def delete_messages_for_device( - self, user_id: str, device_id: str, up_to_stream_id: int + self, user_id: str, device_id: Optional[str], up_to_stream_id: int ) -> int: """ Args: diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index d788dc0fc6..757da3d55d 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Dict, List +from typing import Dict, List, Optional from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore @@ -109,7 +109,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): return users @cached(num_args=1) - async def user_last_seen_monthly_active(self, user_id: str) -> int: + async def user_last_seen_monthly_active(self, user_id: str) -> Optional[int]: """ Checks if a given user is part of the monthly active user group diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0309661841..b7072f1f5e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction -from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util.caches.expiringcache import ExpiringCache @@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore): stream_ordering: the stream_ordering of the event """ - return await self.db_pool.runInteraction( - "store_destination_rooms_entries", - self._store_destination_rooms_entries_txn, - destinations, - room_id, - stream_ordering, + await self.db_pool.simple_upsert_many( + table="destinations", + key_names=("destination",), + key_values=[(d,) for d in destinations], + value_names=[], + value_values=[], + desc="store_destination_rooms_entries_dests", ) - def _store_destination_rooms_entries_txn( - self, - txn: LoggingTransaction, - destinations: Iterable[str], - room_id: str, - stream_ordering: int, - ) -> None: - - # ensure we have a `destinations` row for this destination, as there is - # a foreign key constraint. - if isinstance(self.database_engine, PostgresEngine): - q = """ - INSERT INTO destinations (destination) - VALUES (?) - ON CONFLICT DO NOTHING; - """ - elif isinstance(self.database_engine, Sqlite3Engine): - q = """ - INSERT OR IGNORE INTO destinations (destination) - VALUES (?); - """ - else: - raise RuntimeError("Unknown database engine") - - txn.execute_batch(q, ((destination,) for destination in destinations)) - rows = [(destination, room_id) for destination in destinations] - - self.db_pool.simple_upsert_many_txn( - txn, + await self.db_pool.simple_upsert_many( table="destination_rooms", key_names=("destination", "room_id"), key_values=rows, value_names=["stream_ordering"], value_values=[(stream_ordering,)] * len(rows), + desc="store_destination_rooms_entries_rooms", ) async def get_destination_last_successful_stream_ordering( diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index aa25bd8350..2e277a21c4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py
@@ -449,7 +449,7 @@ class StateGroupStorage: return self.stores.state._get_state_groups_from_groups(groups, state_filter) async def get_state_for_events( - self, event_ids: List[str], state_filter: StateFilter = StateFilter.all() + self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all() ) -> Dict[str, StateMap[EventBase]]: """Given a list of event_ids and type tuples, return a list of state dicts for each event. @@ -485,7 +485,7 @@ class StateGroupStorage: return {event: event_to_state[event] for event in event_ids} async def get_state_ids_for_events( - self, event_ids: List[str], state_filter: StateFilter = StateFilter.all() + self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all() ) -> Dict[str, StateMap[str]]: """ Get the state dicts corresponding to a list of events, containing the event_ids diff --git a/synapse/visibility.py b/synapse/visibility.py
index e39d02602a..ff53a49b3a 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import operator +from typing import Dict, FrozenSet, List, Optional from synapse.api.constants import ( AccountDataTypes, @@ -21,10 +21,11 @@ from synapse.api.constants import ( HistoryVisibility, Membership, ) +from synapse.events import EventBase from synapse.events.utils import prune_event from synapse.storage import Storage from synapse.storage.state import StateFilter -from synapse.types import get_domain_from_id +from synapse.types import StateMap, get_domain_from_id logger = logging.getLogger(__name__) @@ -48,32 +49,32 @@ MEMBERSHIP_PRIORITY = ( async def filter_events_for_client( storage: Storage, - user_id, - events, - is_peeking=False, - always_include_ids=frozenset(), - filter_send_to_client=True, -): + user_id: str, + events: List[EventBase], + is_peeking: bool = False, + always_include_ids: FrozenSet[str] = frozenset(), + filter_send_to_client: bool = True, +) -> List[EventBase]: """ Check which events a user is allowed to see. If the user can see the event but its sender asked for their data to be erased, prune the content of the event. Args: storage - user_id(str): user id to be checked - events(list[synapse.events.EventBase]): sequence of events to be checked - is_peeking(bool): should be True if: + user_id: user id to be checked + events: sequence of events to be checked + is_peeking: should be True if: * the user is not currently a member of the room, and: * the user has not been a member of the room since the given events - always_include_ids (set(event_id)): set of event ids to specifically + always_include_ids: set of event ids to specifically include (unless sender is ignored) - filter_send_to_client (bool): Whether we're checking an event that's going to be + filter_send_to_client: Whether we're checking an event that's going to be sent to a client. This might not always be the case since this function can also be called to check whether a user can see the state at a given point. Returns: - list[synapse.events.EventBase] + The filtered events. """ # Filter out events that have been soft failed so that we don't relay them # to clients. @@ -90,7 +91,7 @@ async def filter_events_for_client( AccountDataTypes.IGNORED_USER_LIST, user_id ) - ignore_list = frozenset() + ignore_list = frozenset() # type: FrozenSet[str] if ignore_dict_content: ignored_users_dict = ignore_dict_content.get("ignored_users", {}) if isinstance(ignored_users_dict, dict): @@ -107,19 +108,18 @@ async def filter_events_for_client( room_id ] = await storage.main.get_retention_policy_for_room(room_id) - def allowed(event): + def allowed(event: EventBase) -> Optional[EventBase]: """ Args: - event (synapse.events.EventBase): event to check + event: event to check Returns: - None|EventBase: - None if the user cannot see this event at all + None if the user cannot see this event at all - a redacted copy of the event if they can only see a redacted - version + a redacted copy of the event if they can only see a redacted + version - the original event if they can see it as normal. + the original event if they can see it as normal. """ # Only run some checks if these events aren't about to be sent to clients. This is # because, if this is not the case, we're probably only checking if the users can @@ -252,48 +252,46 @@ async def filter_events_for_client( return event - # check each event: gives an iterable[None|EventBase] + # Check each event: gives an iterable of None or (a potentially modified) + # EventBase. filtered_events = map(allowed, events) - # remove the None entries - filtered_events = filter(operator.truth, filtered_events) - - # we turn it into a list before returning it. - return list(filtered_events) + # Turn it into a list and remove None entries before returning. + return [ev for ev in filtered_events if ev] async def filter_events_for_server( storage: Storage, - server_name, - events, - redact=True, - check_history_visibility_only=False, -): + server_name: str, + events: List[EventBase], + redact: bool = True, + check_history_visibility_only: bool = False, +) -> List[EventBase]: """Filter a list of events based on whether given server is allowed to see them. Args: storage - server_name (str) - events (iterable[FrozenEvent]) - redact (bool): Whether to return a redacted version of the event, or + server_name + events + redact: Whether to return a redacted version of the event, or to filter them out entirely. - check_history_visibility_only (bool): Whether to only check the + check_history_visibility_only: Whether to only check the history visibility, rather than things like if the sender has been erased. This is used e.g. during pagination to decide whether to backfill or not. Returns - list[FrozenEvent] + The filtered events. """ - def is_sender_erased(event, erased_senders): + def is_sender_erased(event: EventBase, erased_senders: Dict[str, bool]) -> bool: if erased_senders and erased_senders[event.sender]: logger.info("Sender of %s has been erased, redacting", event.event_id) return True return False - def check_event_is_visible(event, state): + def check_event_is_visible(event: EventBase, state: StateMap[EventBase]) -> bool: history = state.get((EventTypes.RoomHistoryVisibility, ""), None) if history: visibility = history.content.get(