diff options
Diffstat (limited to 'synapse')
27 files changed, 410 insertions, 359 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py index cf1ebf1af2..1bb2e86789 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -17,6 +17,7 @@ """Contains exceptions and error codes.""" import logging +from typing import Dict from six import iteritems from six.moves import http_client @@ -111,7 +112,7 @@ class ProxiedRequestError(SynapseError): def __init__(self, code, msg, errcode=Codes.UNKNOWN, additional_fields=None): super(ProxiedRequestError, self).__init__(code, msg, errcode) if additional_fields is None: - self._additional_fields = {} + self._additional_fields = {} # type: Dict else: self._additional_fields = dict(additional_fields) diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 95292b7dec..c6f50fd7b9 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -12,6 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from typing import Dict + import attr @@ -102,4 +105,4 @@ KNOWN_ROOM_VERSIONS = { RoomVersions.V4, RoomVersions.V5, ) -} # type: dict[str, RoomVersion] +} # type: Dict[str, RoomVersion] diff --git a/synapse/app/_base.py b/synapse/app/_base.py index c30fdeee9a..2ac7d5c064 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -263,7 +263,9 @@ def start(hs, listeners=None): refresh_certificate(hs) # Start the tracer - synapse.logging.opentracing.init_tracer(hs.config) + synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa + hs.config + ) # It is now safe to start your Synapse. hs.start_listening(listeners) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 8387ff6805..28d36b1bc3 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from typing import Dict from six import string_types from six.moves.urllib import parse as urlparse @@ -56,8 +57,8 @@ def load_appservices(hostname, config_files): return [] # Dicts of value -> filename - seen_as_tokens = {} - seen_ids = {} + seen_as_tokens = {} # type: Dict[str, str] + seen_ids = {} # type: Dict[str, str] appservices = [] diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index 94916f3a49..48976e17b1 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -73,8 +73,8 @@ DEFAULT_CONFIG = """\ class ConsentConfig(Config): - def __init__(self): - super(ConsentConfig, self).__init__() + def __init__(self, *args): + super(ConsentConfig, self).__init__(*args) self.user_consent_version = None self.user_consent_template_dir = None diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index 788c39c9fb..c50e244394 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, List + from synapse.util.module_loader import load_module from ._base import Config @@ -22,7 +24,7 @@ LDAP_PROVIDER = "ldap_auth_provider.LdapAuthProvider" class PasswordAuthProviderConfig(Config): def read_config(self, config, **kwargs): - self.password_providers = [] + self.password_providers = [] # type: List[Any] providers = [] # We want to be backwards compatible with the old `ldap_config` diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 52e014608a..14740891f3 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -15,6 +15,7 @@ import os from collections import namedtuple +from typing import Dict, List from synapse.python_dependencies import DependencyException, check_requirements from synapse.util.module_loader import load_module @@ -61,7 +62,7 @@ def parse_thumbnail_requirements(thumbnail_sizes): Dictionary mapping from media type string to list of ThumbnailRequirement tuples. """ - requirements = {} + requirements = {} # type: Dict[str, List] for size in thumbnail_sizes: width = size["width"] height = size["height"] @@ -130,7 +131,7 @@ class ContentRepositoryConfig(Config): # # We don't create the storage providers here as not all workers need # them to be started. - self.media_storage_providers = [] + self.media_storage_providers = [] # type: List[tuple] for provider_config in storage_providers: # We special case the module "file_system" so as not to need to diff --git a/synapse/config/server.py b/synapse/config/server.py index 536ee7f29c..709bd387e5 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,6 +19,7 @@ import logging import os.path import re from textwrap import indent +from typing import List import attr import yaml @@ -243,7 +244,7 @@ class ServerConfig(Config): # events with profile information that differ from the target's global profile. self.allow_per_room_profiles = config.get("allow_per_room_profiles", True) - self.listeners = [] + self.listeners = [] # type: List[dict] for listener in config.get("listeners", []): if not isinstance(listener.get("port", None), int): raise ConfigError( @@ -287,7 +288,10 @@ class ServerConfig(Config): validator=attr.validators.instance_of(bool), default=False ) complexity = attr.ib( - validator=attr.validators.instance_of((int, float)), default=1.0 + validator=attr.validators.instance_of( + (float, int) # type: ignore[arg-type] # noqa + ), + default=1.0, ) complexity_error = attr.ib( validator=attr.validators.instance_of(str), @@ -366,7 +370,7 @@ class ServerConfig(Config): "cleanup_extremities_with_dummy_events", True ) - def has_tls_listener(self): + def has_tls_listener(self) -> bool: return any(l["tls"] for l in self.listeners) def generate_config_section( diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices_config.py index eaac3d73bc..6d4285ef93 100644 --- a/synapse/config/server_notices_config.py +++ b/synapse/config/server_notices_config.py @@ -59,8 +59,8 @@ class ServerNoticesConfig(Config): None if server notices are not enabled. """ - def __init__(self): - super(ServerNoticesConfig, self).__init__() + def __init__(self, *args): + super(ServerNoticesConfig, self).__init__(*args) self.server_notices_mxid = None self.server_notices_mxid_display_name = None self.server_notices_mxid_avatar_url = None diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 7f8a16e355..0f16f21c2d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -765,6 +765,10 @@ class PublicRoomList(BaseFederationServlet): else: network_tuple = ThirdPartyInstanceID(None, None) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + data = await maybeDeferred( self.handler.get_local_public_room_list, limit, @@ -800,6 +804,10 @@ class PublicRoomList(BaseFederationServlet): if search_filter is None: logger.warning("Nonefilter") + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + data = await self.handler.get_local_public_room_list( limit=limit, since_token=since_token, diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index a7e55f00e5..cfed344d4d 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -16,8 +16,7 @@ import logging from collections import namedtuple -from six import PY3, iteritems -from six.moves import range +from six import iteritems import msgpack from unpaddedbase64 import decode_base64, encode_base64 @@ -27,7 +26,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, HttpResponseException from synapse.types import ThirdPartyInstanceID -from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache @@ -37,7 +35,6 @@ logger = logging.getLogger(__name__) REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 - # This is used to indicate we should only return rooms published to the main list. EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) @@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler): This can be (None, None) to indicate the main list, or a particular appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. + from_federation (bool): true iff the request comes from the federation + API """ if not self.enable_room_list_search: return defer.succeed({"chunk": [], "total_room_count_estimate": 0}) @@ -133,239 +132,117 @@ class RoomListHandler(BaseHandler): from_federation (bool): Whether this request originated from a federating server or a client. Used for room filtering. timeout (int|None): Amount of seconds to wait for a response before - timing out. + timing out. TODO """ - if since_token and since_token != "END": - since_token = RoomListNextBatch.from_token(since_token) - else: - since_token = None - rooms_to_order_value = {} - rooms_to_num_joined = {} + # Pagination tokens work by storing the room ID sent in the last batch, + # plus the direction (forwards or backwards). Next batch tokens always + # go forwards, prev batch tokens always go backwards. - newly_visible = [] - newly_unpublished = [] if since_token: - stream_token = since_token.stream_ordering - current_public_id = yield self.store.get_current_public_room_stream_id() - public_room_stream_id = since_token.public_room_stream_id - newly_visible, newly_unpublished = yield self.store.get_public_room_changes( - public_room_stream_id, current_public_id, network_tuple=network_tuple - ) - else: - stream_token = yield self.store.get_room_max_stream_ordering() - public_room_stream_id = yield self.store.get_current_public_room_stream_id() - - room_ids = yield self.store.get_public_room_ids_at_stream_id( - public_room_stream_id, network_tuple=network_tuple - ) - - # We want to return rooms in a particular order: the number of joined - # users. We then arbitrarily use the room_id as a tie breaker. - - @defer.inlineCallbacks - def get_order_for_room(room_id): - # Most of the rooms won't have changed between the since token and - # now (especially if the since token is "now"). So, we can ask what - # the current users are in a room (that will hit a cache) and then - # check if the room has changed since the since token. (We have to - # do it in that order to avoid races). - # If things have changed then fall back to getting the current state - # at the since token. - joined_users = yield self.store.get_users_in_room(room_id) - if self.store.has_room_changed_since(room_id, stream_token): - latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_token - ) - - if not latest_event_ids: - return + batch_token = RoomListNextBatch.from_token(since_token) - joined_users = yield self.state_handler.get_current_users_in_room( - room_id, latest_event_ids - ) - - num_joined_users = len(joined_users) - rooms_to_num_joined[room_id] = num_joined_users + bounds = (batch_token.last_joined_members, batch_token.last_room_id) + forwards = batch_token.direction_is_forward + else: + batch_token = None + bounds = None - if num_joined_users == 0: - return + forwards = True - # We want larger rooms to be first, hence negating num_joined_users - rooms_to_order_value[room_id] = (-num_joined_users, room_id) + # we request one more than wanted to see if there are more pages to come + probing_limit = limit + 1 if limit is not None else None - logger.info( - "Getting ordering for %i rooms since %s", len(room_ids), stream_token + results = yield self.store.get_largest_public_rooms( + network_tuple, + search_filter, + probing_limit, + bounds=bounds, + forwards=forwards, + ignore_non_federatable=from_federation, ) - yield concurrently_execute(get_order_for_room, room_ids, 10) - sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) - sorted_rooms = [room_id for room_id, _ in sorted_entries] + def build_room_entry(room): + entry = { + "room_id": room["room_id"], + "name": room["name"], + "topic": room["topic"], + "canonical_alias": room["canonical_alias"], + "num_joined_members": room["joined_members"], + "avatar_url": room["avatar"], + "world_readable": room["history_visibility"] == "world_readable", + "guest_can_join": room["guest_access"] == "can_join", + } - # `sorted_rooms` should now be a list of all public room ids that is - # stable across pagination. Therefore, we can use indices into this - # list as our pagination tokens. + # Filter out Nones – rather omit the field altogether + return {k: v for k, v in entry.items() if v is not None} - # Filter out rooms that we don't want to return - rooms_to_scan = [ - r - for r in sorted_rooms - if r not in newly_unpublished and rooms_to_num_joined[r] > 0 - ] + results = [build_room_entry(r) for r in results] - total_room_count = len(rooms_to_scan) + response = {} + num_results = len(results) + if limit is not None: + more_to_come = num_results == probing_limit - if since_token: - # Filter out rooms we've already returned previously - # `since_token.current_limit` is the index of the last room we - # sent down, so we exclude it and everything before/after it. - if since_token.direction_is_forward: - rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :] + # Depending on direction we trim either the front or back. + if forwards: + results = results[:limit] else: - rooms_to_scan = rooms_to_scan[: since_token.current_limit] - rooms_to_scan.reverse() - - logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan)) - - # _append_room_entry_to_chunk will append to chunk but will stop if - # len(chunk) > limit - # - # Normally we will generate enough results on the first iteration here, - # but if there is a search filter, _append_room_entry_to_chunk may - # filter some results out, in which case we loop again. - # - # We don't want to scan over the entire range either as that - # would potentially waste a lot of work. - # - # XXX if there is no limit, we may end up DoSing the server with - # calls to get_current_state_ids for every single room on the - # server. Surely we should cap this somehow? - # - if limit: - step = limit + 1 + results = results[-limit:] else: - # step cannot be zero - step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 - - chunk = [] - for i in range(0, len(rooms_to_scan), step): - if timeout and self.clock.time() > timeout: - raise Exception("Timed out searching room directory") - - batch = rooms_to_scan[i : i + step] - logger.info("Processing %i rooms for result", len(batch)) - yield concurrently_execute( - lambda r: self._append_room_entry_to_chunk( - r, - rooms_to_num_joined[r], - chunk, - limit, - search_filter, - from_federation=from_federation, - ), - batch, - 5, - ) - logger.info("Now %i rooms in result", len(chunk)) - if len(chunk) >= limit + 1: - break - - chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) - - # Work out the new limit of the batch for pagination, or None if we - # know there are no more results that would be returned. - # i.e., [since_token.current_limit..new_limit] is the batch of rooms - # we've returned (or the reverse if we paginated backwards) - # We tried to pull out limit + 1 rooms above, so if we have <= limit - # then we know there are no more results to return - new_limit = None - if chunk and (not limit or len(chunk) > limit): - - if not since_token or since_token.direction_is_forward: - if limit: - chunk = chunk[:limit] - last_room_id = chunk[-1]["room_id"] + more_to_come = False + + if num_results > 0: + final_entry = results[-1] + initial_entry = results[0] + + if forwards: + if batch_token: + # If there was a token given then we assume that there + # must be previous results. + response["prev_batch"] = RoomListNextBatch( + last_joined_members=initial_entry["num_joined_members"], + last_room_id=initial_entry["room_id"], + direction_is_forward=False, + ).to_token() + + if more_to_come: + response["next_batch"] = RoomListNextBatch( + last_joined_members=final_entry["num_joined_members"], + last_room_id=final_entry["room_id"], + direction_is_forward=True, + ).to_token() else: - if limit: - chunk = chunk[-limit:] - last_room_id = chunk[0]["room_id"] - - new_limit = sorted_rooms.index(last_room_id) - - results = {"chunk": chunk, "total_room_count_estimate": total_room_count} - - if since_token: - results["new_rooms"] = bool(newly_visible) - - if not since_token or since_token.direction_is_forward: - if new_limit is not None: - results["next_batch"] = RoomListNextBatch( - stream_ordering=stream_token, - public_room_stream_id=public_room_stream_id, - current_limit=new_limit, - direction_is_forward=True, - ).to_token() - - if since_token: - results["prev_batch"] = since_token.copy_and_replace( - direction_is_forward=False, - current_limit=since_token.current_limit + 1, - ).to_token() - else: - if new_limit is not None: - results["prev_batch"] = RoomListNextBatch( - stream_ordering=stream_token, - public_room_stream_id=public_room_stream_id, - current_limit=new_limit, - direction_is_forward=False, - ).to_token() - - if since_token: - results["next_batch"] = since_token.copy_and_replace( - direction_is_forward=True, - current_limit=since_token.current_limit - 1, - ).to_token() - - return results - - @defer.inlineCallbacks - def _append_room_entry_to_chunk( - self, - room_id, - num_joined_users, - chunk, - limit, - search_filter, - from_federation=False, - ): - """Generate the entry for a room in the public room list and append it - to the `chunk` if it matches the search filter - - Args: - room_id (str): The ID of the room. - num_joined_users (int): The number of joined users in the room. - chunk (list) - limit (int|None): Maximum amount of rooms to display. Function will - return if length of chunk is greater than limit + 1. - search_filter (dict|None) - from_federation (bool): Whether this request originated from a - federating server or a client. Used for room filtering. - """ - if limit and len(chunk) > limit + 1: - # We've already got enough, so lets just drop it. - return + if batch_token: + response["next_batch"] = RoomListNextBatch( + last_joined_members=final_entry["num_joined_members"], + last_room_id=final_entry["room_id"], + direction_is_forward=True, + ).to_token() + + if more_to_come: + response["prev_batch"] = RoomListNextBatch( + last_joined_members=initial_entry["num_joined_members"], + last_room_id=initial_entry["room_id"], + direction_is_forward=False, + ).to_token() + + for room in results: + # populate search result entries with additional fields, namely + # 'aliases' + room_id = room["room_id"] + + aliases = yield self.store.get_aliases_for_room(room_id) + if aliases: + room["aliases"] = aliases - result = yield self.generate_room_entry(room_id, num_joined_users) - if not result: - return + response["chunk"] = results - if from_federation and not result.get("m.federate", True): - # This is a room that other servers cannot join. Do not show them - # this room. - return + response["total_room_count_estimate"] = yield self.store.count_public_rooms( + network_tuple, ignore_non_federatable=from_federation + ) - if _matches_room_entry(result, search_filter): - chunk.append(result) + return response @cachedInlineCallbacks(num_args=1, cache_context=True) def generate_room_entry( @@ -580,18 +457,15 @@ class RoomListNextBatch( namedtuple( "RoomListNextBatch", ( - "stream_ordering", # stream_ordering of the first public room list - "public_room_stream_id", # public room stream id for first public room list - "current_limit", # The number of previous rooms returned + "last_joined_members", # The count to get rooms after/before + "last_room_id", # The room_id to get rooms after/before "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ), ) ): - KEY_DICT = { - "stream_ordering": "s", - "public_room_stream_id": "p", - "current_limit": "n", + "last_joined_members": "m", + "last_room_id": "r", "direction_is_forward": "d", } @@ -599,13 +473,7 @@ class RoomListNextBatch( @classmethod def from_token(cls, token): - if PY3: - # The argument raw=False is only available on new versions of - # msgpack, and only really needed on Python 3. Gate it behind - # a PY3 check to avoid causing issues on Debian-packaged versions. - decoded = msgpack.loads(decode_base64(token), raw=False) - else: - decoded = msgpack.loads(decode_base64(token)) + decoded = msgpack.loads(decode_base64(token), raw=False) return RoomListNextBatch( **{cls.REVERSE_KEY_DICT[key]: val for key, val in decoded.items()} ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 8abdb1b6e6..95a244d86c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -216,8 +216,8 @@ class RoomMemberHandler(object): self.copy_room_tags_and_direct_to_room( predecessor["room_id"], room_id, user_id ) - # Move over old push rules - self.store.move_push_rules_from_room_to_room_for_user( + # Copy over push rules + yield self.store.copy_push_rules_from_room_to_room_for_user( predecessor["room_id"], room_id, user_id ) elif event.membership == Membership.LEAVE: diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 308a27213b..cd1ff6a518 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -170,6 +170,7 @@ import inspect import logging import re from functools import wraps +from typing import Dict from canonicaljson import json @@ -547,7 +548,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T return span = opentracing.tracer.active_span - carrier = {} + carrier = {} # type: Dict[str, str] opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier) for key, value in carrier.items(): @@ -584,7 +585,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True): span = opentracing.tracer.active_span - carrier = {} + carrier = {} # type: Dict[str, str] opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier) for key, value in carrier.items(): @@ -639,7 +640,7 @@ def get_active_span_text_map(destination=None): if destination and not whitelisted_homeserver(destination): return {} - carrier = {} + carrier = {} # type: Dict[str, str] opentracing.tracer.inject( opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier ) @@ -653,7 +654,7 @@ def active_span_context_as_string(): Returns: The active span context encoded as a string. """ - carrier = {} + carrier = {} # type: Dict[str, str] if opentracing: opentracing.tracer.inject( opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py index 7df0fa6087..6073fc2725 100644 --- a/synapse/logging/utils.py +++ b/synapse/logging/utils.py @@ -119,7 +119,11 @@ def trace_function(f): logger = logging.getLogger(name) level = logging.DEBUG - s = inspect.currentframe().f_back + frame = inspect.currentframe() + if frame is None: + raise Exception("Can't get current frame!") + + s = frame.f_back to_print = [ "\t%s:%s %s. Args: args=%s, kwargs=%s" @@ -144,7 +148,7 @@ def trace_function(f): pathname=pathname, lineno=lineno, msg=msg, - args=None, + args=tuple(), exc_info=None, ) @@ -157,7 +161,12 @@ def trace_function(f): def get_previous_frames(): - s = inspect.currentframe().f_back.f_back + + frame = inspect.currentframe() + if frame is None: + raise Exception("Can't get current frame!") + + s = frame.f_back.f_back to_return = [] while s: if s.f_globals["__name__"].startswith("synapse"): @@ -174,7 +183,10 @@ def get_previous_frames(): def get_previous_frame(ignore=[]): - s = inspect.currentframe().f_back.f_back + frame = inspect.currentframe() + if frame is None: + raise Exception("Can't get current frame!") + s = frame.f_back.f_back while s: if s.f_globals["__name__"].startswith("synapse"): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index bec3b13397..0b45e1f52a 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -125,7 +125,7 @@ class InFlightGauge(object): ) # Counts number of in flight blocks for a given set of label values - self._registrations = {} + self._registrations = {} # type: Dict # Protects access to _registrations self._lock = threading.Lock() @@ -226,7 +226,7 @@ class BucketCollector(object): # Fetch the data -- this must be synchronous! data = self.data_collector() - buckets = {} + buckets = {} # type: Dict[float, int] res = [] for x in data.keys(): diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 74d9c3ecd3..a248103191 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -36,9 +36,9 @@ from twisted.web.resource import Resource try: from prometheus_client.samples import Sample except ImportError: - Sample = namedtuple( + Sample = namedtuple( # type: ignore[no-redef] # noqa "Sample", ["name", "labels", "value", "timestamp", "exemplar"] - ) # type: ignore + ) CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8") diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 0bd563edc7..aa7da1c543 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import Set +from typing import List, Set from pkg_resources import ( DistributionNotFound, @@ -73,6 +73,7 @@ REQUIREMENTS = [ "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", + "typing-extensions>=3.7.4", ] CONDITIONAL_REQUIREMENTS = { @@ -144,7 +145,11 @@ def check_requirements(for_feature=None): deps_needed.append(dependency) errors.append( "Needed %s, got %s==%s" - % (dependency, e.dist.project_name, e.dist.version) + % ( + dependency, + e.dist.project_name, # type: ignore[attr-defined] # noqa + e.dist.version, # type: ignore[attr-defined] # noqa + ) ) except DistributionNotFound: deps_needed.append(dependency) @@ -159,7 +164,7 @@ def check_requirements(for_feature=None): if not for_feature: # Check the optional dependencies are up to date. We allow them to not be # installed. - OPTS = sum(CONDITIONAL_REQUIREMENTS.values(), []) + OPTS = sum(CONDITIONAL_REQUIREMENTS.values(), []) # type: List[str] for dependency in OPTS: try: @@ -168,7 +173,11 @@ def check_requirements(for_feature=None): deps_needed.append(dependency) errors.append( "Needed optional %s, got %s==%s" - % (dependency, e.dist.project_name, e.dist.version) + % ( + dependency, + e.dist.project_name, # type: ignore[attr-defined] # noqa + e.dist.version, # type: ignore[attr-defined] # noqa + ) ) except DistributionNotFound: # If it's not found, we don't care diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 6bf924dedc..9c1d41421c 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -361,6 +361,10 @@ class PublicRoomListRestServlet(TransactionRestServlet): limit = parse_integer(request, "limit", 0) since_token = parse_string(request, "since", None) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( @@ -398,6 +402,10 @@ class PublicRoomListRestServlet(TransactionRestServlet): else: network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index a6517c4cf3..c4e24edff2 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -183,8 +183,8 @@ class PushRulesWorkerStore( return results @defer.inlineCallbacks - def move_push_rule_from_room_to_room(self, new_room_id, user_id, rule): - """Move a single push rule from one room to another for a specific user. + def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): + """Copy a single push rule from one room to another for a specific user. Args: new_room_id (str): ID of the new room. @@ -209,14 +209,11 @@ class PushRulesWorkerStore( actions=rule["actions"], ) - # Delete push rule for the old room - yield self.delete_push_rule(user_id, rule["rule_id"]) - @defer.inlineCallbacks - def move_push_rules_from_room_to_room_for_user( + def copy_push_rules_from_room_to_room_for_user( self, old_room_id, new_room_id, user_id ): - """Move all of the push rules from one room to another for a specific + """Copy all of the push rules from one room to another for a specific user. Args: @@ -227,15 +224,14 @@ class PushRulesWorkerStore( # Retrieve push rules for this user user_push_rules = yield self.get_push_rules_for_user(user_id) - # Get rules relating to the old room, move them to the new room, then - # delete them from the old room + # Get rules relating to the old room and copy them to the new room for rule in user_push_rules: conditions = rule.get("conditions", []) if any( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions ): - self.move_push_rule_from_room_to_room(new_room_id, user_id, rule) + yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) @defer.inlineCallbacks def bulk_get_push_rules_for_room(self, event, context): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 08e13f3a3b..9b7e31583c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ import collections import logging import re +from typing import Optional, Tuple from canonicaljson import json @@ -24,6 +26,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore from synapse.storage.search import SearchStore +from synapse.types import ThirdPartyInstanceID from synapse.util.caches.descriptors import cached, cachedInlineCallbacks logger = logging.getLogger(__name__) @@ -63,103 +66,193 @@ class RoomWorkerStore(SQLBaseStore): desc="get_public_room_ids", ) - @cached(num_args=2, max_entries=100) - def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): - """Get pulbic rooms for a particular list, or across all lists. + def count_public_rooms(self, network_tuple, ignore_non_federatable): + """Counts the number of public rooms as tracked in the room_stats_current + and room_stats_state table. Args: - stream_id (int) - network_tuple (ThirdPartyInstanceID): The list to use (None, None) - means the main list, None means all lsits. + network_tuple (ThirdPartyInstanceID|None) + ignore_non_federatable (bool): If true filters out non-federatable rooms """ - return self.runInteraction( - "get_public_room_ids_at_stream_id", - self.get_public_room_ids_at_stream_id_txn, - stream_id, - network_tuple=network_tuple, - ) - - def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple): - return { - rm - for rm, vis in self.get_published_at_stream_id_txn( - txn, stream_id, network_tuple=network_tuple - ).items() - if vis - } - def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): - if network_tuple: - # We want to get from a particular list. No aggregation required. + def _count_public_rooms_txn(txn): + query_args = [] + + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list + """ sql = """ - SELECT room_id, visibility FROM public_room_list_stream - INNER JOIN ( - SELECT room_id, max(stream_id) AS stream_id - FROM public_room_list_stream - WHERE stream_id <= ? %s - GROUP BY room_id - ) grouped USING (room_id, stream_id) - """ + SELECT + COALESCE(COUNT(*), 0) + FROM ( + %(published_sql)s + ) published + INNER JOIN room_stats_state USING (room_id) + INNER JOIN room_stats_current USING (room_id) + WHERE + ( + join_rules = 'public' OR history_visibility = 'world_readable' + ) + AND joined_members > 0 + """ % { + "published_sql": published_sql + } + + txn.execute(sql, query_args) + return txn.fetchone()[0] + + return self.runInteraction("count_public_rooms", _count_public_rooms_txn) + + @defer.inlineCallbacks + def get_largest_public_rooms( + self, + network_tuple: Optional[ThirdPartyInstanceID], + search_filter: Optional[dict], + limit: Optional[int], + bounds: Optional[Tuple[int, str]], + forwards: bool, + ignore_non_federatable: bool = False, + ): + """Gets the largest public rooms (where largest is in terms of joined + members, as tracked in the statistics table). + + Args: + network_tuple + search_filter + limit: Maxmimum number of rows to return, unlimited otherwise. + bounds: An uppoer or lower bound to apply to result set if given, + consists of a joined member count and room_id (these are + excluded from result set). + forwards: true iff going forwards, going backwards otherwise + ignore_non_federatable: If true filters out non-federatable rooms. + + Returns: + Rooms in order: biggest number of joined users first. + We then arbitrarily use the room_id as a tie breaker. - if network_tuple.appservice_id is not None: - txn.execute( - sql % ("AND appservice_id = ? AND network_id = ?",), - (stream_id, network_tuple.appservice_id, network_tuple.network_id), + """ + + where_clauses = [] + query_args = [] + + # Work out the bounds if we're given them, these bounds look slightly + # odd, but are designed to help query planner use indices by pulling + # out a common bound. + if bounds: + last_joined_members, last_room_id = bounds + if forwards: + where_clauses.append( + """ + joined_members <= ? AND ( + joined_members < ? OR room_id < ? + ) + """ ) else: - txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,)) - return dict(txn) - else: - # We want to get from all lists, so we need to aggregate the results + where_clauses.append( + """ + joined_members >= ? AND ( + joined_members > ? OR room_id > ? + ) + """ + ) - logger.info("Executing full list") + query_args += [last_joined_members, last_joined_members, last_room_id] - sql = """ - SELECT room_id, visibility - FROM public_room_list_stream - INNER JOIN ( - SELECT - room_id, max(stream_id) AS stream_id, appservice_id, - network_id - FROM public_room_list_stream - WHERE stream_id <= ? - GROUP BY room_id, appservice_id, network_id - ) grouped USING (room_id, stream_id) - """ + if search_filter and search_filter.get("generic_search_term", None): + search_term = "%" + search_filter["generic_search_term"] + "%" - txn.execute(sql, (stream_id,)) + where_clauses.append( + """ + ( + name LIKE ? + OR topic LIKE ? + OR canonical_alias LIKE ? + ) + """ + ) + query_args += [search_term, search_term, search_term] - results = {} - # A room is visible if its visible on any list. - for room_id, visibility in txn: - results[room_id] = bool(visibility) or results.get(room_id, False) + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list + """ - return results + where_clause = "" + if where_clauses: + where_clause = " AND " + " AND ".join(where_clauses) + + sql = """ + SELECT + room_id, name, topic, canonical_alias, joined_members, + avatar, history_visibility, joined_members, guest_access + FROM ( + %(published_sql)s + ) published + INNER JOIN room_stats_state USING (room_id) + INNER JOIN room_stats_current USING (room_id) + WHERE + ( + join_rules = 'public' OR history_visibility = 'world_readable' + ) + AND joined_members > 0 + %(where_clause)s + ORDER BY joined_members %(dir)s, room_id %(dir)s + """ % { + "published_sql": published_sql, + "where_clause": where_clause, + "dir": "DESC" if forwards else "ASC", + } - def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple): - def get_public_room_changes_txn(txn): - then_rooms = self.get_public_room_ids_at_stream_id_txn( - txn, prev_stream_id, network_tuple - ) + if limit is not None: + query_args.append(limit) - now_rooms_dict = self.get_published_at_stream_id_txn( - txn, new_stream_id, network_tuple - ) + sql += """ + LIMIT ? + """ - now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis) - now_rooms_not_visible = set( - rm for rm, vis in now_rooms_dict.items() if not vis - ) + def _get_largest_public_rooms_txn(txn): + txn.execute(sql, query_args) - newly_visible = now_rooms_visible - then_rooms - newly_unpublished = now_rooms_not_visible & then_rooms + results = self.cursor_to_dict(txn) - return newly_visible, newly_unpublished + if not forwards: + results.reverse() - return self.runInteraction( - "get_public_room_changes", get_public_room_changes_txn + return results + + ret_val = yield self.runInteraction( + "get_largest_public_rooms", _get_largest_public_rooms_txn ) + defer.returnValue(ret_val) @cached(max_entries=10000) def is_room_blocked(self, room_id): diff --git a/synapse/storage/schema/delta/56/public_room_list_idx.sql b/synapse/storage/schema/delta/56/public_room_list_idx.sql new file mode 100644 index 0000000000..7be31ffebb --- /dev/null +++ b/synapse/storage/schema/delta/56/public_room_list_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX public_room_list_stream_network ON public_room_list_stream (appservice_id, network_id, room_id); diff --git a/synapse/types.py b/synapse/types.py index 51eadb6ad4..8f79797f17 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -318,6 +318,7 @@ class StreamToken( ) ): _SEPARATOR = "_" + START = None # type: StreamToken @classmethod def from_string(cls, string): @@ -402,7 +403,7 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): followed by the "stream_ordering" id of the event it comes after. """ - __slots__ = [] + __slots__ = [] # type: list @classmethod def parse(cls, string): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index f1c46836b1..0d3bdd88ce 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -13,9 +13,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import collections import logging from contextlib import contextmanager +from typing import Dict, Sequence, Set, Union from six.moves import range @@ -213,7 +215,9 @@ class Linearizer(object): # the first element is the number of things executing, and # the second element is an OrderedDict, where the keys are deferreds for the # things blocked from executing. - self.key_to_defer = {} + self.key_to_defer = ( + {} + ) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]] def queue(self, key): # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. @@ -340,10 +344,10 @@ class ReadWriteLock(object): def __init__(self): # Latest readers queued - self.key_to_current_readers = {} + self.key_to_current_readers = {} # type: Dict[str, Set[defer.Deferred]] # Latest writer queued - self.key_to_current_writer = {} + self.key_to_current_writer = {} # type: Dict[str, defer.Deferred] @defer.inlineCallbacks def read(self, key): diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index b50e3503f0..43fd65d693 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -16,6 +16,7 @@ import logging import os +from typing import Dict import six from six.moves import intern @@ -37,7 +38,7 @@ def get_cache_factor_for(cache_name): caches_by_name = {} -collectors_by_name = {} +collectors_by_name = {} # type: Dict cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 43f66ec4be..5ac2530a6a 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -18,10 +18,12 @@ import inspect import logging import threading from collections import namedtuple +from typing import Any, cast from six import itervalues from prometheus_client import Gauge +from typing_extensions import Protocol from twisted.internet import defer @@ -37,6 +39,18 @@ from . import register_cache logger = logging.getLogger(__name__) +class _CachedFunction(Protocol): + invalidate = None # type: Any + invalidate_all = None # type: Any + invalidate_many = None # type: Any + prefill = None # type: Any + cache = None # type: Any + num_args = None # type: Any + + def __name__(self): + ... + + cache_pending_metric = Gauge( "synapse_util_caches_cache_pending", "Number of lookups currently pending for this cache", @@ -245,7 +259,9 @@ class Cache(object): class _CacheDescriptorBase(object): - def __init__(self, orig, num_args, inlineCallbacks, cache_context=False): + def __init__( + self, orig: _CachedFunction, num_args, inlineCallbacks, cache_context=False + ): self.orig = orig if inlineCallbacks: @@ -404,7 +420,7 @@ class CacheDescriptor(_CacheDescriptorBase): return tuple(get_cache_key_gen(args, kwargs)) @functools.wraps(self.orig) - def wrapped(*args, **kwargs): + def _wrapped(*args, **kwargs): # If we're passed a cache_context then we'll want to call its invalidate() # whenever we are invalidated invalidate_callback = kwargs.pop("on_invalidate", None) @@ -440,6 +456,8 @@ class CacheDescriptor(_CacheDescriptorBase): return make_deferred_yieldable(observer) + wrapped = cast(_CachedFunction, _wrapped) + if self.num_args == 1: wrapped.invalidate = lambda key: cache.invalidate(key[0]) wrapped.prefill = lambda key, val: cache.prefill(key[0], val) diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 9a72218d85..2ea4e4e911 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,3 +1,5 @@ +from typing import Dict + from six import itervalues SENTINEL = object() @@ -12,7 +14,7 @@ class TreeCache(object): def __init__(self): self.size = 0 - self.root = {} + self.root = {} # type: Dict def __setitem__(self, key, value): return self.set(key, value) diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 7ff7eb1e4d..2705cbe5f8 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -54,5 +54,5 @@ def load_python_module(location: str): if spec is None: raise Exception("Unable to load module at %s" % (location,)) mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) + spec.loader.exec_module(mod) # type: ignore return mod |