diff options
Diffstat (limited to 'synapse')
101 files changed, 1325 insertions, 1510 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 5bada5e290..78fc63aa49 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.30.0" +__version__ = "0.31.1" diff --git a/synapse/api/auth.py b/synapse/api/auth.py index f17fda6315..06fa38366d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -15,6 +15,8 @@ import logging +from six import itervalues + import pymacaroons from twisted.internet import defer @@ -57,7 +59,7 @@ class Auth(object): self.TOKEN_NOT_FOUND_HTTP_STATUS = 401 self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000) - register_cache("token_cache", self.token_cache) + register_cache("cache", "token_cache", self.token_cache) @defer.inlineCallbacks def check_from_context(self, event, context, do_sig_check=True): @@ -66,7 +68,7 @@ class Auth(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { - (e.type, e.state_key): e for e in auth_events.values() + (e.type, e.state_key): e for e in itervalues(auth_events) } self.check(event, auth_events=auth_events, do_sig_check=do_sig_check) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index db43219d24..dbc0e7e445 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -411,7 +411,7 @@ class Filter(object): return room_ids def filter(self, events): - return filter(self.check, events) + return list(filter(self.check, events)) def limit(self): return self.filter_json.get("limit", 10) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index e4318cdfc3..a6925ab139 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -124,6 +124,19 @@ def quit_with_error(error_string): sys.exit(1) +def listen_metrics(bind_addresses, port): + """ + Start Prometheus metrics server. + """ + from synapse.metrics import RegistryProxy + from prometheus_client import start_http_server + + for host in bind_addresses: + reactor.callInThread(start_http_server, int(port), + addr=host, registry=RegistryProxy) + logger.info("Metrics now reporting on %s:%d", host, port) + + def listen_tcp(bind_addresses, port, factory, backlog=50): """ Create a TCP socket for a port and several addresses diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index b1efacc9f8..dd114dee07 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -94,6 +94,13 @@ class AppserviceServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 38b98382c6..85dada7f9f 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -77,7 +78,7 @@ class ClientReaderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) PublicRoomListRestServlet(self).register(resource) @@ -118,7 +119,13 @@ class ClientReaderServer(HomeServer): globals={"hs": self}, ) ) - + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index bd7f3d5679..5ca77c0f1a 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -90,7 +91,7 @@ class EventCreatorServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) RoomSendEventRestServlet(self).register(resource) @@ -134,6 +135,13 @@ class EventCreatorServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 6e10b27b9e..2a1995d0cd 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.directory import DirectoryStore @@ -71,7 +72,7 @@ class FederationReaderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "federation": resources.update({ FEDERATION_PREFIX: TransportLayerServer(self), @@ -107,6 +108,13 @@ class FederationReaderServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 6f24e32d6d..81ad574043 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.federation import send_queue from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore @@ -89,7 +90,7 @@ class FederationSenderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) root_resource = create_resource_tree(resources, NoResource()) @@ -121,6 +122,13 @@ class FederationSenderServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 0f700ee786..5a164a7a95 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -29,6 +29,7 @@ from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, ) from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -131,7 +132,7 @@ class FrontendProxyServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) @@ -172,6 +173,13 @@ class FrontendProxyServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index caccbaa814..714f98a3e0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -34,7 +34,7 @@ from synapse.module_api import ModuleApi from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite -from synapse.metrics import register_memory_metrics +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ check_requirements @@ -230,7 +230,7 @@ class SynapseHomeServer(HomeServer): resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) if name == "metrics" and self.get_config().enable_metrics: - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) if name == "replication": resources[REPLICATION_PREFIX] = ReplicationRestResource(self) @@ -263,6 +263,13 @@ class SynapseHomeServer(HomeServer): reactor.addSystemEventTrigger( "before", "shutdown", server_listener.stopListening, ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -362,8 +369,6 @@ def setup(config_options): hs.get_datastore().start_doing_background_updates() hs.get_federation_client().start_get_pdu_cache() - register_memory_metrics(hs) - reactor.callWhenRunning(start) return hs @@ -434,6 +439,10 @@ def run(hs): total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() stats["total_nonbridged_users"] = total_nonbridged_users + daily_user_type_results = yield hs.get_datastore().count_daily_user_type() + for name, count in daily_user_type_results.iteritems(): + stats["daily_user_type_" + name] = count + room_count = yield hs.get_datastore().get_room_count() stats["total_room_count"] = room_count diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 9c93195f0a..006bba80a8 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -73,7 +74,7 @@ class MediaRepositoryServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "media": media_repo = self.get_media_repository_resource() resources.update({ @@ -114,6 +115,13 @@ class MediaRepositoryServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 3912eae48c..64df47f9cc 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,6 +23,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -92,7 +93,7 @@ class PusherServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) root_resource = create_resource_tree(resources, NoResource()) @@ -124,6 +125,13 @@ class PusherServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c6294a7a0c..6808d6d3e0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -257,7 +258,7 @@ class SynchrotronServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) sync.register_servlets(self, resource) @@ -301,6 +302,13 @@ class SynchrotronServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 712dfa870e..56ae086128 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -171,6 +171,10 @@ def main(): if cache_factor: os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) + cache_factors = config.get("synctl_cache_factors", {}) + for cache_name, factor in cache_factors.iteritems(): + os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) + worker_configfiles = [] if options.worker: start_stop_synapse = False diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 53eb3474da..ada1c13cec 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -105,7 +106,7 @@ class UserDirectoryServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) user_directory.register_servlets(self, resource) @@ -146,6 +147,13 @@ class UserDirectoryServer(HomeServer): globals={"hs": self}, ) ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "collect_metrics is not enabled!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 5fdb579723..d1c598622a 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -292,4 +292,8 @@ class ApplicationService(object): return self.rate_limited def __str__(self): - return "ApplicationService: %s" % (self.__dict__,) + # copy dictionary and redact token fields so they don't get logged + dict_copy = self.__dict__.copy() + dict_copy["token"] = "<redacted>" + dict_copy["hs_token"] = "<redacted>" + return "ApplicationService: %s" % (dict_copy,) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 00efff1464..47251fb6ad 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID import logging import urllib +from prometheus_client import Counter + logger = logging.getLogger(__name__) +sent_transactions_counter = Counter( + "synapse_appservice_api_sent_transactions", + "Number of /transactions/ requests sent", + ["service"] +) + +failed_transactions_counter = Counter( + "synapse_appservice_api_failed_transactions", + "Number of /transactions/ requests that failed to send", + ["service"] +) + +sent_events_counter = Counter( + "synapse_appservice_api_sent_events", + "Number of events sent to the AS", + ["service"] +) HOUR_IN_MS = 60 * 60 * 1000 @@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) + sent_transactions_counter.labels(service.id).inc() + sent_events_counter.labels(service.id).inc(len(events)) defer.returnValue(True) return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: logger.warning("push_bulk to %s threw exception %s", uri, ex) + failed_transactions_counter.labels(service.id).inc() defer.returnValue(False) def _serialize(self, events): diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index ddcd305f4c..e22c731aad 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -18,6 +18,9 @@ from ._base import Config DEFAULT_CONFIG = """\ # User Consent configuration # +# for detailed instructions, see +# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md +# # Parts of this section are required if enabling the 'consent' resource under # 'listeners', in particular 'template_dir' and 'version'. # @@ -32,7 +35,8 @@ DEFAULT_CONFIG = """\ # # 'server_notice_content', if enabled, will send a user a "Server Notice" # asking them to consent to the privacy policy. The 'server_notices' section -# must also be configured for this to work. +# must also be configured for this to work. Notices will *not* be sent to +# guest users unless 'send_server_notice_to_guests' is set to true. # # 'block_events_error', if set, will block any attempts to send events # until the user consents to the privacy policy. The value of the setting is @@ -46,6 +50,7 @@ DEFAULT_CONFIG = """\ # body: >- # To continue using this homeserver you must review and agree to the # terms and conditions at %(consent_uri)s +# send_server_notice_to_guests: True # block_events_error: >- # To continue using this homeserver you must review and agree to the # terms and conditions at %(consent_uri)s @@ -60,6 +65,7 @@ class ConsentConfig(Config): self.user_consent_version = None self.user_consent_template_dir = None self.user_consent_server_notice_content = None + self.user_consent_server_notice_to_guests = False self.block_events_without_consent_error = None def read_config(self, config): @@ -74,6 +80,9 @@ class ConsentConfig(Config): self.block_events_without_consent_error = consent_config.get( "block_events_error", ) + self.user_consent_server_notice_to_guests = bool(consent_config.get( + "send_server_notice_to_guests", False, + )) def default_config(self, **kwargs): return DEFAULT_CONFIG diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 25ea77738a..81ecf9778c 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -250,6 +250,9 @@ class ContentRepositoryConfig(Config): # - '192.168.0.0/16' # - '100.64.0.0/10' # - '169.254.0.0/16' + # - '::1/128' + # - 'fe80::/64' + # - 'fc00::/7' # # List of IP address CIDR ranges that the URL preview spider is allowed # to access even if they are specified in url_preview_ip_range_blacklist. diff --git a/synapse/config/server.py b/synapse/config/server.py index 8f0b6d1f28..968ecd9ea0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -14,8 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from ._base import Config, ConfigError +logger = logging.Logger(__name__) + class ServerConfig(Config): @@ -138,6 +142,12 @@ class ServerConfig(Config): metrics_port = config.get("metrics_port") if metrics_port: + logger.warn( + ("The metrics_port configuration option is deprecated in Synapse 0.31 " + "in favour of a listener. Please see " + "http://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst" + " on how to configure the new listener.")) + self.listeners.append({ "port": metrics_port, "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")], diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 22ee0fc93f..9b17ef0a08 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -27,10 +27,12 @@ from synapse.util.metrics import Measure from twisted.internet import defer from signedjson.sign import ( - verify_signed_json, signature_ids, sign_json, encode_canonical_json + verify_signed_json, signature_ids, sign_json, encode_canonical_json, + SignatureVerifyException, ) from signedjson.key import ( - is_signing_algorithm_supported, decode_verify_key_bytes + is_signing_algorithm_supported, decode_verify_key_bytes, + encode_verify_key_base64, ) from unpaddedbase64 import decode_base64, encode_base64 @@ -56,7 +58,7 @@ Attributes: key_ids(set(str)): The set of key_ids to that could be used to verify the JSON object json_object(dict): The JSON object to verify. - deferred(twisted.internet.defer.Deferred): + deferred(Deferred[str, str, nacl.signing.VerifyKey]): A deferred (server_name, key_id, verify_key) tuple that resolves when a verify key has been fetched. The deferreds' callbacks are run with no logcontext. @@ -736,6 +738,17 @@ class Keyring(object): @defer.inlineCallbacks def _handle_key_deferred(verify_request): + """Waits for the key to become available, and then performs a verification + + Args: + verify_request (VerifyKeyRequest): + + Returns: + Deferred[None] + + Raises: + SynapseError if there was a problem performing the verification + """ server_name = verify_request.server_name try: with PreserveLoggingContext(): @@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request): )) try: verify_signed_json(json_object, server_name, verify_key) - except Exception: + except SignatureVerifyException as e: + logger.debug( + "Error verifying signature for %s:%s:%s with key %s: %s", + server_name, verify_key.alg, verify_key.version, + encode_verify_key_base64(verify_key), + str(e), + ) raise SynapseError( 401, - "Invalid signature for server %s with key %s:%s" % ( - server_name, verify_key.alg, verify_key.version + "Invalid signature for server %s with key %s:%s: %s" % ( + server_name, verify_key.alg, verify_key.version, str(e), ), Codes.UNAUTHORIZED, ) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index cd5627e36a..eaf9cecde6 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -471,14 +471,14 @@ def _check_power_levels(event, auth_events): ] old_list = current_state.content.get("users", {}) - for user in set(old_list.keys() + user_list.keys()): + for user in set(list(old_list) + list(user_list)): levels_to_check.append( (user, "users") ) old_list = current_state.content.get("events", {}) new_list = event.content.get("events", {}) - for ev_id in set(old_list.keys() + new_list.keys()): + for ev_id in set(list(old_list) + list(new_list)): levels_to_check.append( (ev_id, "events") ) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index c3ff85c49a..cb08da4984 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -146,7 +146,7 @@ class EventBase(object): return field in self._event_dict def items(self): - return self._event_dict.items() + return list(self._event_dict.items()) class FrozenEvent(EventBase): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6163f7c466..87a92f6ea9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,20 +32,17 @@ from synapse.federation.federation_base import ( FederationBase, event_from_pdu_json, ) -import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination -logger = logging.getLogger(__name__) - +from prometheus_client import Counter -# synapse.federation.federation_client is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +logger = logging.getLogger(__name__) -sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) PDU_RETRY_TIME_MS = 1 * 60 * 1000 @@ -108,7 +105,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc(query_type) + sent_queries_counter.labels(query_type).inc() return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, @@ -127,7 +124,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_device_keys") + sent_queries_counter.labels("client_device_keys").inc() return self.transport_layer.query_client_keys( destination, content, timeout ) @@ -137,7 +134,7 @@ class FederationClient(FederationBase): """Query the device keys for a list of user ids hosted on a remote server. """ - sent_queries_counter.inc("user_devices") + sent_queries_counter.labels("user_devices").inc() return self.transport_layer.query_user_devices( destination, user_id, timeout ) @@ -154,7 +151,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_one_time_keys") + sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys( destination, content, timeout ) @@ -394,7 +391,7 @@ class FederationClient(FederationBase): """ if return_local: seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = seen_events.values() + signed_events = list(seen_events.values()) else: seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] @@ -592,7 +589,7 @@ class FederationClient(FederationBase): } valid_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus.values(), + destination, list(pdus.values()), outlier=True, ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 247ddc89d5..2d420a58a2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,12 +27,13 @@ from synapse.federation.federation_base import ( from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -import synapse.metrics from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache from synapse.util.logutils import log_function +from prometheus_client import Counter + from six import iteritems # when processing incoming transactions, we try to handle multiple rooms in @@ -41,17 +42,17 @@ TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) -# synapse.federation.federation_server is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.server") - -received_pdus_counter = metrics.register_counter("received_pdus") +received_pdus_counter = Counter("synapse_federation_server_received_pdus", "") -received_edus_counter = metrics.register_counter("received_edus") +received_edus_counter = Counter("synapse_federation_server_received_edus", "") -received_queries_counter = metrics.register_counter("received_queries", labels=["type"]) +received_queries_counter = Counter( + "synapse_federation_server_received_queries", "", ["type"] +) class FederationServer(FederationBase): + def __init__(self, hs): super(FederationServer, self).__init__(hs) @@ -131,7 +132,7 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - received_pdus_counter.inc_by(len(transaction.pdus)) + received_pdus_counter.inc(len(transaction.pdus)) pdus_by_room = {} @@ -292,7 +293,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): - received_queries_counter.inc(query_type) + received_queries_counter.labels(query_type).inc() resp = yield self.registry.on_query(query_type, args) defer.returnValue((200, resp)) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 0f0c687b37..1d5c0f3797 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -33,9 +33,9 @@ from .units import Edu from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure -import synapse.metrics +from synapse.metrics import LaterGauge -from blist import sorteddict +from sortedcontainers import SortedDict from collections import namedtuple import logging @@ -45,9 +45,6 @@ from six import itervalues, iteritems logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -58,29 +55,27 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = sorteddict() # Stream position -> user_id + self.presence_changed = SortedDict() # Stream position -> user_id self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) + self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) - self.edus = sorteddict() # stream position -> Edu + self.edus = SortedDict() # stream position -> Edu - self.failures = sorteddict() # stream position -> (destination, Failure) + self.failures = SortedDict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() # stream position -> destination + self.device_messages = SortedDict() # stream position -> destination self.pos = 1 - self.pos_time = sorteddict() + self.pos_time = SortedDict() # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. def register(name, queue): - metrics.register_callback( - queue_name + "_size", - lambda: len(queue), - ) + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), + "", [], lambda: len(queue)) for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", @@ -103,7 +98,7 @@ class FederationRemoteSendQueue(object): now = self.clock.time_msec() keys = self.pos_time.keys() - time = keys.bisect_left(now - FIVE_MINUTES_AGO) + time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO) if not keys[:time]: return @@ -118,7 +113,7 @@ class FederationRemoteSendQueue(object): with Measure(self.clock, "send_queue._clear"): # Delete things out of presence maps keys = self.presence_changed.keys() - i = keys.bisect_left(position_to_delete) + i = self.presence_changed.bisect_left(position_to_delete) for key in keys[:i]: del self.presence_changed[key] @@ -136,7 +131,7 @@ class FederationRemoteSendQueue(object): # Delete things out of keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_left(position_to_delete) + i = self.keyed_edu_changed.bisect_left(position_to_delete) for key in keys[:i]: del self.keyed_edu_changed[key] @@ -150,19 +145,19 @@ class FederationRemoteSendQueue(object): # Delete things out of edu map keys = self.edus.keys() - i = keys.bisect_left(position_to_delete) + i = self.edus.bisect_left(position_to_delete) for key in keys[:i]: del self.edus[key] # Delete things out of failure map keys = self.failures.keys() - i = keys.bisect_left(position_to_delete) + i = self.failures.bisect_left(position_to_delete) for key in keys[:i]: del self.failures[key] # Delete things out of device map keys = self.device_messages.keys() - i = keys.bisect_left(position_to_delete) + i = self.device_messages.bisect_left(position_to_delete) for key in keys[:i]: del self.device_messages[key] @@ -202,7 +197,7 @@ class FederationRemoteSendQueue(object): # We only want to send presence for our own users, so lets always just # filter here just in case. - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states)) self.presence_map.update({state.user_id: state for state in local_states}) self.presence_changed[pos] = [state.user_id for state in local_states] @@ -255,13 +250,12 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(federation_ack) # Fetch changed presence - keys = self.presence_changed.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 + i = self.presence_changed.bisect_right(from_token) + j = self.presence_changed.bisect_right(to_token) + 1 dest_user_ids = [ (pos, user_id) - for pos in keys[i:j] - for user_id in self.presence_changed[pos] + for pos, user_id_list in self.presence_changed.items()[i:j] + for user_id in user_id_list ] for (key, user_id) in dest_user_ids: @@ -270,13 +264,12 @@ class FederationRemoteSendQueue(object): ))) # Fetch changes keyed edus - keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 + i = self.keyed_edu_changed.bisect_right(from_token) + j = self.keyed_edu_changed.bisect_right(to_token) + 1 # We purposefully clobber based on the key here, python dict comprehensions # always use the last value, so this will correctly point to the last # stream position. - keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} + keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} for ((destination, edu_key), pos) in iteritems(keyed_edus): rows.append((pos, KeyedEduRow( @@ -285,19 +278,17 @@ class FederationRemoteSendQueue(object): ))) # Fetch changed edus - keys = self.edus.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - edus = ((k, self.edus[k]) for k in keys[i:j]) + i = self.edus.bisect_right(from_token) + j = self.edus.bisect_right(to_token) + 1 + edus = self.edus.items()[i:j] for (pos, edu) in edus: rows.append((pos, EduRow(edu))) # Fetch changed failures - keys = self.failures.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - failures = ((k, self.failures[k]) for k in keys[i:j]) + i = self.failures.bisect_right(from_token) + j = self.failures.bisect_right(to_token) + 1 + failures = self.failures.items()[i:j] for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -306,10 +297,9 @@ class FederationRemoteSendQueue(object): ))) # Fetch changed device messages - keys = self.device_messages.keys() - i = keys.bisect_right(from_token) - j = keys.bisect_right(to_token) + 1 - device_messages = {self.device_messages[k]: k for k in keys[i:j]} + i = self.device_messages.bisect_right(from_token) + j = self.device_messages.bisect_right(to_token) + 1 + device_messages = {v: k for k, v in self.device_messages.items()[i:j]} for (destination, pos) in iteritems(device_messages): rows.append((pos, DeviceRow( diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ded2b1871a..f0aeb5a0d3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,23 +26,25 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics +from synapse.metrics import LaterGauge +from synapse.metrics import ( + sent_edus_counter, + sent_transactions_counter, + events_processed_counter, +) + +from prometheus_client import Counter + +from six import itervalues import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = client_metrics.register_distribution( - "sent_pdu_destinations" +sent_pdus_destination_dist = Counter( + "synapse_federation_transaction_queue_sent_pdu_destinations", "" ) -sent_edus_counter = client_metrics.register_counter("sent_edus") - -sent_transactions_counter = client_metrics.register_counter("sent_transactions") - -events_processed_counter = client_metrics.register_counter("events_processed") class TransactionQueue(object): @@ -69,8 +71,10 @@ class TransactionQueue(object): # done self.pending_transactions = {} - metrics.register_callback( - "pending_destinations", + LaterGauge( + "synapse_federation_transaction_queue_pending_destinations", + "", + [], lambda: len(self.pending_transactions), ) @@ -94,12 +98,16 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - metrics.register_callback( - "pending_pdus", + LaterGauge( + "synapse_federation_transaction_queue_pending_pdus", + "", + [], lambda: sum(map(len, pdus.values())), ) - metrics.register_callback( - "pending_edus", + LaterGauge( + "synapse_federation_transaction_queue_pending_edus", + "", + [], lambda: ( sum(map(len, edus.values())) + sum(map(len, presence.values())) @@ -228,7 +236,7 @@ class TransactionQueue(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ logcontext.run_in_background(handle_room_events, evs) - for evs in events_by_room.itervalues() + for evs in itervalues(events_by_room) ], consumeErrors=True )) @@ -241,18 +249,15 @@ class TransactionQueue(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_lag.set( - now - ts, "federation_sender", - ) - synapse.metrics.event_processing_last_ts.set( - ts, "federation_sender", - ) + synapse.metrics.event_processing_lag.labels( + "federation_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "federation_sender").set(ts) - events_processed_counter.inc_by(len(events)) + events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_positions.set( - next_token, "federation_sender", - ) + synapse.metrics.event_processing_positions.labels( + "federation_sender").set(next_token) finally: self._is_processing = False @@ -275,7 +280,7 @@ class TransactionQueue(object): if not destinations: return - sent_pdus_destination_dist.inc_by(len(destinations)) + sent_pdus_destination_dist.inc(len(destinations)) for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( @@ -322,7 +327,7 @@ class TransactionQueue(object): if not states_map: break - yield self._process_presence_inner(states_map.values()) + yield self._process_presence_inner(list(states_map.values())) except Exception: logger.exception("Error sending presence states to servers") finally: diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e089e66fde..2d1db0c245 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -114,14 +114,14 @@ class BaseHandler(object): if guest_access != "can_join": if context: current_state = yield self.store.get_events( - context.current_state_ids.values() + list(context.current_state_ids.values()) ) else: current_state = yield self.state_handler.get_current_state( event.room_id ) - current_state = current_state.values() + current_state = list(current_state.values()) logger.info("maybe_kick_guest_users %r", current_state) yield self.kick_guest_users(current_state) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index b596f098fd..1c29c43a83 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,20 +15,21 @@ from twisted.internet import defer +from six import itervalues + import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure from synapse.util.logcontext import ( make_deferred_yieldable, run_in_background, ) +from prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -events_processed_counter = metrics.register_counter("events_processed") +events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") def log_failure(failure): @@ -120,7 +121,7 @@ class ApplicationServicesHandler(object): yield make_deferred_yieldable(defer.gatherResults([ run_in_background(handle_room_events, evs) - for evs in events_by_room.itervalues() + for evs in itervalues(events_by_room) ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) @@ -128,18 +129,15 @@ class ApplicationServicesHandler(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_positions.set( - upper_bound, "appservice_sender", - ) + synapse.metrics.event_processing_positions.labels( + "appservice_sender").set(upper_bound) - events_processed_counter.inc_by(len(events)) + events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_lag.set( - now - ts, "appservice_sender", - ) - synapse.metrics.event_processing_last_ts.set( - ts, "appservice_sender", - ) + synapse.metrics.event_processing_lag.labels( + "appservice_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "appservice_sender").set(ts) finally: self.is_processing = False diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index c058b7cabb..912136534d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -249,7 +249,7 @@ class AuthHandler(BaseHandler): errordict = e.error_dict() for f in flows: - if len(set(f) - set(creds.keys())) == 0: + if len(set(f) - set(creds)) == 0: # it's very useful to know what args are stored, but this can # include the password in the case of registering, so only log # the keys (confusingly, clientdict may contain a password @@ -257,12 +257,12 @@ class AuthHandler(BaseHandler): # and is not sensitive). logger.info( "Auth completed with creds: %r. Client dict has keys: %r", - creds, clientdict.keys() + creds, list(clientdict) ) defer.returnValue((creds, clientdict, session['id'])) ret = self._auth_dict_for_flows(flows, session) - ret['completed'] = creds.keys() + ret['completed'] = list(creds) ret.update(errordict) raise InteractiveAuthIncompleteError( ret, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f7457a7082..11c6fb3657 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -26,6 +26,8 @@ from ._base import BaseHandler import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -112,7 +114,7 @@ class DeviceHandler(BaseHandler): user_id, device_id=None ) - devices = device_map.values() + devices = list(device_map.values()) for device in devices: _update_device_from_client_ips(device, ips) @@ -185,7 +187,7 @@ class DeviceHandler(BaseHandler): defer.Deferred: """ device_map = yield self.store.get_devices_by_user(user_id) - device_ids = device_map.keys() + device_ids = list(device_map) if except_device_id is not None: device_ids = [d for d in device_ids if d != except_device_id] yield self.delete_devices(user_id, device_ids) @@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler): # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler): # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: if state_key != user_id: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 25aec624af..8a2d177539 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,6 +19,7 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer +from six import iteritems from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, @@ -92,7 +93,7 @@ class E2eKeysHandler(object): remote_queries_not_in_cache = {} if remote_queries: query_list = [] - for user_id, device_ids in remote_queries.iteritems(): + for user_id, device_ids in iteritems(remote_queries): if device_ids: query_list.extend((user_id, device_id) for device_id in device_ids) else: @@ -103,9 +104,9 @@ class E2eKeysHandler(object): query_list ) ) - for user_id, devices in remote_results.iteritems(): + for user_id, devices in iteritems(remote_results): user_devices = results.setdefault(user_id, {}) - for device_id, device in devices.iteritems(): + for device_id, device in iteritems(devices): keys = device.get("keys", None) device_display_name = device.get("device_display_name", None) if keys: @@ -250,9 +251,9 @@ class E2eKeysHandler(object): "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2c72beff2e..495ac4c648 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -24,6 +24,7 @@ from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json import six from six.moves import http_client +from six import iteritems from twisted.internet import defer from unpaddedbase64 import decode_base64 @@ -51,7 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.distributor import user_joined_room - logger = logging.getLogger(__name__) @@ -479,8 +479,8 @@ class FederationHandler(BaseHandler): # to get all state ids that we're interested in. event_map = yield self.store.get_events([ e_id - for key_to_eid in event_to_state_ids.itervalues() - for key, e_id in key_to_eid.iteritems() + for key_to_eid in list(event_to_state_ids.values()) + for key, e_id in key_to_eid.items() if key[0] != EventTypes.Member or check_match(key[1]) ]) @@ -1148,13 +1148,13 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - state_ids = context.prev_state_ids.values() + state_ids = list(context.prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(context.prev_state_ids.values()) + state = yield self.store.get_events(list(context.prev_state_ids.values())) defer.returnValue({ - "state": state.values(), + "state": list(state.values()), "auth_chain": auth_chain, }) @@ -1388,7 +1388,7 @@ class FederationHandler(BaseHandler): ) if state_groups: - _, state = state_groups.items().pop() + _, state = list(iteritems(state_groups)).pop() results = { (e.type, e.state_key): e for e in state } @@ -1404,7 +1404,7 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - res = results.values() + res = list(results.values()) for event in res: # We sign these again because there was a bug where we # incorrectly signed things the first time round @@ -1445,7 +1445,7 @@ class FederationHandler(BaseHandler): else: results.pop((event.type, event.state_key), None) - defer.returnValue(results.values()) + defer.returnValue(list(results.values())) else: defer.returnValue([]) @@ -1794,6 +1794,10 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) + missing_events = yield self._filter_events_for_server( + origin, room_id, missing_events, + ) + defer.returnValue(missing_events) @defer.inlineCallbacks @@ -1914,7 +1918,7 @@ class FederationHandler(BaseHandler): }) new_state = self.state_handler.resolve_events( - [local_view.values(), remote_view.values()], + [list(local_view.values()), list(remote_view.values())], event ) @@ -2034,7 +2038,7 @@ class FederationHandler(BaseHandler): this will not be included in the current_state in the context. """ state_updates = { - k: a.event_id for k, a in auth_events.iteritems() + k: a.event_id for k, a in iteritems(auth_events) if k != event_key } context.current_state_ids = dict(context.current_state_ids) @@ -2044,7 +2048,7 @@ class FederationHandler(BaseHandler): context.delta_ids.update(state_updates) context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ - k: a.event_id for k, a in auth_events.iteritems() + k: a.event_id for k, a in iteritems(auth_events) }) context.state_group = yield self.store.store_state_group( event.event_id, @@ -2096,7 +2100,7 @@ class FederationHandler(BaseHandler): def get_next(it, opt=None): try: - return it.next() + return next(it) except Exception: return opt diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 977993e7d4..dcae083734 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -15,6 +15,7 @@ # limitations under the License. from twisted.internet import defer +from six import iteritems from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id @@ -449,7 +450,7 @@ class GroupsLocalHandler(object): results = {} failed_results = [] - for destination, dest_user_ids in destinations.iteritems(): + for destination, dest_user_ids in iteritems(destinations): try: r = yield self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c32b9bcae4..1cb81b6cf8 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -19,6 +19,7 @@ import sys from canonicaljson import encode_canonical_json import six +from six import string_types, itervalues, iteritems from twisted.internet import defer, reactor from twisted.internet.defer import succeed from twisted.python.failure import Failure @@ -402,7 +403,7 @@ class MessageHandler(BaseHandler): "avatar_url": profile.avatar_url, "display_name": profile.display_name, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) }) @@ -571,6 +572,9 @@ class EventCreationHandler(object): u = yield self.store.get_user_by_id(user_id) assert u is not None + if u["appservice_id"] is not None: + # users registered by an appservice are exempt + return if u["consent_version"] == self.config.user_consent_version: return @@ -667,7 +671,7 @@ class EventCreationHandler(object): spam_error = self.spam_checker.check_event_for_spam(event) if spam_error: - if not isinstance(spam_error, basestring): + if not isinstance(spam_error, string_types): spam_error = "Spam is not permitted here" raise SynapseError( 403, spam_error, Codes.FORBIDDEN @@ -881,7 +885,7 @@ class EventCreationHandler(object): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.iteritems() + for k, e_id in iteritems(context.current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -895,7 +899,7 @@ class EventCreationHandler(object): "content": e.content, "sender": e.sender, } - for e in state_to_include.itervalues() + for e in itervalues(state_to_include) ] invitee = UserID.from_string(event.state_key) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 500a131874..7fe568132f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,6 +25,8 @@ The methods that define policy are: from twisted.internet import defer, reactor from contextlib import contextmanager +from six import itervalues, iteritems + from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState @@ -36,27 +38,29 @@ from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id -import synapse.metrics +from synapse.metrics import LaterGauge import logging +from prometheus_client import Counter logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) -notified_presence_counter = metrics.register_counter("notified_presence") -federation_presence_out_counter = metrics.register_counter("federation_presence_out") -presence_updates_counter = metrics.register_counter("presence_updates") -timers_fired_counter = metrics.register_counter("timers_fired") -federation_presence_counter = metrics.register_counter("federation_presence") -bump_active_time_counter = metrics.register_counter("bump_active_time") +notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") +federation_presence_out_counter = Counter( + "synapse_handler_presence_federation_presence_out", "") +presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") +timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") +federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") +bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "") -get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) +get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) -notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) -state_transition_counter = metrics.register_counter( - "state_transition", labels=["from", "to"] +notify_reason_counter = Counter( + "synapse_handler_presence_notify_reason", "", ["reason"]) +state_transition_counter = Counter( + "synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -141,8 +145,9 @@ class PresenceHandler(object): for state in active_presence } - metrics.register_callback( - "user_to_current_state_size", lambda: len(self.user_to_current_state) + LaterGauge( + "synapse_handlers_presence_user_to_current_state_size", "", [], + lambda: len(self.user_to_current_state) ) now = self.clock.time_msec() @@ -212,7 +217,8 @@ class PresenceHandler(object): 60 * 1000, ) - metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer)) + LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], + lambda: len(self.wheel_timer)) @defer.inlineCallbacks def _on_shutdown(self): @@ -315,11 +321,11 @@ class PresenceHandler(object): # TODO: We should probably ensure there are no races hereafter - presence_updates_counter.inc_by(len(new_states)) + presence_updates_counter.inc(len(new_states)) if to_notify: - notified_presence_counter.inc_by(len(to_notify)) - yield self._persist_and_notify(to_notify.values()) + notified_presence_counter.inc(len(to_notify)) + yield self._persist_and_notify(list(to_notify.values())) self.unpersisted_users_changes |= set(s.user_id for s in new_states) self.unpersisted_users_changes -= set(to_notify.keys()) @@ -329,7 +335,7 @@ class PresenceHandler(object): if user_id not in to_notify } if to_federation_ping: - federation_presence_out_counter.inc_by(len(to_federation_ping)) + federation_presence_out_counter.inc(len(to_federation_ping)) self._push_to_remotes(to_federation_ping.values()) @@ -367,7 +373,7 @@ class PresenceHandler(object): for user_id in users_to_check ] - timers_fired_counter.inc_by(len(states)) + timers_fired_counter.inc(len(states)) changes = handle_timeouts( states, @@ -530,7 +536,7 @@ class PresenceHandler(object): prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, ) - for prev_state in prev_states.itervalues() + for prev_state in itervalues(prev_states) ]) self.external_process_last_updated_ms.pop(process_id, None) @@ -553,14 +559,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { user_id: UserPresenceState.default(user_id) @@ -656,7 +662,7 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace(**new_fields)) if updates: - federation_presence_counter.inc_by(len(updates)) + federation_presence_counter.inc(len(updates)) yield self._update_states(updates) @defer.inlineCallbacks @@ -681,7 +687,7 @@ class PresenceHandler(object): """ updates = yield self.current_state_for_users(target_user_ids) - updates = updates.values() + updates = list(updates.values()) for user_id in set(target_user_ids) - set(u.user_id for u in updates): updates.append(UserPresenceState.default(user_id)) @@ -747,11 +753,11 @@ class PresenceHandler(object): self._push_to_remotes([state]) else: user_ids = yield self.store.get_users_in_room(room_id) - user_ids = filter(self.is_mine_id, user_ids) + user_ids = list(filter(self.is_mine_id, user_ids)) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes(states.values()) + self._push_to_remotes(list(states.values())) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -931,28 +937,28 @@ def should_notify(old_state, new_state): return False if old_state.status_msg != new_state.status_msg: - notify_reason_counter.inc("status_msg_change") + notify_reason_counter.labels("status_msg_change").inc() return True if old_state.state != new_state.state: - notify_reason_counter.inc("state_change") - state_transition_counter.inc(old_state.state, new_state.state) + notify_reason_counter.labels("state_change").inc() + state_transition_counter.labels(old_state.state, new_state.state).inc() return True if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: - notify_reason_counter.inc("current_active_change") + notify_reason_counter.labels("current_active_change").inc() return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive if not new_state.currently_active: - notify_reason_counter.inc("last_active_change_online") + notify_reason_counter.labels("last_active_change_online").inc() return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. - notify_reason_counter.inc("last_active_change_not_online") + notify_reason_counter.labels("last_active_change_not_online").inc() return True return False @@ -1026,14 +1032,14 @@ class PresenceEventSource(object): if changed is not None and len(changed) < 500: # For small deltas, its quicker to get all changes and then # work out if we share a room or they're in our presence list - get_updates_counter.inc("stream") + get_updates_counter.labels("stream").inc() for other_user_id in changed: if other_user_id in users_interested_in: user_ids_changed.add(other_user_id) else: # Too many possible updates. Find all users we can see and check # if any of them have changed. - get_updates_counter.inc("full") + get_updates_counter.labels("full").inc() if from_key: user_ids_changed = stream_change_cache.get_entities_changed( @@ -1045,10 +1051,10 @@ class PresenceEventSource(object): updates = yield presence.current_state_for_users(user_ids_changed) if include_offline: - defer.returnValue((updates.values(), max_token)) + defer.returnValue((list(updates.values()), max_token)) else: defer.returnValue(([ - s for s in updates.itervalues() + s for s in itervalues(updates) if s.state != PresenceState.OFFLINE ], max_token)) @@ -1106,7 +1112,7 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): if new_state: changes[state.user_id] = new_state - return changes.values() + return list(changes.values()) def handle_timeout(state, is_mine, syncing_user_ids, now): @@ -1305,11 +1311,11 @@ def get_interested_remotes(store, states, state_handler): # hosts in those rooms. room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.iteritems(): + for room_id, states in iteritems(room_ids_to_states): hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.iteritems(): + for user_id, states in iteritems(users_to_states): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b5850db42f..2abd63ad05 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -455,7 +455,7 @@ class RoomContextHandler(BaseHandler): state = yield self.store.get_state_for_events( [last_event_id], None ) - results["state"] = state[last_event_id].values() + results["state"] = list(state[last_event_id].values()) results["start"] = now_token.copy_and_replace( "room_key", results["start"] diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 5757bb7f8a..fc507cef36 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from six import iteritems from six.moves import range from ._base import BaseHandler @@ -307,7 +308,7 @@ class RoomListHandler(BaseHandler): ) event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.iteritems() + event_id for key, event_id in iteritems(current_state_ids) if key[0] in ( EventTypes.JoinRules, EventTypes.Name, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 82adfc8fdf..f930e939e8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -298,16 +298,6 @@ class RoomMemberHandler(object): is_blocked = yield self.store.is_room_blocked(room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - else: - # we don't allow people to reject invites to, or leave, the - # server notice room. - is_blocked = yield self._is_server_notice_room(room_id) - if is_blocked: - raise SynapseError( - http_client.FORBIDDEN, - "You cannot leave this room", - errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM, - ) if effective_membership_state == Membership.INVITE: # block any attempts to invite the server notices mxid @@ -383,6 +373,20 @@ class RoomMemberHandler(object): if same_sender and same_membership and same_content: defer.returnValue(old_state) + # we don't allow people to reject invites to the server notice + # room, but they can leave it once they are joined. + if ( + old_membership == Membership.INVITE and + effective_membership_state == Membership.LEAVE + ): + is_blocked = yield self._is_server_notice_room(room_id) + if is_blocked: + raise SynapseError( + http_client.FORBIDDEN, + "You cannot reject this invite", + errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM, + ) + is_host_in_room = yield self._is_host_in_room(current_state_ids) if effective_membership_state == Membership.JOIN: diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 9772ed1a0e..1eca26aa1e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -348,7 +348,7 @@ class SearchHandler(BaseHandler): rooms = set(e.room_id for e in allowed_events) for room_id in rooms: state = yield self.state_handler.get_current_state(room_id) - state_results[room_id] = state.values() + state_results[room_id] = list(state.values()) state_results.values() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 263e42dded..51ec727df0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,6 +28,8 @@ import collections import logging import itertools +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -275,7 +277,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -294,7 +296,7 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -325,7 +327,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) recents = yield filter_events_for_client( self.store, @@ -382,7 +384,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) loaded_recents = yield filter_events_for_client( self.store, @@ -441,6 +443,10 @@ class SyncHandler(object): Returns: A Deferred map from ((type, state_key)->Event) """ + # FIXME this claims to get the state at a stream position, but + # get_recent_events_for_room operates by topo ordering. This therefore + # does not reliably give you the state at the given stream position. + # (https://github.com/matrix-org/synapse/issues/3305) last_events, _ = yield self.store.get_recent_events_for_room( room_id, end_token=stream_position.room_key, limit=1, ) @@ -535,11 +541,11 @@ class SyncHandler(object): state = {} if state_ids: - state = yield self.store.get_events(state_ids.values()) + state = yield self.store.get_events(list(state_ids.values())) defer.returnValue({ (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state(state.values()) + for e in sync_config.filter_collection.filter_room_state(list(state.values())) }) @defer.inlineCallbacks @@ -888,7 +894,7 @@ class SyncHandler(object): presence.extend(states) # Deduplicate the presence entries so that there's at most one per user - presence = {p.user_id: p for p in presence}.values() + presence = list({p.user_id: p for p in presence}.values()) presence = sync_config.filter_collection.filter_presence( presence @@ -984,7 +990,7 @@ class SyncHandler(object): if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.itervalues() + joined_sync.timeline.events, itervalues(joined_sync.state) ) for event in it: if event.type == EventTypes.Member: @@ -1040,7 +1046,13 @@ class SyncHandler(object): Returns: Deferred(tuple): Returns a tuple of the form: - `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` + `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)` + + where: + room_entries is a list [RoomSyncResultBuilder] + invited_rooms is a list [InvitedSyncResult] + newly_joined rooms is a list[str] of room ids + newly_left_rooms is a list[str] of room ids """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -1062,7 +1074,7 @@ class SyncHandler(object): newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.iteritems(): + for room_id, events in iteritems(mem_change_events_by_room_id): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 2f73d664f9..a39f0f7343 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -22,6 +22,7 @@ from synapse.util.metrics import Measure from synapse.util.async import sleep from synapse.types import get_localpart_from_id +from six import iteritems logger = logging.getLogger(__name__) @@ -410,7 +411,7 @@ class UserDirectoryHandler(object): if change: users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): + for user_id, profile in iteritems(users_with_profile): yield self._handle_new_user(room_id, user_id, profile) else: users = yield self.store.get_users_in_public_due_to_room(room_id) diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 054372e179..58ef8d3ce4 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -13,6 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import re + from twisted.internet.defer import CancelledError from twisted.python import failure @@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout): value.trap(CancelledError) raise RequestTimedOutError() return value + + +ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') + + +def redact_uri(uri): + """Strips access tokens from the uri replaces with <redacted>""" + return ACCESS_TOKEN_RE.sub( + br'\1<redacted>\3', + uri + ) diff --git a/synapse/http/client.py b/synapse/http/client.py index 70a19d9b74..8064a84c5c 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -19,11 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) -from synapse.http import cancelled_to_request_timed_out_error +from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable -import synapse.metrics from synapse.http.endpoint import SpiderEndpoint from canonicaljson import encode_canonical_json @@ -42,6 +41,7 @@ from twisted.web._newclient import ResponseDone from six import StringIO +from prometheus_client import Counter import simplejson as json import logging import urllib @@ -49,16 +49,9 @@ import urllib logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) +incoming_responses_counter = Counter("synapse_http_client_responses", "", + ["method", "code"]) class SimpleHttpClient(object): @@ -95,9 +88,10 @@ class SimpleHttpClient(object): def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it - outgoing_requests_counter.inc(method) + outgoing_requests_counter.labels(method).inc() - logger.info("Sending request %s %s", method, uri) + # log request but strip `access_token` (AS requests for example include this) + logger.info("Sending request %s %s", method, redact_uri(uri)) try: request_deferred = self.agent.request( @@ -109,17 +103,17 @@ class SimpleHttpClient(object): ) response = yield make_deferred_yieldable(request_deferred) - incoming_responses_counter.inc(method, response.code) + incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", - method, uri, response.code + method, redact_uri(uri), response.code ) defer.returnValue(response) except Exception as e: - incoming_responses_counter.inc(method, "ERR") + incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", - method, uri, type(e).__name__, e.message + method, redact_uri(uri), type(e).__name__, e.message ) raise e diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index bf56e77c7a..993dc06e02 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -45,19 +45,15 @@ from six.moves.urllib import parse as urlparse from six import string_types +from prometheus_client import Counter + logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", + "", ["method"]) +incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", + "", ["method", "code"]) MAX_LONG_RETRIES = 10 diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 5917c83958..dc06f6c443 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -16,137 +16,109 @@ import logging -import synapse.metrics +from prometheus_client.core import Counter, Histogram +from synapse.metrics import LaterGauge + from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for("synapse.http.server") # total number of responses served, split by method/servlet/tag -response_count = metrics.register_counter( - "response_count", - labels=["method", "servlet", "tag"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_requests", - "_response_time:count", - "_response_ru_utime:count", - "_response_ru_stime:count", - "_response_db_txn_count:count", - "_response_db_txn_duration:count", - ) - ) +response_count = Counter( + "synapse_http_server_response_count", "", ["method", "servlet", "tag"] ) -requests_counter = metrics.register_counter( - "requests_received", - labels=["method", "servlet", ], +requests_counter = Counter( + "synapse_http_server_requests_received", "", ["method", "servlet"] ) -outgoing_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], +outgoing_responses_counter = Counter( + "synapse_http_server_responses", "", ["method", "code"] ) -response_timer = metrics.register_counter( - "response_time_seconds", - labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_time:total", - ), +response_timer = Histogram( + "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"] ) -response_ru_utime = metrics.register_counter( - "response_ru_utime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_utime:total", - ), +response_ru_utime = Counter( + "synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"] ) -response_ru_stime = metrics.register_counter( - "response_ru_stime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_stime:total", - ), +response_ru_stime = Counter( + "synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"] ) -response_db_txn_count = metrics.register_counter( - "response_db_txn_count", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_count:total", - ), +response_db_txn_count = Counter( + "synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"] ) # seconds spent waiting for db txns, excluding scheduling time, when processing # this request -response_db_txn_duration = metrics.register_counter( - "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_duration:total", - ), +response_db_txn_duration = Counter( + "synapse_http_server_response_db_txn_duration_seconds", + "", + ["method", "servlet", "tag"], ) # seconds spent waiting for a db connection, when processing this request -response_db_sched_duration = metrics.register_counter( - "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] +response_db_sched_duration = Counter( + "synapse_http_server_response_db_sched_duration_seconds", + "", + ["method", "servlet", "tag"], ) # size in bytes of the response written -response_size = metrics.register_counter( - "response_size", labels=["method", "servlet", "tag"] +response_size = Counter( + "synapse_http_server_response_size", "", ["method", "servlet", "tag"] ) # In flight metrics are incremented while the requests are in flight, rather # than when the response was written. -in_flight_requests_ru_utime = metrics.register_counter( - "in_flight_requests_ru_utime_seconds", labels=["method", "servlet"], +in_flight_requests_ru_utime = Counter( + "synapse_http_server_in_flight_requests_ru_utime_seconds", + "", + ["method", "servlet"], ) -in_flight_requests_ru_stime = metrics.register_counter( - "in_flight_requests_ru_stime_seconds", labels=["method", "servlet"], +in_flight_requests_ru_stime = Counter( + "synapse_http_server_in_flight_requests_ru_stime_seconds", + "", + ["method", "servlet"], ) -in_flight_requests_db_txn_count = metrics.register_counter( - "in_flight_requests_db_txn_count", labels=["method", "servlet"], +in_flight_requests_db_txn_count = Counter( + "synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"] ) # seconds spent waiting for db txns, excluding scheduling time, when processing # this request -in_flight_requests_db_txn_duration = metrics.register_counter( - "in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"], +in_flight_requests_db_txn_duration = Counter( + "synapse_http_server_in_flight_requests_db_txn_duration_seconds", + "", + ["method", "servlet"], ) # seconds spent waiting for a db connection, when processing this request -in_flight_requests_db_sched_duration = metrics.register_counter( - "in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"] +in_flight_requests_db_sched_duration = Counter( + "synapse_http_server_in_flight_requests_db_sched_duration_seconds", + "", + ["method", "servlet"], ) - # The set of all in flight requests, set[RequestMetrics] _in_flight_requests = set() -def _collect_in_flight(): - """Called just before metrics are collected, so we use it to update all - the in flight request metrics - """ - - for rm in _in_flight_requests: - rm.update_metrics() - - -metrics.register_collector(_collect_in_flight) - - def _get_in_flight_counts(): """Returns a count of all in flight requests by (method, server_name) Returns: dict[tuple[str, str], int] """ + for rm in _in_flight_requests: + rm.update_metrics() # Map from (method, name) -> int, the number of in flight requests of that # type @@ -158,16 +130,17 @@ def _get_in_flight_counts(): return counts -metrics.register_callback( - "in_flight_requests_count", +LaterGauge( + "synapse_http_request_metrics_in_flight_requests_count", + "", + ["method", "servlet"], _get_in_flight_counts, - labels=["method", "servlet"] ) class RequestMetrics(object): - def start(self, time_msec, name, method): - self.start = time_msec + def start(self, time_sec, name, method): + self.start = time_sec self.start_context = LoggingContext.current_context() self.name = name self.method = method @@ -176,7 +149,7 @@ class RequestMetrics(object): _in_flight_requests.add(self) - def stop(self, time_msec, request): + def stop(self, time_sec, request): _in_flight_requests.discard(self) context = LoggingContext.current_context() @@ -192,34 +165,29 @@ class RequestMetrics(object): ) return - outgoing_responses_counter.inc(request.method, str(request.code)) + outgoing_responses_counter.labels(request.method, str(request.code)).inc() - response_count.inc(request.method, self.name, tag) + response_count.labels(request.method, self.name, tag).inc() - response_timer.inc_by( - time_msec - self.start, request.method, - self.name, tag + response_timer.labels(request.method, self.name, tag).observe( + time_sec - self.start ) ru_utime, ru_stime = context.get_resource_usage() - response_ru_utime.inc_by( - ru_utime, request.method, self.name, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, self.name, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, self.name, tag + response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime) + response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime) + response_db_txn_count.labels(request.method, self.name, tag).inc( + context.db_txn_count ) - response_db_txn_duration.inc_by( - context.db_txn_duration_ms / 1000., request.method, self.name, tag + response_db_txn_duration.labels(request.method, self.name, tag).inc( + context.db_txn_duration_sec ) - response_db_sched_duration.inc_by( - context.db_sched_duration_ms / 1000., request.method, self.name, tag + response_db_sched_duration.labels(request.method, self.name, tag).inc( + context.db_sched_duration_sec ) - response_size.inc_by(request.sentLength, request.method, self.name, tag) + response_size.labels(request.method, self.name, tag).inc(request.sentLength) # We always call this at the end to ensure that we update the metrics # regardless of whether a call to /metrics while the request was in @@ -229,27 +197,21 @@ class RequestMetrics(object): def update_metrics(self): """Updates the in flight metrics with values from this request. """ - diff = self._request_stats.update(self.start_context) - in_flight_requests_ru_utime.inc_by( - diff.ru_utime, self.method, self.name, - ) - - in_flight_requests_ru_stime.inc_by( - diff.ru_stime, self.method, self.name, - ) + in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) + in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) - in_flight_requests_db_txn_count.inc_by( - diff.db_txn_count, self.method, self.name, + in_flight_requests_db_txn_count.labels(self.method, self.name).inc( + diff.db_txn_count ) - in_flight_requests_db_txn_duration.inc_by( - diff.db_txn_duration_ms / 1000., self.method, self.name, + in_flight_requests_db_txn_duration.labels(self.method, self.name).inc( + diff.db_txn_duration_sec ) - in_flight_requests_db_sched_duration.inc_by( - diff.db_sched_duration_ms / 1000., self.method, self.name, + in_flight_requests_db_sched_duration.labels(self.method, self.name).inc( + diff.db_sched_duration_sec ) @@ -258,17 +220,21 @@ class _RequestStats(object): """ __slots__ = [ - "ru_utime", "ru_stime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "ru_utime", + "ru_stime", + "db_txn_count", + "db_txn_duration_sec", + "db_sched_duration_sec", ] - def __init__(self, ru_utime, ru_stime, db_txn_count, - db_txn_duration_ms, db_sched_duration_ms): + def __init__( + self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec + ): self.ru_utime = ru_utime self.ru_stime = ru_stime self.db_txn_count = db_txn_count - self.db_txn_duration_ms = db_txn_duration_ms - self.db_sched_duration_ms = db_sched_duration_ms + self.db_txn_duration_sec = db_txn_duration_sec + self.db_sched_duration_sec = db_sched_duration_sec @staticmethod def from_context(context): @@ -277,8 +243,8 @@ class _RequestStats(object): return _RequestStats( ru_utime, ru_stime, context.db_txn_count, - context.db_txn_duration_ms, - context.db_sched_duration_ms, + context.db_txn_duration_sec, + context.db_sched_duration_sec, ) def update(self, context): @@ -294,14 +260,14 @@ class _RequestStats(object): new.ru_utime - self.ru_utime, new.ru_stime - self.ru_stime, new.db_txn_count - self.db_txn_count, - new.db_txn_duration_ms - self.db_txn_duration_ms, - new.db_sched_duration_ms - self.db_sched_duration_ms, + new.db_txn_duration_sec - self.db_txn_duration_sec, + new.db_sched_duration_sec - self.db_sched_duration_sec, ) self.ru_utime = new.ru_utime self.ru_stime = new.ru_stime self.db_txn_count = new.db_txn_count - self.db_txn_duration_ms = new.db_txn_duration_ms - self.db_sched_duration_ms = new.db_sched_duration_ms + self.db_txn_duration_sec = new.db_txn_duration_sec + self.db_sched_duration_sec = new.db_sched_duration_sec return diff diff --git a/synapse/http/server.py b/synapse/http/server.py index faf700851a..bc09b8b2be 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -210,8 +210,8 @@ def wrap_request_handler_with_logging(h): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.inc(request.method, - request.request_metrics.name) + requests_counter.labels(request.method, + request.request_metrics.name).inc() yield d return wrapped_request_handler diff --git a/synapse/http/site.py b/synapse/http/site.py index b608504225..2664006f8c 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,18 +14,16 @@ import contextlib import logging -import re import time from twisted.web.server import Site, Request +from synapse.http import redact_uri from synapse.http.request_metrics import RequestMetrics from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) -ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') - _next_request_seq = 0 @@ -56,7 +54,7 @@ class SynapseRequest(Request): def __repr__(self): # We overwrite this so that we don't log ``access_token`` - return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( self.__class__.__name__, id(self), self.method, @@ -69,10 +67,7 @@ class SynapseRequest(Request): return "%s-%i" % (self.method, self.request_seq) def get_redacted_uri(self): - return ACCESS_TOKEN_RE.sub( - br'\1<redacted>\3', - self.uri - ) + return redact_uri(self.uri) def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] @@ -83,7 +78,7 @@ class SynapseRequest(Request): return Request.render(self, resrc) def _started_processing(self, servlet_name): - self.start_time = int(time.time() * 1000) + self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( self.start_time, name=servlet_name, method=self.method, @@ -102,26 +97,26 @@ class SynapseRequest(Request): context = LoggingContext.current_context() ru_utime, ru_stime = context.get_resource_usage() db_txn_count = context.db_txn_count - db_txn_duration_ms = context.db_txn_duration_ms - db_sched_duration_ms = context.db_sched_duration_ms + db_txn_duration_sec = context.db_txn_duration_sec + db_sched_duration_sec = context.db_sched_duration_sec except Exception: ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration_ms = (0, 0) + db_txn_count, db_txn_duration_sec = (0, 0) - end_time = int(time.time() * 1000) + end_time = time.time() self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)" + " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, self.authenticated_entity, end_time - self.start_time, - int(ru_utime * 1000), - int(ru_stime * 1000), - db_sched_duration_ms, - db_txn_duration_ms, + ru_utime, + ru_stime, + db_sched_duration_sec, + db_txn_duration_sec, int(db_txn_count), self.sentLength, self.code, diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index e3b831db67..429e79c472 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -17,165 +17,178 @@ import logging import functools import time import gc +import os import platform +import attr -from twisted.internet import reactor +from prometheus_client import Gauge, Histogram, Counter +from prometheus_client.core import GaugeMetricFamily, REGISTRY -from .metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, GaugeMetric, -) -from .process_collector import register_process_collector +from twisted.internet import reactor logger = logging.getLogger(__name__) - -running_on_pypy = platform.python_implementation() == 'PyPy' +running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] +all_gauges = {} + +HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") + + +class RegistryProxy(object): + + @staticmethod + def collect(): + for metric in REGISTRY.collect(): + if not metric.name.startswith("__"): + yield metric -class Metrics(object): - """ A single Metrics object gives a (mutable) slice view of the all_metrics - dict, allowing callers to easily register new metrics that are namespaced - nicely.""" +@attr.s(hash=True) +class LaterGauge(object): - def __init__(self, name): - self.name_prefix = name + name = attr.ib() + desc = attr.ib() + labels = attr.ib(hash=False) + caller = attr.ib() - def make_subspace(self, name): - return Metrics("%s_%s" % (self.name_prefix, name)) + def collect(self): - def register_collector(self, func): - all_collectors.append(func) + g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) - def _register(self, metric_class, name, *args, **kwargs): - full_name = "%s_%s" % (self.name_prefix, name) + try: + calls = self.caller() + except Exception: + logger.exception( + "Exception running callback for LaterGuage(%s)", + self.name, + ) + yield g + return - metric = metric_class(full_name, *args, **kwargs) + if isinstance(calls, dict): + for k, v in calls.items(): + g.add_metric(k, v) + else: + g.add_metric([], calls) - all_metrics.append(metric) - return metric + yield g - def register_counter(self, *args, **kwargs): - """ - Returns: - CounterMetric - """ - return self._register(CounterMetric, *args, **kwargs) + def __attrs_post_init__(self): + self._register() - def register_gauge(self, *args, **kwargs): - """ - Returns: - GaugeMetric - """ - return self._register(GaugeMetric, *args, **kwargs) + def _register(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) - def register_callback(self, *args, **kwargs): - """ - Returns: - CallbackMetric - """ - return self._register(CallbackMetric, *args, **kwargs) + REGISTRY.register(self) + all_gauges[self.name] = self - def register_distribution(self, *args, **kwargs): - """ - Returns: - DistributionMetric - """ - return self._register(DistributionMetric, *args, **kwargs) - def register_cache(self, *args, **kwargs): - """ - Returns: - CacheMetric - """ - return self._register(CacheMetric, *args, **kwargs) +# +# Detailed CPU metrics +# + +class CPUMetrics(object): + def __init__(self): + ticks_per_sec = 100 + try: + # Try and get the system config + ticks_per_sec = os.sysconf('SC_CLK_TCK') + except (ValueError, TypeError, AttributeError): + pass -def register_memory_metrics(hs): - try: - import psutil - process = psutil.Process() - process.memory_info().rss - except (ImportError, AttributeError): - logger.warn( - "psutil is not installed or incorrect version." - " Disabling memory metrics." - ) - return - metric = MemoryUsageMetric(hs, psutil) - all_metrics.append(metric) + self.ticks_per_sec = ticks_per_sec + def collect(self): + if not HAVE_PROC_SELF_STAT: + return -def get_metrics_for(pkg_name): - """ Returns a Metrics instance for conveniently creating metrics - namespaced with the given name prefix. """ + with open("/proc/self/stat") as s: + line = s.read() + raw_stats = line.split(") ", 1)[1].split(" ") - # Convert a "package.name" to "package_name" because Prometheus doesn't - # let us use . in metric names - return Metrics(pkg_name.replace(".", "_")) + user = GaugeMetricFamily("process_cpu_user_seconds_total", "") + user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec) + yield user + sys = GaugeMetricFamily("process_cpu_system_seconds_total", "") + sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec) + yield sys -def render_all(): - strs = [] - for collector in all_collectors: - collector() +REGISTRY.register(CPUMetrics()) - for metric in all_metrics: - try: - strs += metric.render() - except Exception: - strs += ["# FAILED to render"] - logger.exception("Failed to render metric") +# +# Python GC metrics +# + +gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) +gc_time = Histogram( + "python_gc_time", + "Time taken to GC (sec)", + ["gen"], + buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, + 5.00, 7.50, 15.00, 30.00, 45.00, 60.00], +) - strs.append("") # to generate a final CRLF - return "\n".join(strs) +class GCCounts(object): + def collect(self): + cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"]) + for n, m in enumerate(gc.get_count()): + cm.add_metric([str(n)], m) -register_process_collector(get_metrics_for("process")) + yield cm -python_metrics = get_metrics_for("python") +REGISTRY.register(GCCounts()) -gc_time = python_metrics.register_distribution("gc_time", labels=["gen"]) -gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"]) -python_metrics.register_callback( - "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] +# +# Twisted reactor metrics +# + +tick_time = Histogram( + "python_twisted_reactor_tick_time", + "Tick time of the Twisted reactor (sec)", + buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], +) +pending_calls_metric = Histogram( + "python_twisted_reactor_pending_calls", + "Pending calls", + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], ) -reactor_metrics = get_metrics_for("python.twisted.reactor") -tick_time = reactor_metrics.register_distribution("tick_time") -pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +# +# Federation Metrics +# + +sent_edus_counter = Counter("synapse_federation_client_sent_edus", "") + +sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") -synapse_metrics = get_metrics_for("synapse") +events_processed_counter = Counter("synapse_federation_client_events_processed", "") # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. -event_processing_positions = synapse_metrics.register_gauge( - "event_processing_positions", labels=["name"], -) +event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) # Used to track the current max events stream position -event_persisted_position = synapse_metrics.register_gauge( - "event_persisted_position", -) +event_persisted_position = Gauge("synapse_event_persisted_position", "") # Used to track the received_ts of the last event processed by various # components -event_processing_last_ts = synapse_metrics.register_gauge( - "event_processing_last_ts", labels=["name"], -) +event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]) # Used to track the lag processing events. This is the time difference # between the last processed event's received_ts and the time it was # finished being processed. -event_processing_lag = synapse_metrics.register_gauge( - "event_processing_lag", labels=["name"], -) +event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) def runUntilCurrentTimer(func): @@ -197,17 +210,17 @@ def runUntilCurrentTimer(func): num_pending += 1 num_pending += len(reactor.threadCallQueue) - start = time.time() * 1000 + start = time.time() ret = func(*args, **kwargs) - end = time.time() * 1000 + end = time.time() # record the amount of wallclock time spent running pending calls. # This is a proxy for the actual amount of time between reactor polls, # since about 25% of time is actually spent running things triggered by # I/O events, but that is harder to capture without rewriting half the # reactor. - tick_time.inc_by(end - start) - pending_calls_metric.inc_by(num_pending) + tick_time.observe(end - start) + pending_calls_metric.observe(num_pending) if running_on_pypy: return ret @@ -220,12 +233,12 @@ def runUntilCurrentTimer(func): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) - start = time.time() * 1000 + start = time.time() unreachable = gc.collect(i) - end = time.time() * 1000 + end = time.time() - gc_time.inc_by(end - start, i) - gc_unreachable.inc_by(unreachable, i) + gc_time.labels(i).observe(end - start) + gc_unreachable.labels(i).set(unreachable) return ret diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py deleted file mode 100644 index f421e7a93f..0000000000 --- a/synapse/metrics/metric.py +++ /dev/null @@ -1,328 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from itertools import chain -import logging -import re - -logger = logging.getLogger(__name__) - - -def flatten(items): - """Flatten a list of lists - - Args: - items: iterable[iterable[X]] - - Returns: - list[X]: flattened list - """ - return list(chain.from_iterable(items)) - - -class BaseMetric(object): - """Base class for metrics which report a single value per label set - """ - - def __init__(self, name, labels=[], alternative_names=[]): - """ - Args: - name (str): principal name for this metric - labels (list(str)): names of the labels which will be reported - for this metric - alternative_names (iterable(str)): list of alternative names for - this metric. This can be useful to provide a migration path - when renaming metrics. - """ - self._names = [name] + list(alternative_names) - self.labels = labels # OK not to clone as we never write it - - def dimension(self): - return len(self.labels) - - def is_scalar(self): - return not len(self.labels) - - def _render_labelvalue(self, value): - return '"%s"' % (_escape_label_value(value),) - - def _render_key(self, values): - if self.is_scalar(): - return "" - return "{%s}" % ( - ",".join(["%s=%s" % (k, self._render_labelvalue(v)) - for k, v in zip(self.labels, values)]) - ) - - def _render_for_labels(self, label_values, value): - """Render this metric for a single set of labels - - Args: - label_values (list[object]): values for each of the labels, - (which get stringified). - value: value of the metric at with these labels - - Returns: - iterable[str]: rendered metric - """ - rendered_labels = self._render_key(label_values) - return ( - "%s%s %.12g" % (name, rendered_labels, value) - for name in self._names - ) - - def render(self): - """Render this metric - - Each metric is rendered as: - - name{label1="val1",label2="val2"} value - - https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details - - Returns: - iterable[str]: rendered metrics - """ - raise NotImplementedError() - - -class CounterMetric(BaseMetric): - """The simplest kind of metric; one that stores a monotonically-increasing - value that counts events or running totals. - - Example use cases for Counters: - - Number of requests processed - - Number of items that were inserted into a queue - - Total amount of data that a system has processed - Counters can only go up (and be reset when the process restarts). - """ - - def __init__(self, *args, **kwargs): - super(CounterMetric, self).__init__(*args, **kwargs) - - # dict[list[str]]: value for each set of label values. the keys are the - # label values, in the same order as the labels in self.labels. - # - # (if the metric is a scalar, the (single) key is the empty tuple). - self.counts = {} - - # Scalar metrics are never empty - if self.is_scalar(): - self.counts[()] = 0. - - def inc_by(self, incr, *values): - if len(values) != self.dimension(): - raise ValueError( - "Expected as many values to inc() as labels (%d)" % (self.dimension()) - ) - - # TODO: should assert that the tag values are all strings - - if values not in self.counts: - self.counts[values] = incr - else: - self.counts[values] += incr - - def inc(self, *values): - self.inc_by(1, *values) - - def render(self): - return flatten( - self._render_for_labels(k, self.counts[k]) - for k in sorted(self.counts.keys()) - ) - - -class GaugeMetric(BaseMetric): - """A metric that can go up or down - """ - - def __init__(self, *args, **kwargs): - super(GaugeMetric, self).__init__(*args, **kwargs) - - # dict[list[str]]: value for each set of label values. the keys are the - # label values, in the same order as the labels in self.labels. - # - # (if the metric is a scalar, the (single) key is the empty tuple). - self.guages = {} - - def set(self, v, *values): - if len(values) != self.dimension(): - raise ValueError( - "Expected as many values to inc() as labels (%d)" % (self.dimension()) - ) - - # TODO: should assert that the tag values are all strings - - self.guages[values] = v - - def render(self): - return flatten( - self._render_for_labels(k, self.guages[k]) - for k in sorted(self.guages.keys()) - ) - - -class CallbackMetric(BaseMetric): - """A metric that returns the numeric value returned by a callback whenever - it is rendered. Typically this is used to implement gauges that yield the - size or other state of some in-memory object by actively querying it.""" - - def __init__(self, name, callback, labels=[]): - super(CallbackMetric, self).__init__(name, labels=labels) - - self.callback = callback - - def render(self): - try: - value = self.callback() - except Exception: - logger.exception("Failed to render %s", self.name) - return ["# FAILED to render " + self.name] - - if self.is_scalar(): - return list(self._render_for_labels([], value)) - - return flatten( - self._render_for_labels(k, value[k]) - for k in sorted(value.keys()) - ) - - -class DistributionMetric(object): - """A combination of an event counter and an accumulator, which counts - both the number of events and accumulates the total value. Typically this - could be used to keep track of method-running times, or other distributions - of values that occur in discrete occurances. - - TODO(paul): Try to export some heatmap-style stats? - """ - - def __init__(self, name, *args, **kwargs): - self.counts = CounterMetric(name + ":count", **kwargs) - self.totals = CounterMetric(name + ":total", **kwargs) - - def inc_by(self, inc, *values): - self.counts.inc(*values) - self.totals.inc_by(inc, *values) - - def render(self): - return self.counts.render() + self.totals.render() - - -class CacheMetric(object): - __slots__ = ( - "name", "cache_name", "hits", "misses", "evicted_size", "size_callback", - ) - - def __init__(self, name, size_callback, cache_name): - self.name = name - self.cache_name = cache_name - - self.hits = 0 - self.misses = 0 - self.evicted_size = 0 - - self.size_callback = size_callback - - def inc_hits(self): - self.hits += 1 - - def inc_misses(self): - self.misses += 1 - - def inc_evictions(self, size=1): - self.evicted_size += size - - def render(self): - size = self.size_callback() - hits = self.hits - total = self.misses + self.hits - - return [ - """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits), - """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total), - """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size), - """%s:evicted_size{name="%s"} %d""" % ( - self.name, self.cache_name, self.evicted_size - ), - ] - - -class MemoryUsageMetric(object): - """Keeps track of the current memory usage, using psutil. - - The class will keep the current min/max/sum/counts of rss over the last - WINDOW_SIZE_SEC, by polling UPDATE_HZ times per second - """ - - UPDATE_HZ = 2 # number of times to get memory per second - WINDOW_SIZE_SEC = 30 # the size of the window in seconds - - def __init__(self, hs, psutil): - clock = hs.get_clock() - self.memory_snapshots = [] - - self.process = psutil.Process() - - clock.looping_call(self._update_curr_values, 1000 / self.UPDATE_HZ) - - def _update_curr_values(self): - max_size = self.UPDATE_HZ * self.WINDOW_SIZE_SEC - self.memory_snapshots.append(self.process.memory_info().rss) - self.memory_snapshots[:] = self.memory_snapshots[-max_size:] - - def render(self): - if not self.memory_snapshots: - return [] - - max_rss = max(self.memory_snapshots) - min_rss = min(self.memory_snapshots) - sum_rss = sum(self.memory_snapshots) - len_rss = len(self.memory_snapshots) - - return [ - "process_psutil_rss:max %d" % max_rss, - "process_psutil_rss:min %d" % min_rss, - "process_psutil_rss:total %d" % sum_rss, - "process_psutil_rss:count %d" % len_rss, - ] - - -def _escape_character(m): - """Replaces a single character with its escape sequence. - - Args: - m (re.MatchObject): A match object whose first group is the single - character to replace - - Returns: - str - """ - c = m.group(1) - if c == "\\": - return "\\\\" - elif c == "\"": - return "\\\"" - elif c == "\n": - return "\\n" - return c - - -def _escape_label_value(value): - """Takes a label value and escapes quotes, newlines and backslashes - """ - return re.sub(r"([\n\"\\])", _escape_character, str(value)) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py deleted file mode 100644 index 6fec3de399..0000000000 --- a/synapse/metrics/process_collector.py +++ /dev/null @@ -1,122 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os - - -TICKS_PER_SEC = 100 -BYTES_PER_PAGE = 4096 - -HAVE_PROC_STAT = os.path.exists("/proc/stat") -HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") -HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits") -HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd") - -# Field indexes from /proc/self/stat, taken from the proc(5) manpage -STAT_FIELDS = { - "utime": 14, - "stime": 15, - "starttime": 22, - "vsize": 23, - "rss": 24, -} - - -stats = {} - -# In order to report process_start_time_seconds we need to know the -# machine's boot time, because the value in /proc/self/stat is relative to -# this -boot_time = None -if HAVE_PROC_STAT: - with open("/proc/stat") as _procstat: - for line in _procstat: - if line.startswith("btime "): - boot_time = int(line.split()[1]) - - -def update_resource_metrics(): - if HAVE_PROC_SELF_STAT: - global stats - with open("/proc/self/stat") as s: - line = s.read() - # line is PID (command) more stats go here ... - raw_stats = line.split(") ", 1)[1].split(" ") - - for (name, index) in STAT_FIELDS.iteritems(): - # subtract 3 from the index, because proc(5) is 1-based, and - # we've lost the first two fields in PID and COMMAND above - stats[name] = int(raw_stats[index - 3]) - - -def _count_fds(): - # Not every OS will have a /proc/self/fd directory - if not HAVE_PROC_SELF_FD: - return 0 - - return len(os.listdir("/proc/self/fd")) - - -def register_process_collector(process_metrics): - process_metrics.register_collector(update_resource_metrics) - - if HAVE_PROC_SELF_STAT: - process_metrics.register_callback( - "cpu_user_seconds_total", - lambda: float(stats["utime"]) / TICKS_PER_SEC - ) - process_metrics.register_callback( - "cpu_system_seconds_total", - lambda: float(stats["stime"]) / TICKS_PER_SEC - ) - process_metrics.register_callback( - "cpu_seconds_total", - lambda: (float(stats["utime"] + stats["stime"])) / TICKS_PER_SEC - ) - - process_metrics.register_callback( - "virtual_memory_bytes", - lambda: int(stats["vsize"]) - ) - process_metrics.register_callback( - "resident_memory_bytes", - lambda: int(stats["rss"]) * BYTES_PER_PAGE - ) - - process_metrics.register_callback( - "start_time_seconds", - lambda: boot_time + int(stats["starttime"]) / TICKS_PER_SEC - ) - - if HAVE_PROC_SELF_FD: - process_metrics.register_callback( - "open_fds", - lambda: _count_fds() - ) - - if HAVE_PROC_SELF_LIMITS: - def _get_max_fds(): - with open("/proc/self/limits") as limits: - for line in limits: - if not line.startswith("Max open files "): - continue - # Line is Max open files $SOFT $HARD - return int(line.split()[3]) - return None - - process_metrics.register_callback( - "max_fds", - lambda: _get_max_fds() - ) diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py index 870f400600..9789359077 100644 --- a/synapse/metrics/resource.py +++ b/synapse/metrics/resource.py @@ -13,27 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.web.resource import Resource - -import synapse.metrics - +from prometheus_client.twisted import MetricsResource METRICS_PREFIX = "/_synapse/metrics" - -class MetricsResource(Resource): - isLeaf = True - - def __init__(self, hs): - Resource.__init__(self) # Resource is old-style, so no super() - - self.hs = hs - - def render_GET(self, request): - response = synapse.metrics.render_all() - - request.setHeader("Content-Type", "text/plain") - request.setHeader("Content-Length", str(len(response))) - - # Encode as UTF-8 (default) - return response.encode() +__all__ = ["MetricsResource", "METRICS_PREFIX"] diff --git a/synapse/notifier.py b/synapse/notifier.py index 8355c7d621..6dce20a284 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -28,22 +28,20 @@ from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client -import synapse.metrics +from synapse.metrics import LaterGauge from collections import namedtuple +from prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) +notified_events_counter = Counter("synapse_notifier_notified_events", "") -notified_events_counter = metrics.register_counter("notified_events") - -users_woken_by_stream_counter = metrics.register_counter( - "users_woken_by_stream", labels=["stream"] -) +users_woken_by_stream_counter = Counter( + "synapse_notifier_users_woken_by_stream", "", ["stream"]) # TODO(paul): Should be shared somewhere @@ -108,7 +106,7 @@ class _NotifierUserStream(object): self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred - users_woken_by_stream_counter.inc(stream_key) + users_woken_by_stream_counter.labels(stream_key).inc() with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) @@ -197,14 +195,14 @@ class Notifier(object): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) - metrics.register_callback("listeners", count_listeners) + LaterGauge("synapse_notifier_listeners", "", [], count_listeners) - metrics.register_callback( - "rooms", + LaterGauge( + "synapse_notifier_rooms", "", [], lambda: count(bool, self.room_to_user_streams.values()), ) - metrics.register_callback( - "users", + LaterGauge( + "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream), ) diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 7a18afe5f9..a8ae7bcd6c 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -39,7 +39,7 @@ def list_with_base_rules(rawrules): rawrules = [r for r in rawrules if r['priority_class'] >= 0] # shove the server default rules for each kind onto the end of each - current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] + current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1] ruleslist.extend(make_base_prepend_rules( PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7c680659b6..a5cab1f043 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -22,35 +22,32 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.event_auth import get_user_power_level from synapse.api.constants import EventTypes, Membership -from synapse.metrics import get_metrics_for -from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches import register_cache from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer from synapse.state import POWER_KEY from collections import namedtuple - +from prometheus_client import Counter +from six import itervalues, iteritems logger = logging.getLogger(__name__) rules_by_room = {} -push_metrics = get_metrics_for(__name__) -push_rules_invalidation_counter = push_metrics.register_counter( - "push_rules_invalidation_counter" -) -push_rules_state_size_counter = push_metrics.register_counter( - "push_rules_state_size_counter" -) +push_rules_invalidation_counter = Counter( + "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "") +push_rules_state_size_counter = Counter( + "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "") # Measures whether we use the fast path of using state deltas, or if we have to # recalculate from scratch -push_rules_delta_state_cache_metric = cache_metrics.register_cache( +push_rules_delta_state_cache_metric = register_cache( "cache", - size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values - cache_name="push_rules_delta_state_cache_metric", + "push_rules_delta_state_cache_metric", + cache=[], # Meaningless size, as this isn't a cache that stores values ) @@ -64,10 +61,10 @@ class BulkPushRuleEvaluator(object): self.store = hs.get_datastore() self.auth = hs.get_auth() - self.room_push_rule_cache_metrics = cache_metrics.register_cache( + self.room_push_rule_cache_metrics = register_cache( "cache", - size_callback=lambda: 0, # There's not good value for this - cache_name="room_push_rule_cache", + "room_push_rule_cache", + cache=[], # Meaningless size, as this isn't a cache that stores values ) @defer.inlineCallbacks @@ -126,7 +123,7 @@ class BulkPushRuleEvaluator(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { - (e.type, e.state_key): e for e in auth_events.itervalues() + (e.type, e.state_key): e for e in itervalues(auth_events) } sender_level = get_user_power_level(event.sender, auth_events) @@ -160,7 +157,7 @@ class BulkPushRuleEvaluator(object): condition_cache = {} - for uid, rules in rules_by_user.iteritems(): + for uid, rules in iteritems(rules_by_user): if event.sender == uid: continue @@ -309,7 +306,7 @@ class RulesForRoom(object): current_state_ids = context.current_state_ids push_rules_delta_state_cache_metric.inc_misses() - push_rules_state_size_counter.inc_by(len(current_state_ids)) + push_rules_state_size_counter.inc(len(current_state_ids)) logger.debug( "Looking for member changes in %r %r", state_group, current_state_ids @@ -406,7 +403,7 @@ class RulesForRoom(object): # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: - for event_id in member_event_ids.itervalues(): + for event_id in itervalues(member_event_ids): if event_id == event.event_id: members[event_id] = (event.state_key, event.membership) @@ -414,7 +411,7 @@ class RulesForRoom(object): logger.debug("Found members %r: %r", self.room_id, members.values()) interested_in_user_ids = set( - user_id for user_id, membership in members.itervalues() + user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN ) @@ -426,7 +423,7 @@ class RulesForRoom(object): ) user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher + uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher ) logger.debug("With pushers: %r", user_ids) @@ -447,7 +444,7 @@ class RulesForRoom(object): ) ret_rules_by_user.update( - item for item in rules_by_user.iteritems() if item[0] is not None + item for item in iteritems(rules_by_user) if item[0] is not None ) self.update_cache(sequence, members, ret_rules_by_user, state_group) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index b077e1a446..bf7ff74a1a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -20,22 +20,17 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled from . import push_rule_evaluator from . import push_tools -import synapse from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure -logger = logging.getLogger(__name__) +from prometheus_client import Counter -metrics = synapse.metrics.get_metrics_for(__name__) +logger = logging.getLogger(__name__) -http_push_processed_counter = metrics.register_counter( - "http_pushes_processed", -) +http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "") -http_push_failed_counter = metrics.register_counter( - "http_pushes_failed", -) +http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "") class HttpPusher(object): diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index b5cd9b426a..d4be800e5e 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -229,7 +229,8 @@ class Mailer(object): if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]: prev_messages = room_vars['notifs'][-1]['messages'] for message in notifvars['messages']: - pm = filter(lambda pm: pm['id'] == message['id'], prev_messages) + pm = list(filter(lambda pm: pm['id'] == message['id'], + prev_messages)) if pm: if not message["is_historical"]: pm[0]["is_historical"] = False diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py index 277da3cd35..43f0c74ff3 100644 --- a/synapse/push/presentable_names.py +++ b/synapse/push/presentable_names.py @@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True # so find out who is in the room that isn't the user. if "m.room.member" in room_state_bytype_ids: member_events = yield store.get_events( - room_state_bytype_ids["m.room.member"].values() + list(room_state_bytype_ids["m.room.member"].values()) ) all_members = [ ev for ev in member_events.values() diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index d55efde8cc..cf735f7468 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -152,7 +152,7 @@ class PushRuleEvaluatorForEvent(object): # Caches (glob, word_boundary) -> regex for push. See _glob_matches regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR) -register_cache("regex_push_cache", regex_cache) +register_cache("cache", "regex_push_cache", regex_cache) def _glob_matches(glob, value, word_boundary=False): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 216db4d164..faf6dfdb8d 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -50,13 +50,16 @@ REQUIREMENTS = { "bcrypt": ["bcrypt>=3.1.0"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], - "blist": ["blist"], + "sortedcontainers": ["sortedcontainers"], "pysaml2>=3.0.0": ["saml2>=3.0.0"], "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], "six": ["six"], + "prometheus_client": ["prometheus_client"], + "attr": ["attr"], } + CONDITIONAL_REQUIREMENTS = { "web_client": { "matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"], diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d7d38464b2..c870475cd1 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -60,21 +60,21 @@ from .commands import ( ) from .streams import STREAMS_MAP +from synapse.metrics import LaterGauge from synapse.util.stringutils import random_string -from synapse.metrics.metric import CounterMetric -import logging -import synapse.metrics -import struct -import fcntl +from prometheus_client import Counter +from collections import defaultdict -metrics = synapse.metrics.get_metrics_for(__name__) +from six import iterkeys, iteritems -connection_close_counter = metrics.register_counter( - "close_reason", labels=["reason_type"], -) +import logging +import struct +import fcntl +connection_close_counter = Counter( + "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]) # A list of all connected protocols. This allows us to send metrics about the # connections. @@ -136,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. self._send_ping_loop = None - self.inbound_commands_counter = CounterMetric( - "inbound_commands", labels=["command"], - ) - self.outbound_commands_counter = CounterMetric( - "outbound_commands", labels=["command"], - ) + self.inbound_commands_counter = defaultdict(int) + self.outbound_commands_counter = defaultdict(int) def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -201,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - self.inbound_commands_counter.inc(cmd_name) + self.inbound_commands_counter[cmd_name] = ( + self.inbound_commands_counter[cmd_name] + 1) cmd_cls = COMMAND_MAP[cmd_name] try: @@ -251,8 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - self.outbound_commands_counter.inc(cmd.NAME) - + self.outbound_commands_counter[cmd.NAME] = ( + self.outbound_commands_counter[cmd.NAME] + 1) string = "%s %s" % (cmd.NAME, cmd.to_line(),) if "\n" in string: raise Exception("Unexpected newline in command: %r", string) @@ -317,9 +314,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def connectionLost(self, reason): logger.info("[%s] Replication connection closed: %r", self.id(), reason) if isinstance(reason, Failure): - connection_close_counter.inc(reason.type.__name__) + connection_close_counter.labels(reason.type.__name__).inc() else: - connection_close_counter.inc(reason.__class__.__name__) + connection_close_counter.labels(reason.__class__.__name__).inc() try: # Remove us from list of connections to be monitored @@ -392,7 +389,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): if stream_name == "ALL": # Subscribe to all streams we're publishing to. - for stream in self.streamer.streams_by_name.iterkeys(): + for stream in iterkeys(self.streamer.streams_by_name): self.subscribe_to_stream(stream, token) else: self.subscribe_to_stream(stream_name, token) @@ -498,7 +495,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): BaseReplicationStreamProtocol.connectionMade(self) # Once we've connected subscribe to the necessary streams - for stream_name, token in self.handler.get_streams_to_replicate().iteritems(): + for stream_name, token in iteritems(self.handler.get_streams_to_replicate()): self.replicate(stream_name, token) # Tell the server if we have any users currently syncing (should only @@ -518,7 +515,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_RDATA(self, cmd): stream_name = cmd.stream_name - inbound_rdata_count.inc(stream_name) + inbound_rdata_count.labels(stream_name).inc() try: row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) @@ -566,14 +563,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # The following simply registers metrics for the replication connections -metrics.register_callback( - "pending_commands", +pending_commands = LaterGauge( + "pending_commands", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) def transport_buffer_size(protocol): @@ -583,14 +578,12 @@ def transport_buffer_size(protocol): return 0 -metrics.register_callback( - "transport_send_buffer", +transport_send_buffer = LaterGauge( + "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) def transport_kernel_read_buffer_size(protocol, read=True): @@ -608,48 +601,38 @@ def transport_kernel_read_buffer_size(protocol, read=True): return 0 -metrics.register_callback( - "transport_kernel_send_buffer", +tcp_transport_kernel_send_buffer = LaterGauge( + "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) -metrics.register_callback( - "transport_kernel_read_buffer", +tcp_transport_kernel_read_buffer = LaterGauge( + "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"], lambda: { (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True) for p in connected_connections - }, - labels=["name", "conn_id"], -) + }) -metrics.register_callback( - "inbound_commands", +tcp_inbound_commands = LaterGauge( + "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"], lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.inbound_commands_counter.counts.iteritems() - }, - labels=["command", "name", "conn_id"], -) + for k, count in iteritems(p.inbound_commands_counter) + }) -metrics.register_callback( - "outbound_commands", +tcp_outbound_commands = LaterGauge( + "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"], lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.outbound_commands_counter.counts.iteritems() - }, - labels=["command", "name", "conn_id"], -) + for k, count in iteritems(p.outbound_commands_counter) + }) # number of updates received for each RDATA stream -inbound_rdata_count = metrics.register_counter( - "inbound_rdata_count", - labels=["stream_name"], -) +inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "", + ["stream_name"]) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a603c520ea..63bd6d2652 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -22,20 +22,21 @@ from .streams import STREAMS_MAP, FederationStream from .protocol import ServerReplicationStreamProtocol from synapse.util.metrics import Measure, measure_func +from synapse.metrics import LaterGauge import logging -import synapse.metrics +from prometheus_client import Counter +from six import itervalues -metrics = synapse.metrics.get_metrics_for(__name__) -stream_updates_counter = metrics.register_counter( - "stream_updates", labels=["stream_name"] -) -user_sync_counter = metrics.register_counter("user_sync") -federation_ack_counter = metrics.register_counter("federation_ack") -remove_pusher_counter = metrics.register_counter("remove_pusher") -invalidate_cache_counter = metrics.register_counter("invalidate_cache") -user_ip_cache_counter = metrics.register_counter("user_ip_cache") +stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", + "", ["stream_name"]) +user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") +federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "") +remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") +invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", + "") +user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") logger = logging.getLogger(__name__) @@ -74,29 +75,29 @@ class ReplicationStreamer(object): # Current connections. self.connections = [] - metrics.register_callback("total_connections", lambda: len(self.connections)) + LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], + lambda: len(self.connections)) # List of streams that clients can subscribe to. # We only support federation stream if federation sending hase been # disabled on the master. self.streams = [ - stream(hs) for stream in STREAMS_MAP.itervalues() + stream(hs) for stream in itervalues(STREAMS_MAP) if stream != FederationStream or not hs.config.send_federation ] self.streams_by_name = {stream.NAME: stream for stream in self.streams} - metrics.register_callback( - "connections_per_stream", + LaterGauge( + "synapse_replication_tcp_resource_connections_per_stream", "", + ["stream_name"], lambda: { (stream_name,): len([ conn for conn in self.connections if stream_name in conn.replication_streams ]) for stream_name in self.streams_by_name - }, - labels=["stream_name"], - ) + }) self.federation_sender = None if not hs.config.send_federation: @@ -176,7 +177,7 @@ class ReplicationStreamer(object): logger.info( "Streaming: %s -> %s", stream.NAME, updates[-1][0] ) - stream_updates_counter.inc_by(len(updates), stream.NAME) + stream_updates_counter.labels(stream.NAME).inc(len(updates)) # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 20fa6678ef..7c01b438cb 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -104,7 +104,7 @@ class HttpTransactionCache(object): def _cleanup(self): now = self.clock.time_msec() - for key in self.transactions.keys(): + for key in list(self.transactions): ts = self.transactions[key][1] if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period del self.transactions[key] diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 6835a7bba2..b8665a45eb 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): yield self.store.find_first_stream_ordering_after_ts(ts) ) - room_event_after_stream_ordering = ( + r = ( yield self.store.get_room_event_after_stream_ordering( room_id, stream_ordering, ) ) - if room_event_after_stream_ordering: - token = yield self.store.get_topological_token_for_event( - room_event_after_stream_ordering, - ) - else: + if not r: logger.warn( "[purge] purging events not possible: No event found " "(received_ts %i => stream_ordering %i)", @@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): "there is no event to be purged", errcode=Codes.NOT_FOUND, ) + (stream, topo, _event_id) = r + token = "t%d-%d" % (topo, stream) logger.info( - "[purge] purging up to token %d (received_ts %i => " + "[purge] purging up to token %s (received_ts %i => " "stream_ordering %i)", token, ts, stream_ordering, ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 9800ce7581..2ac767d2dc 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -48,6 +48,7 @@ import shutil import cgi import logging from six.moves.urllib import parse as urlparse +from six import iteritems logger = logging.getLogger(__name__) @@ -603,7 +604,7 @@ class MediaRepository(object): thumbnails[(t_width, t_height, r_type)] = r_method # Now we generate the thumbnails for each dimension, store it - for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): + for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 2839207abc..565cef2b8d 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -24,7 +24,9 @@ import shutil import sys import traceback import simplejson as json -import urlparse + +from six.moves import urllib_parse as urlparse +from six import string_types from twisted.web.server import NOT_DONE_YET from twisted.internet import defer @@ -590,8 +592,8 @@ def _iterate_over_text(tree, *tags_to_ignore): # to be returned. elements = iter([tree]) while True: - el = elements.next() - if isinstance(el, basestring): + el = next(elements) + if isinstance(el, string_types): yield el elif el is not None and el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index a709802856..bb74af1af5 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -42,6 +42,7 @@ class ConsentServerNotices(object): self._current_consent_version = hs.config.user_consent_version self._server_notice_content = hs.config.user_consent_server_notice_content + self._send_to_guests = hs.config.user_consent_server_notice_to_guests if self._server_notice_content is not None: if not self._server_notices_manager.is_enabled(): @@ -78,6 +79,10 @@ class ConsentServerNotices(object): try: u = yield self._store.get_user_by_id(user_id) + if u["is_guest"] and not self._send_to_guests: + # don't send to guests + return + if u["consent_version"] == self._current_consent_version: # user has already consented return diff --git a/synapse/state.py b/synapse/state.py index 26093c8434..8098db94b4 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -32,6 +32,8 @@ from frozendict import frozendict import logging import hashlib +from six import iteritems, itervalues + logger = logging.getLogger(__name__) @@ -130,9 +132,10 @@ class StateHandler(object): defer.returnValue(event) return - state_map = yield self.store.get_events(state.values(), get_prev_content=False) + state_map = yield self.store.get_events(list(state.values()), + get_prev_content=False) state = { - key: state_map[e_id] for key, e_id in state.iteritems() if e_id in state_map + key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map } defer.returnValue(state) @@ -338,7 +341,7 @@ class StateHandler(object): ) if len(state_groups_ids) == 1: - name, state_list = state_groups_ids.items().pop() + name, state_list = list(state_groups_ids.items()).pop() prev_group, delta_ids = yield self.store.get_state_group_delta(name) @@ -378,7 +381,7 @@ class StateHandler(object): new_state = resolve_events_with_state_map(state_set_ids, state_map) new_state = { - key: state_map[ev_id] for key, ev_id in new_state.iteritems() + key: state_map[ev_id] for key, ev_id in iteritems(new_state) } return new_state @@ -458,15 +461,15 @@ class StateResolutionHandler(object): # build a map from state key to the event_ids which set that state. # dict[(str, str), set[str]) state = {} - for st in state_groups_ids.itervalues(): - for key, e_id in st.iteritems(): + for st in itervalues(state_groups_ids): + for key, e_id in iteritems(st): state.setdefault(key, set()).add(e_id) # build a map from state key to the event_ids which set that state, # including only those where there are state keys in conflict. conflicted_state = { k: list(v) - for k, v in state.iteritems() + for k, v in iteritems(state) if len(v) > 1 } @@ -474,13 +477,13 @@ class StateResolutionHandler(object): logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_factory( - state_groups_ids.values(), + list(state_groups_ids.values()), event_map=event_map, state_map_factory=state_map_factory, ) else: new_state = { - key: e_ids.pop() for key, e_ids in state.iteritems() + key: e_ids.pop() for key, e_ids in iteritems(state) } with Measure(self.clock, "state.create_group_ids"): @@ -489,8 +492,8 @@ class StateResolutionHandler(object): # which will be used as a cache key for future resolutions, but # not get persisted. state_group = None - new_state_event_ids = frozenset(new_state.itervalues()) - for sg, events in state_groups_ids.iteritems(): + new_state_event_ids = frozenset(itervalues(new_state)) + for sg, events in iteritems(state_groups_ids): if new_state_event_ids == frozenset(e_id for e_id in events): state_group = sg break @@ -501,11 +504,11 @@ class StateResolutionHandler(object): prev_group = None delta_ids = None - for old_group, old_ids in state_groups_ids.iteritems(): + for old_group, old_ids in iteritems(state_groups_ids): if not set(new_state) - set(old_ids): n_delta_ids = { k: v - for k, v in new_state.iteritems() + for k, v in iteritems(new_state) if old_ids.get(k) != v } if not delta_ids or len(n_delta_ids) < len(delta_ids): @@ -527,7 +530,7 @@ class StateResolutionHandler(object): def _ordered_events(events): def key_func(e): - return -int(e.depth), hashlib.sha1(e.event_id).hexdigest() + return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest() return sorted(events, key=key_func) @@ -584,7 +587,7 @@ def _seperate(state_sets): conflicted_state = {} for state_set in state_sets[1:]: - for key, value in state_set.iteritems(): + for key, value in iteritems(state_set): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) if unconflicted_value is None: @@ -640,7 +643,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): needed_events = set( event_id - for event_ids in conflicted_state.itervalues() + for event_ids in itervalues(conflicted_state) for event_id in event_ids ) if event_map is not None: @@ -662,7 +665,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): unconflicted_state, conflicted_state, state_map ) - new_needed_events = set(auth_events.itervalues()) + new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events if event_map is not None: new_needed_events -= set(event_map.iterkeys()) @@ -679,7 +682,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map): auth_events = {} - for event_ids in conflicted_state.itervalues(): + for event_ids in itervalues(conflicted_state): for event_id in event_ids: if event_id in state_map: keys = event_auth.auth_types_for_event(state_map[event_id]) @@ -691,10 +694,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma return auth_events -def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids, +def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map): conflicted_state = {} - for key, event_ids in conflicted_state_ds.iteritems(): + for key, event_ids in iteritems(conflicted_state_ids): events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map] if len(events) > 1: conflicted_state[key] = events @@ -703,7 +706,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ auth_events = { key: state_map[ev_id] - for key, ev_id in auth_event_ids.iteritems() + for key, ev_id in iteritems(auth_event_ids) if ev_id in state_map } @@ -716,7 +719,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ raise new_state = unconflicted_state_ids - for key, event in resolved_state.iteritems(): + for key, event in iteritems(resolved_state): new_state[key] = event.event_id return new_state @@ -741,7 +744,7 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in conflicted_state.iteritems(): + for key, events in iteritems(conflicted_state): if key[0] == EventTypes.JoinRules: logger.debug("Resolving conflicted join rules %r", events) resolved_state[key] = _resolve_auth_events( @@ -751,7 +754,7 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in conflicted_state.iteritems(): + for key, events in iteritems(conflicted_state): if key[0] == EventTypes.Member: logger.debug("Resolving conflicted member lists %r", events) resolved_state[key] = _resolve_auth_events( @@ -761,7 +764,7 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in conflicted_state.iteritems(): + for key, events in iteritems(conflicted_state): if key not in resolved_state: logger.debug("Resolving conflicted state %r:%r", key, events) resolved_state[key] = _resolve_normal_events( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2262776ab2..22d6257a9f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,8 +18,8 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine -import synapse.metrics +from prometheus_client import Histogram from twisted.internet import defer @@ -27,20 +27,25 @@ import sys import time import threading +from six import itervalues, iterkeys, iteritems +from six.moves import intern, range logger = logging.getLogger(__name__) +try: + MAX_TXN_ID = sys.maxint - 1 +except AttributeError: + # python 3 does not have a maximum int value + MAX_TXN_ID = 2**63 - 1 + sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") +sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec") -metrics = synapse.metrics.get_metrics_for("synapse.storage") - -sql_scheduling_timer = metrics.register_distribution("schedule_time") - -sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) -sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"]) +sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"]) class LoggingTransaction(object): @@ -105,7 +110,7 @@ class LoggingTransaction(object): # Don't let logging failures stop SQL from working pass - start = time.time() * 1000 + start = time.time() try: return func( @@ -115,9 +120,9 @@ class LoggingTransaction(object): logger.debug("[SQL FAIL] {%s} %s", self.name, e) raise finally: - msecs = (time.time() * 1000) - start - sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) - sql_query_timer.inc_by(msecs, sql.split()[0]) + secs = time.time() - start + sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs) + sql_query_timer.labels(sql.split()[0]).observe(secs) class PerformanceCounters(object): @@ -127,7 +132,7 @@ class PerformanceCounters(object): def update(self, key, start_time, end_time=None): if end_time is None: - end_time = time.time() * 1000 + end_time = time.time() duration = end_time - start_time count, cum_time = self.current_counters.get(key, (0, 0)) count += 1 @@ -137,7 +142,7 @@ class PerformanceCounters(object): def interval(self, interval_duration, limit=3): counters = [] - for name, (count, cum_time) in self.current_counters.iteritems(): + for name, (count, cum_time) in iteritems(self.current_counters): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append(( (cum_time - prev_time) / interval_duration, @@ -217,12 +222,12 @@ class SQLBaseStore(object): def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks, logging_context, func, *args, **kwargs): - start = time.time() * 1000 + start = time.time() txn_id = self._TXN_ID # We don't really need these to be unique, so lets stop it from # growing really large. - self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID) name = "%s-%x" % (desc, txn_id, ) @@ -277,17 +282,17 @@ class SQLBaseStore(object): logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: - end = time.time() * 1000 + end = time.time() duration = end - start if logging_context is not None: logging_context.add_database_transaction(duration) - transaction_logger.debug("[TXN END] {%s} %f", name, duration) + transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) - sql_txn_timer.inc_by(duration, desc) + sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): @@ -344,13 +349,13 @@ class SQLBaseStore(object): """ current_context = LoggingContext.current_context() - start_time = time.time() * 1000 + start_time = time.time() def inner_func(conn, *args, **kwargs): with LoggingContext("runWithConnection") as context: - sched_duration_ms = time.time() * 1000 - start_time - sql_scheduling_timer.inc_by(sched_duration_ms) - current_context.add_database_scheduled(sched_duration_ms) + sched_duration_sec = time.time() - start_time + sql_scheduling_timer.observe(sched_duration_sec) + current_context.add_database_scheduled(sched_duration_sec) if self.database_engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") @@ -543,7 +548,7 @@ class SQLBaseStore(object): ", ".join("%s = ?" % (k,) for k in values), " AND ".join("%s = ?" % (k,) for k in keyvalues) ) - sqlargs = values.values() + keyvalues.values() + sqlargs = list(values.values()) + list(keyvalues.values()) txn.execute(sql, sqlargs) if txn.rowcount > 0: @@ -561,7 +566,7 @@ class SQLBaseStore(object): ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues) ) - txn.execute(sql, allvalues.values()) + txn.execute(sql, list(allvalues.values())) # successfully inserted return True @@ -629,8 +634,8 @@ class SQLBaseStore(object): } if keyvalues: - sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) - txn.execute(sql, keyvalues.values()) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + txn.execute(sql, list(keyvalues.values())) else: txn.execute(sql) @@ -694,7 +699,7 @@ class SQLBaseStore(object): table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - txn.execute(sql, keyvalues.values()) + txn.execute(sql, list(keyvalues.values())) else: sql = "SELECT %s FROM %s" % ( ", ".join(retcols), @@ -725,9 +730,12 @@ class SQLBaseStore(object): if not iterable: defer.returnValue(results) + # iterables can not be sliced, so convert it to a list first + it_list = list(iterable) + chunks = [ - iterable[i:i + batch_size] - for i in xrange(0, len(iterable), batch_size) + it_list[i:i + batch_size] + for i in range(0, len(it_list), batch_size) ] for chunk in chunks: rows = yield self.runInteraction( @@ -767,7 +775,7 @@ class SQLBaseStore(object): ) values.extend(iterable) - for key, value in keyvalues.iteritems(): + for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) @@ -790,7 +798,7 @@ class SQLBaseStore(object): @staticmethod def _simple_update_txn(txn, table, keyvalues, updatevalues): if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) else: where = "" @@ -802,7 +810,7 @@ class SQLBaseStore(object): txn.execute( update_sql, - updatevalues.values() + keyvalues.values() + list(updatevalues.values()) + list(keyvalues.values()) ) return txn.rowcount @@ -850,7 +858,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k,) for k in keyvalues) ) - txn.execute(select_sql, keyvalues.values()) + txn.execute(select_sql, list(keyvalues.values())) row = txn.fetchone() if not row: @@ -888,7 +896,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - txn.execute(sql, keyvalues.values()) + txn.execute(sql, list(keyvalues.values())) if txn.rowcount == 0: raise StoreError(404, "No row found") if txn.rowcount > 1: @@ -906,7 +914,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - return txn.execute(sql, keyvalues.values()) + return txn.execute(sql, list(keyvalues.values())) def _simple_delete_many(self, table, column, iterable, keyvalues, desc): return self.runInteraction( @@ -938,7 +946,7 @@ class SQLBaseStore(object): ) values.extend(iterable) - for key, value in keyvalues.iteritems(): + for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) @@ -978,7 +986,7 @@ class SQLBaseStore(object): txn.close() if cache: - min_val = min(cache.itervalues()) + min_val = min(itervalues(cache)) else: min_val = max_value @@ -1093,7 +1101,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k,) for k in keyvalues), " ? ASC LIMIT ? OFFSET ?" ) - txn.execute(sql, keyvalues.values() + pagevalues) + txn.execute(sql, list(keyvalues.values()) + list(pagevalues)) else: sql = "SELECT %s FROM %s ORDER BY %s" % ( ", ".join(retcols), diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ba46907737..ce338514e8 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -22,6 +22,8 @@ from . import background_updates from synapse.util.caches import CACHE_SIZE_FACTOR +from six import iteritems + logger = logging.getLogger(__name__) @@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): def _update_client_ips_batch_txn(self, txn, to_update): self.database_engine.lock_table(txn, "user_ips") - for entry in to_update.iteritems(): + for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry self._simple_upsert_txn( @@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_agent": user_agent, "last_seen": last_seen, } - for (access_token, ip), (user_agent, last_seen) in results.iteritems() + for (access_token, ip), (user_agent, last_seen) in iteritems(results) )) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 712106b83a..d149d8392e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, []) if len(query_map) >= 20: - now_stream_id = max(stream_id for stream_id in query_map.itervalues()) + now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( txn, query_map.keys(), include_all_devices=True @@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore): """ results = [] - for user_id, user_devices in devices.iteritems(): + for user_id, user_devices in iteritems(devices): # The prev_id for the first row is always the last row before # `from_stream_id` txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) rows = txn.fetchall() prev_id = rows[0][0] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): stream_id = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore): if devices: user_devices = devices[user_id] results = [] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): result = { "device_id": device_id, } diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ff8538ddf8..b146487943 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -21,6 +21,8 @@ import simplejson as json from ._base import SQLBaseStore +from six import iteritems + class EndToEndKeyStore(SQLBaseStore): def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): @@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore): query_list, include_all_devices, ) - for user_id, device_keys in results.iteritems(): - for device_id, device_info in device_keys.iteritems(): + for user_id, device_keys in iteritems(results): + for device_id, device_info in iteritems(device_keys): device_info["keys"] = json.loads(device_info.pop("key_json")) defer.returnValue(results) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f084a5f54b..d0350ee5fe 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks import logging import simplejson as json +from six import iteritems + logger = logging.getLogger(__name__) @@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): txn.executemany(sql, ( _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.iteritems() + for user_id, actions in iteritems(user_id_actions) )) return self.runInteraction( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5ebef98c4f..cb1082e864 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -40,30 +40,30 @@ import synapse.metrics from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 -logger = logging.getLogger(__name__) +from six.moves import range +from six import itervalues, iteritems +from prometheus_client import Counter -metrics = synapse.metrics.get_metrics_for(__name__) -persist_event_counter = metrics.register_counter("persisted_events") -event_counter = metrics.register_counter( - "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] -) +logger = logging.getLogger(__name__) + +persist_event_counter = Counter("synapse_storage_events_persisted_events", "") +event_counter = Counter("synapse_storage_events_persisted_events_sep", "", + ["type", "origin_type", "origin_entity"]) # The number of times we are recalculating the current state -state_delta_counter = metrics.register_counter( - "state_delta", -) +state_delta_counter = Counter("synapse_storage_events_state_delta", "") + # The number of times we are recalculating state when there is only a # single forward extremity -state_delta_single_event_counter = metrics.register_counter( - "state_delta_single_event", -) +state_delta_single_event_counter = Counter( + "synapse_storage_events_state_delta_single_event", "") + # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. -state_delta_reuse_delta_counter = metrics.register_counter( - "state_delta_reuse_delta", -) +state_delta_reuse_delta_counter = Counter( + "synapse_storage_events_state_delta_reuse_delta", "") def encode_json(json_object): @@ -248,7 +248,7 @@ class EventsStore(EventsWorkerStore): partitioned.setdefault(event.room_id, []).append((event, ctx)) deferreds = [] - for room_id, evs_ctxs in partitioned.iteritems(): + for room_id, evs_ctxs in iteritems(partitioned): d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, @@ -333,7 +333,7 @@ class EventsStore(EventsWorkerStore): chunks = [ events_and_contexts[x:x + 100] - for x in xrange(0, len(events_and_contexts), 100) + for x in range(0, len(events_and_contexts), 100) ] for chunk in chunks: @@ -367,7 +367,7 @@ class EventsStore(EventsWorkerStore): (event, context) ) - for room_id, ev_ctx_rm in events_by_room.iteritems(): + for room_id, ev_ctx_rm in iteritems(events_by_room): # Work out new extremities by recursively adding and removing # the new events. latest_event_ids = yield self.get_latest_event_ids_in_room( @@ -445,7 +445,7 @@ class EventsStore(EventsWorkerStore): state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) - persist_event_counter.inc_by(len(chunk)) + persist_event_counter.inc(len(chunk)) synapse.metrics.event_persisted_position.set( chunk[-1][0].internal_metadata.stream_ordering, ) @@ -460,14 +460,14 @@ class EventsStore(EventsWorkerStore): origin_type = "remote" origin_entity = get_domain_from_id(event.sender) - event_counter.inc(event.type, origin_type, origin_entity) + event_counter.labels(event.type, origin_type, origin_entity).inc() - for room_id, new_state in current_state_for_room.iteritems(): + for room_id, new_state in iteritems(current_state_for_room): self.get_current_state_ids.prefill( (room_id, ), new_state ) - for room_id, latest_event_ids in new_forward_extremeties.iteritems(): + for room_id, latest_event_ids in iteritems(new_forward_extremeties): self.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -644,20 +644,20 @@ class EventsStore(EventsWorkerStore): """ existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(existing_state.itervalues()) - new_events = set(ev_id for ev_id in current_state.itervalues()) + existing_events = set(itervalues(existing_state)) + new_events = set(ev_id for ev_id in itervalues(current_state)) changed_events = existing_events ^ new_events if not changed_events: return to_delete = { - key: ev_id for key, ev_id in existing_state.iteritems() + key: ev_id for key, ev_id in iteritems(existing_state) if ev_id in changed_events } events_to_insert = (new_events - existing_events) to_insert = { - key: ev_id for key, ev_id in current_state.iteritems() + key: ev_id for key, ev_id in iteritems(current_state) if ev_id in events_to_insert } @@ -760,11 +760,11 @@ class EventsStore(EventsWorkerStore): ) def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): - for room_id, current_state_tuple in state_delta_by_room.iteritems(): + for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in to_delete.itervalues()], + [(ev_id,) for ev_id in itervalues(to_delete)], ) self._simple_insert_many_txn( @@ -777,7 +777,7 @@ class EventsStore(EventsWorkerStore): "type": key[0], "state_key": key[1], } - for key, ev_id in to_insert.iteritems() + for key, ev_id in iteritems(to_insert) ], ) @@ -796,7 +796,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "prev_event_id": to_delete.get(key, None), } - for key, ev_id in state_deltas.iteritems() + for key, ev_id in iteritems(state_deltas) ] ) @@ -839,7 +839,7 @@ class EventsStore(EventsWorkerStore): def _update_forward_extremities_txn(self, txn, new_forward_extremities, max_stream_order): - for room_id, new_extrem in new_forward_extremities.iteritems(): + for room_id, new_extrem in iteritems(new_forward_extremities): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -857,7 +857,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for ev_id in new_extrem ], ) @@ -874,7 +874,7 @@ class EventsStore(EventsWorkerStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for event_id in new_extrem ] ) @@ -902,7 +902,7 @@ class EventsStore(EventsWorkerStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) - return new_events_and_contexts.values() + return list(new_events_and_contexts.values()) def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): """Update min_depth for each room @@ -928,7 +928,7 @@ class EventsStore(EventsWorkerStore): event.depth, depth_updates.get(event.room_id, event.depth) ) - for room_id, depth in depth_updates.iteritems(): + for room_id, depth in iteritems(depth_updates): self._update_min_depth_for_room_txn(txn, room_id, depth) def _update_outliers_txn(self, txn, events_and_contexts): @@ -1312,7 +1312,7 @@ class EventsStore(EventsWorkerStore): " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(ev_map)),) - txn.execute(sql, ev_map.keys()) + txn.execute(sql, list(ev_map)) rows = self.cursor_to_dict(txn) for row in rows: event = ev_map[row["event_id"]] @@ -1575,7 +1575,7 @@ class EventsStore(EventsWorkerStore): chunks = [ event_ids[i:i + 100] - for i in xrange(0, len(event_ids), 100) + for i in range(0, len(event_ids), 100) ] for chunk in chunks: ev_rows = self._simple_select_many_txn( @@ -1989,7 +1989,7 @@ class EventsStore(EventsWorkerStore): logger.info("[purge] finding state groups which depend on redundant" " state groups") remaining_state_groups = [] - for i in xrange(0, len(state_rows), 100): + for i in range(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] # look for state groups whose prev_state_group is one we are about # to delete @@ -2045,7 +2045,7 @@ class EventsStore(EventsWorkerStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in curr_state.iteritems() + for key, state_id in iteritems(curr_state) ], ) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index ba834854e1..32d9d00ffb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -337,7 +337,7 @@ class EventsWorkerStore(SQLBaseStore): def _fetch_event_rows(self, txn, events): rows = [] N = 200 - for i in range(1 + len(events) / N): + for i in range(1 + len(events) // N): evs = events[i * N:(i + 1) * N] if not evs: break diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 78b1e30945..2e2763126d 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore): desc="get_user_filter", ) - defer.returnValue(json.loads(str(def_json).decode("utf-8"))) + defer.returnValue(json.loads(bytes(def_json).decode("utf-8"))) def add_user_filter(self, user_localpart, user_filter): def_json = encode_canonical_json(user_filter) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 87aeaf71d6..0f13b61da8 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer +import six import OpenSSL from signedjson.key import decode_verify_key_bytes @@ -26,6 +27,13 @@ import logging logger = logging.getLogger(__name__) +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates @@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore): values={ "from_server": from_server, "ts_added_ms": time_now_ms, - "tls_certificate": buffer(tls_certificate_bytes), + "tls_certificate": db_binary_type(tls_certificate_bytes), }, desc="store_server_certificate", ) @@ -92,7 +100,7 @@ class KeyStore(SQLBaseStore): if verify_key_bytes: defer.returnValue(decode_verify_key_bytes( - key_id, str(verify_key_bytes) + key_id, bytes(verify_key_bytes) )) @defer.inlineCallbacks @@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore): values={ "from_server": from_server, "ts_added_ms": time_now_ms, - "verify_key": buffer(verify_key.encode()), + "verify_key": db_binary_type(verify_key.encode()), }, ) txn.call_after( @@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore): "from_server": from_server, "ts_added_ms": ts_now_ms, "ts_valid_until_ms": ts_expires_ms, - "key_json": buffer(key_json_bytes), + "key_json": db_binary_type(key_json_bytes), }, desc="store_server_keys_json", ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c08e9cd65a..cf2aae0468 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 49 +SCHEMA_VERSION = 50 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 9e9d3c2591..f05d91cc58 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.api.constants import PresenceState from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util import batch_iter from collections import namedtuple from twisted.internet import defer @@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore): " AND user_id IN (%s)" ) - batches = ( - presence_states[i:i + 50] - for i in xrange(0, len(presence_states), 50) - ) - for states in batches: + for states in batch_iter(presence_states, 50): args = [stream_id] args.extend(s.user_id for s in states) txn.execute( diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 709c69a926..c93c228f6e 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + stream_ordering = int(res["stream_ordering"]) if res else None + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + if stream_ordering is not None: + sql = ( + "SELECT stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + txn.execute(sql, (room_id, receipt_type, user_id)) + + for so, eid in txn: + if int(so) >= stream_ordering: + logger.debug( + "Ignoring new receipt for %s in favour of existing " + "one for later event %s", + event_id, eid, + ) + return False + txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) @@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore): (user_id, room_id, receipt_type) ) - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - topological_ordering = int(res["topological_ordering"]) if res else None - stream_ordering = int(res["stream_ordering"]) if res else None - - # We don't want to clobber receipts for more recent events, so we - # have to compare orderings of existing receipts - sql = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized as r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" - ) - - txn.execute(sql, (room_id, receipt_type, user_id)) - - if topological_ordering: - for to, so, _ in txn: - if int(to) > topological_ordering: - return False - elif int(to) == topological_ordering and int(so) >= stream_ordering: - return False - self._simple_delete_txn( txn, table="receipts_linearized", @@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore): } ) - if receipt_type == "m.read" and topological_ordering: + if receipt_type == "m.read" and stream_ordering is not None: self._remove_old_push_actions_before_txn( txn, room_id=room_id, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 5d8a850ac9..9c9cf46e7f 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore): retcols=[ "name", "password_hash", "is_guest", "consent_version", "consent_server_notice_sent", + "appservice_id", ], allow_none=True, desc="get_user_by_id", @@ -101,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore, columns=["user_id", "device_id"], ) + self.register_background_index_update( + "users_creation_ts", + index_name="users_creation_ts", + table="users", + columns=["creation_ts"], + ) + # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. @@ -476,6 +484,35 @@ class RegistrationStore(RegistrationWorkerStore, ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + def count_daily_user_type(self): + """ + Counts 1) native non guest users + 2) native guests users + 3) bridged users + who registered on the homeserver in the past 24 hours + """ + def _count_daily_user_type(txn): + yesterday = int(self._clock.time()) - (60 * 60 * 24) + + sql = """ + SELECT user_type, COALESCE(count(*), 0) AS count FROM ( + SELECT + CASE + WHEN is_guest=0 AND appservice_id IS NULL THEN 'native' + WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest' + WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged' + END AS user_type + FROM users + WHERE creation_ts > ? + ) AS t GROUP BY user_type + """ + results = {'native': 0, 'guest': 0, 'bridged': 0} + txn.execute(sql, (yesterday,)) + for row in txn: + results[row[0]] = row[1] + return results + return self.runInteraction("count_daily_user_type", _count_daily_user_type) + @defer.inlineCallbacks def count_nonbridged_users(self): def _count_users(txn): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 6a861943a2..48a88f755e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -30,6 +30,8 @@ from synapse.types import get_domain_from_id import logging import simplejson as json +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -272,7 +274,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = {} member_event_ids = [ e_id - for key, e_id in current_state_ids.iteritems() + for key, e_id in iteritems(current_state_ids) if key[0] == EventTypes.Member ] @@ -289,7 +291,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = dict(prev_res) member_event_ids = [ e_id - for key, e_id in context.delta_ids.iteritems() + for key, e_id in iteritems(context.delta_ids) if key[0] == EventTypes.Member ] for etype, state_key in context.delta_ids: @@ -576,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - txn.call_after(self.was_forgotten_at.invalidate_all) txn.call_after(self.did_forget.invalidate, (user_id, room_id)) self._invalidate_cache_and_stream( txn, self.who_forgot_in_room, (room_id,) @@ -607,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) - @cachedInlineCallbacks(num_args=3) - def was_forgotten_at(self, user_id, room_id, event_id): - """Returns whether user_id has elected to discard history for room_id at - event_id. - - event_id must be a membership event.""" - def f(txn): - sql = ( - "SELECT" - " forgotten" - " FROM" - " room_memberships" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - " AND" - " event_id = ?" - ) - txn.execute(sql, (user_id, room_id, event_id)) - rows = txn.fetchall() - return rows[0][0] - forgot = yield self.runInteraction("did_forget_membership_at", f) - defer.returnValue(forgot == 1) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( @@ -741,7 +717,7 @@ class _JoinedHostsCache(object): if state_entry.state_group == self.state_group: pass elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in state_entry.delta_ids.iteritems(): + for (typ, state_key), event_id in iteritems(state_entry.delta_ids): if typ != EventTypes.Member: continue @@ -771,7 +747,7 @@ class _JoinedHostsCache(object): self.state_group = state_entry.state_group else: self.state_group = object() - self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) + self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users)) defer.returnValue(frozenset(self.hosts_to_joined_users)) def __len__(self): diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql new file mode 100644 index 0000000000..c93ae47532 --- /dev/null +++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql @@ -0,0 +1,19 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +INSERT into background_updates (update_name, progress_json) + VALUES ('users_creation_ts', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 6ba3e59889..f0fa5d7631 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -18,13 +18,14 @@ import logging import re import simplejson as json +from six import string_types + from twisted.internet import defer from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine - logger = logging.getLogger(__name__) SearchEntry = namedtuple('SearchEntry', [ @@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore): # skip over it. continue - if not isinstance(value, basestring): + if not isinstance(value, string_types): # If the event body, name or topic isn't a string # then skip over it continue @@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore): "search_msgs", self.cursor_to_dict, sql, *args ) - results = filter(lambda row: row["room_id"] in room_ids, results) + results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) @@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore): "search_rooms", self.cursor_to_dict, sql, *args ) - results = filter(lambda row: row["room_id"] in room_ids, results) + results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 9e6eaaa532..25922e5a9c 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -14,6 +14,7 @@ # limitations under the License. from twisted.internet import defer +import six from ._base import SQLBaseStore @@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash from synapse.util.caches.descriptors import cached, cachedList +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class SignatureWorkerStore(SQLBaseStore): @cached() @@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore): for e_id, h in hashes.items() } - defer.returnValue(hashes.items()) + defer.returnValue(list(hashes.items())) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. @@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore): vals.append({ "event_id": event.event_id, "algorithm": ref_alg, - "hash": buffer(ref_hash_bytes), + "hash": db_binary_type(ref_hash_bytes), }) self._simple_insert_many_txn( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ffa4246031..85b8ec2b8f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,11 +16,14 @@ from collections import namedtuple import logging +from six import iteritems, itervalues +from six.moves import range + from twisted.internet import defer from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine -from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR +from synapse.util.caches import intern_string, get_cache_factor_for from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.stringutils import to_ascii @@ -54,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore): super(StateGroupWorkerStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR + "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") ) @cached(max_entries=100000, iterable=True) @@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups) defer.returnValue(group_to_state) @@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore): state_event_map = yield self.get_events( [ - ev_id for group_ids in group_to_ids.itervalues() - for ev_id in group_ids.itervalues() + ev_id for group_ids in itervalues(group_to_ids) + for ev_id in itervalues(group_ids) ], get_prev_content=False ) defer.returnValue({ group: [ - state_event_map[v] for v in event_id_map.itervalues() + state_event_map[v] for v in itervalues(event_id_map) if v in state_event_map ] - for group, event_id_map in group_to_ids.iteritems() + for group, event_id_map in iteritems(group_to_ids) }) @defer.inlineCallbacks @@ -186,7 +189,7 @@ class StateGroupWorkerStore(SQLBaseStore): """ results = {} - chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] + chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", @@ -269,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore): for typ in types: if typ[1] is None: where_clauses.append("(type = ?)") - where_args.extend(typ[0]) + where_args.append(typ[0]) wildcard_types = True else: where_clauses.append("(type = ? AND state_key = ?)") @@ -347,21 +350,21 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups, types) state_event_map = yield self.get_events( - [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()], + [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], get_prev_content=False ) event_to_state = { event_id: { k: state_event_map[v] - for k, v in group_to_state[group].iteritems() + for k, v in iteritems(group_to_state[group]) if v in state_event_map } - for event_id, group in event_to_groups.iteritems() + for event_id, group in iteritems(event_to_groups) } defer.returnValue({event: event_to_state[event] for event in event_ids}) @@ -384,12 +387,12 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups, types) event_to_state = { event_id: group_to_state[group] - for event_id, group in event_to_groups.iteritems() + for event_id, group in iteritems(event_to_groups) } defer.returnValue({event: event_to_state[event] for event in event_ids}) @@ -503,7 +506,7 @@ class StateGroupWorkerStore(SQLBaseStore): got_all = is_all or not missing_types return { - k: v for k, v in state_dict_ids.iteritems() + k: v for k, v in iteritems(state_dict_ids) if include(k[0], k[1]) }, missing_types, got_all @@ -562,12 +565,12 @@ class StateGroupWorkerStore(SQLBaseStore): # Now we want to update the cache with all the things we fetched # from the database. - for group, group_state_dict in group_to_state_dict.iteritems(): + for group, group_state_dict in iteritems(group_to_state_dict): state_dict = results[group] state_dict.update( ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) - for k, v in group_state_dict.iteritems() + for k, v in iteritems(group_state_dict) ) self._state_group_cache.update( @@ -654,7 +657,7 @@ class StateGroupWorkerStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in delta_ids.iteritems() + for key, state_id in iteritems(delta_ids) ], ) else: @@ -669,7 +672,7 @@ class StateGroupWorkerStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in current_state_ids.iteritems() + for key, state_id in iteritems(current_state_ids) ], ) @@ -794,11 +797,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): "state_group": state_group_id, "event_id": event_id, } - for event_id, state_group_id in state_groups.iteritems() + for event_id, state_group_id in iteritems(state_groups) ], ) - for event_id, state_group_id in state_groups.iteritems(): + for event_id, state_group_id in iteritems(state_groups): txn.call_after( self._get_state_group_for_event.prefill, (event_id,), state_group_id @@ -826,7 +829,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): def reindex_txn(txn): new_last_state_group = last_state_group - for count in xrange(batch_size): + for count in range(batch_size): txn.execute( "SELECT id, room_id FROM state_groups" " WHERE ? < id AND id <= ?" @@ -884,7 +887,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): # of keys delta_state = { - key: value for key, value in curr_state.iteritems() + key: value for key, value in iteritems(curr_state) if prev_state.get(key, None) != value } @@ -924,7 +927,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in delta_state.iteritems() + for key, state_id in iteritems(delta_state) ], ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index f825264ea9..e485d19b84 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from twisted.internet import defer +import six from canonicaljson import encode_canonical_json @@ -25,6 +26,13 @@ from collections import namedtuple import logging import simplejson as json +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + logger = logging.getLogger(__name__) @@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore): "transaction_id": transaction_id, "origin": origin, "response_code": code, - "response_json": buffer(encode_canonical_json(response_dict)), + "response_json": db_binary_type(encode_canonical_json(response_dict)), "ts": self._clock.time_msec(), }, or_ignore=True, diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d6e289ffbe..275c299998 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id +from six import iteritems + import re import logging @@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore): user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), profile.display_name, ) - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ @@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore): user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) - for user_id, p in users_with_profile.iteritems() + for user_id, p in iteritems(users_with_profile) ) else: # This should be unreachable. @@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore): "display_name": profile.display_name, "avatar_url": profile.avatar_url, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) ] ) for user_id in users_with_profile: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 814a7bf71b..fc11e26623 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -20,6 +20,8 @@ from twisted.internet import defer, reactor, task import time import logging +from itertools import islice + logger = logging.getLogger(__name__) @@ -79,3 +81,19 @@ class Clock(object): except Exception: if not ignore_errs: raise + + +def batch_iter(iterable, size): + """batch an iterable up into tuples with a maximum size + + Args: + iterable (iterable): the iterable to slice + size (int): the maximum batch size + + Returns: + an iterator over the chunks + """ + # make sure we can deal with iterables like lists too + sourceiter = iter(iterable) + # call islice until it returns an empty tuple + return iter(lambda: tuple(islice(sourceiter, size)), ()) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 4adae96681..900575eb3c 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -13,28 +13,87 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse.metrics +from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily + import os +from six.moves import intern +import six + CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) -metrics = synapse.metrics.get_metrics_for("synapse.util.caches") + +def get_cache_factor_for(cache_name): + env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper() + factor = os.environ.get(env_var) + if factor: + return float(factor) + + return CACHE_SIZE_FACTOR + caches_by_name = {} -# cache_counter = metrics.register_cache( -# "cache", -# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, -# labels=["name"], -# ) - - -def register_cache(name, cache): - caches_by_name[name] = cache - return metrics.register_cache( - "cache", - lambda: len(cache), - name, - ) +collectors_by_name = {} + +cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) +cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) +cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"]) +cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) + +response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) +response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) +response_cache_evicted = Gauge( + "synapse_util_caches_response_cache:evicted_size", "", ["name"] +) +response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"]) + + +def register_cache(cache_type, cache_name, cache): + + # Check if the metric is already registered. Unregister it, if so. + # This usually happens during tests, as at runtime these caches are + # effectively singletons. + metric_name = "cache_%s_%s" % (cache_type, cache_name) + if metric_name in collectors_by_name.keys(): + REGISTRY.unregister(collectors_by_name[metric_name]) + + class CacheMetric(object): + + hits = 0 + misses = 0 + evicted_size = 0 + + def inc_hits(self): + self.hits += 1 + + def inc_misses(self): + self.misses += 1 + + def inc_evictions(self, size=1): + self.evicted_size += size + + def describe(self): + return [] + + def collect(self): + if cache_type == "response_cache": + response_cache_size.labels(cache_name).set(len(cache)) + response_cache_hits.labels(cache_name).set(self.hits) + response_cache_evicted.labels(cache_name).set(self.evicted_size) + response_cache_total.labels(cache_name).set(self.hits + self.misses) + else: + cache_size.labels(cache_name).set(len(cache)) + cache_hits.labels(cache_name).set(self.hits) + cache_evicted.labels(cache_name).set(self.evicted_size) + cache_total.labels(cache_name).set(self.hits + self.misses) + + yield GaugeMetricFamily("__unused", "") + + metric = CacheMetric() + REGISTRY.register(metric) + caches_by_name[cache_name] = cache + collectors_by_name[metric_name] = metric + return metric KNOWN_KEYS = { @@ -66,7 +125,9 @@ def intern_string(string): return None try: - string = string.encode("ascii") + if six.PY2: + string = string.encode("ascii") + return intern(string) except UnicodeEncodeError: return string diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 68285a7594..65a1042de1 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -17,7 +17,7 @@ import logging from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError, logcontext -from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.caches import get_cache_factor_for from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry from synapse.util.stringutils import to_ascii @@ -31,6 +31,9 @@ import functools import inspect import threading +from six import string_types, itervalues +import six + logger = logging.getLogger(__name__) @@ -80,7 +83,7 @@ class Cache(object): self.name = name self.keylen = keylen self.thread = None - self.metrics = register_cache(name, self.cache) + self.metrics = register_cache("cache", name, self.cache) def _on_evicted(self, evicted_count): self.metrics.inc_evictions(evicted_count) @@ -205,7 +208,7 @@ class Cache(object): def invalidate_all(self): self.check_thread() self.cache.clear() - for entry in self._pending_deferred_cache.itervalues(): + for entry in itervalues(self._pending_deferred_cache): entry.invalidate() self._pending_deferred_cache.clear() @@ -310,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase): orig, num_args=num_args, inlineCallbacks=inlineCallbacks, cache_context=cache_context) - max_entries = int(max_entries * CACHE_SIZE_FACTOR) + max_entries = int(max_entries * get_cache_factor_for(orig.__name__)) self.max_entries = max_entries self.tree = tree @@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase): ret.addErrback(onErr) - # If our cache_key is a string, try to convert to ascii to save - # a bit of space in large caches - if isinstance(cache_key, basestring): + # If our cache_key is a string on py2, try to convert to ascii + # to save a bit of space in large caches. Py3 does this + # internally automatically. + if six.PY2 and isinstance(cache_key, string_types): cache_key = to_ascii(cache_key) result_d = ObservableDeferred(ret, consumeErrors=True) @@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase): return results return logcontext.make_deferred_yieldable(defer.gatherResults( - cached_defers.values(), + list(cached_defers.values()), consumeErrors=True, ).addCallback(update_results_dict).addErrback( unwrapFirstError diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 1709e8b429..bdc21e348f 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -55,7 +55,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - self.metrics = register_cache(name, self.cache) + self.metrics = register_cache("dictionary", name, self.cache) def check_thread(self): expected_thread = self.thread diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 0aa103eecb..ff04c91955 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -52,12 +52,12 @@ class ExpiringCache(object): self._cache = OrderedDict() - self.metrics = register_cache(cache_name, self) - self.iterable = iterable self._size_estimate = 0 + self.metrics = register_cache("expiring", cache_name, self) + def start(self): if not self._expiry_ms: # Don't bother starting the loop if things never expire diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 7f79333e96..a8491b42d5 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer from synapse.util.async import ObservableDeferred -from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches import register_cache from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -38,15 +38,16 @@ class ResponseCache(object): self.timeout_sec = timeout_ms / 1000. self._name = name - self._metrics = cache_metrics.register_cache( - "response_cache", - size_callback=lambda: self.size(), - cache_name=name, + self._metrics = register_cache( + "response_cache", name, self ) def size(self): return len(self.pending_result_cache) + def __len__(self): + return self.size() + def get(self, key): """Look up the given key. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 941d873ab8..817118e30f 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR +from synapse.util import caches -from blist import sorteddict +from sortedcontainers import SortedDict import logging @@ -32,16 +32,18 @@ class StreamChangeCache(object): entities that may have changed since that position. If position key is too old then the cache will simply return all given entities. """ - def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}): - self._max_size = int(max_size * CACHE_SIZE_FACTOR) + + def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None): + self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR) self._entity_to_key = {} - self._cache = sorteddict() + self._cache = SortedDict() self._earliest_known_stream_pos = current_stream_pos self.name = name - self.metrics = register_cache(self.name, self._cache) + self.metrics = caches.register_cache("cache", self.name, self._cache) - for entity, stream_pos in prefilled_cache.items(): - self.entity_has_changed(entity, stream_pos) + if prefilled_cache: + for entity, stream_pos in prefilled_cache.items(): + self.entity_has_changed(entity, stream_pos) def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos @@ -65,22 +67,25 @@ class StreamChangeCache(object): return False def get_entities_changed(self, entities, stream_pos): - """Returns subset of entities that have had new things since the - given position. If the position is too old it will just return the given list. + """ + Returns subset of entities that have had new things since the given + position. Entities unknown to the cache will be returned. If the + position is too old it will just return the given list. """ assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - keys = self._cache.keys() - i = keys.bisect_right(stream_pos) + not_known_entities = set(entities) - set(self._entity_to_key) - result = set( - self._cache[k] for k in keys[i:] - ).intersection(entities) + result = ( + set(self._cache.values()[self._cache.bisect_right(stream_pos) :]) + .intersection(entities) + .union(not_known_entities) + ) self.metrics.inc_hits() else: - result = entities + result = set(entities) self.metrics.inc_misses() return result @@ -90,12 +95,13 @@ class StreamChangeCache(object): """ assert type(stream_pos) is int + if not self._cache: + # If we have no cache, nothing can have changed. + return False + if stream_pos >= self._earliest_known_stream_pos: self.metrics.inc_hits() - keys = self._cache.keys() - i = keys.bisect_right(stream_pos) - - return i < len(keys) + return self._cache.bisect_right(stream_pos) < len(self._cache) else: self.metrics.inc_misses() return True @@ -107,10 +113,7 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - keys = self._cache.keys() - i = keys.bisect_right(stream_pos) - - return [self._cache[k] for k in keys[i:]] + return self._cache.values()[self._cache.bisect_right(stream_pos) :] else: return None @@ -129,8 +132,10 @@ class StreamChangeCache(object): self._entity_to_key[entity] = stream_pos while len(self._cache) > self._max_size: - k, r = self._cache.popitem() - self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + k, r = self._cache.popitem(0) + self._earliest_known_stream_pos = max( + k, self._earliest_known_stream_pos, + ) self._entity_to_key.pop(r, None) def get_max_pos_of_last_change(self, entity): diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index fcc341a6b7..dd4c9e6067 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,3 +1,5 @@ +from six import itervalues + SENTINEL = object() @@ -49,7 +51,7 @@ class TreeCache(object): if popped is SENTINEL: return default - node_and_keys = zip(nodes, key) + node_and_keys = list(zip(nodes, key)) node_and_keys.reverse() node_and_keys.append((self.root, None)) @@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d): can contain dicts. """ if isinstance(d, dict): - for value_d in d.itervalues(): + for value_d in itervalues(d): for value in iterate_tree_cache_entry(value_d): yield value else: diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index f497b51f4a..15f0a7ba9e 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -16,16 +16,17 @@ from frozendict import frozendict import simplejson as json +from six import string_types + def freeze(o): - t = type(o) - if t is dict: + if isinstance(o, dict): return frozendict({k: freeze(v) for k, v in o.items()}) - if t is frozendict: + if isinstance(o, frozendict): return o - if t is str or t is unicode: + if isinstance(o, string_types): return o try: @@ -37,11 +38,10 @@ def freeze(o): def unfreeze(o): - t = type(o) - if t is dict or t is frozendict: + if isinstance(o, (dict, frozendict)): return dict({k: unfreeze(v) for k, v in o.items()}) - if t is str or t is unicode: + if isinstance(o, string_types): return o try: diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 914f616312..a58c723403 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -59,7 +59,7 @@ class LoggingContext(object): __slots__ = [ "previous_context", "name", "ru_stime", "ru_utime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", "usage_start", "main_thread", "alive", "request", "tag", @@ -84,10 +84,10 @@ class LoggingContext(object): def stop(self): pass - def add_database_transaction(self, duration_ms): + def add_database_transaction(self, duration_sec): pass - def add_database_scheduled(self, sched_ms): + def add_database_scheduled(self, sched_sec): pass def __nonzero__(self): @@ -103,11 +103,11 @@ class LoggingContext(object): self.ru_utime = 0. self.db_txn_count = 0 - # ms spent waiting for db txns, excluding scheduling time - self.db_txn_duration_ms = 0 + # sec spent waiting for db txns, excluding scheduling time + self.db_txn_duration_sec = 0 - # ms spent waiting for db txns to be scheduled - self.db_sched_duration_ms = 0 + # sec spent waiting for db txns to be scheduled + self.db_sched_duration_sec = 0 # If alive has the thread resource usage when the logcontext last # became active. @@ -230,18 +230,18 @@ class LoggingContext(object): return ru_utime, ru_stime - def add_database_transaction(self, duration_ms): + def add_database_transaction(self, duration_sec): self.db_txn_count += 1 - self.db_txn_duration_ms += duration_ms + self.db_txn_duration_sec += duration_sec - def add_database_scheduled(self, sched_ms): + def add_database_scheduled(self, sched_sec): """Record a use of the database pool Args: - sched_ms (int): number of milliseconds it took us to get a + sched_sec (float): number of seconds it took us to get a connection """ - self.db_sched_duration_ms += sched_ms + self.db_sched_duration_sec += sched_sec class LoggingContextFilter(logging.Filter): diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index 3a83828d25..03249c5dc8 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -96,7 +96,7 @@ def time_function(f): id = _TIME_FUNC_ID _TIME_FUNC_ID += 1 - start = time.clock() * 1000 + start = time.clock() try: _log_debug_as_f( @@ -107,10 +107,10 @@ def time_function(f): r = f(*args, **kwargs) finally: - end = time.clock() * 1000 + end = time.clock() _log_debug_as_f( f, - "[FUNC END] {%s-%d} %f", + "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start,), ) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index e4b5687a4b..1ba7d65c7c 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -15,8 +15,8 @@ from twisted.internet import defer +from prometheus_client import Counter from synapse.util.logcontext import LoggingContext -import synapse.metrics from functools import wraps import logging @@ -24,66 +24,26 @@ import logging logger = logging.getLogger(__name__) +block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) -metrics = synapse.metrics.get_metrics_for(__name__) - -# total number of times we have hit this block -block_counter = metrics.register_counter( - "block_count", - labels=["block_name"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_block_timer:count", - "_block_ru_utime:count", - "_block_ru_stime:count", - "_block_db_txn_count:count", - "_block_db_txn_duration:count", - ) - ) -) - -block_timer = metrics.register_counter( - "block_time_seconds", - labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_timer:total", - ), -) - -block_ru_utime = metrics.register_counter( - "block_ru_utime_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_ru_utime:total", - ), -) - -block_ru_stime = metrics.register_counter( - "block_ru_stime_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_ru_stime:total", - ), -) - -block_db_txn_count = metrics.register_counter( - "block_db_txn_count", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_db_txn_count:total", - ), -) +block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) + +block_ru_utime = Counter( + "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) + +block_ru_stime = Counter( + "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) + +block_db_txn_count = Counter( + "synapse_util_metrics_block_db_txn_count", "", ["block_name"]) # seconds spent waiting for db txns, excluding scheduling time, in this block -block_db_txn_duration = metrics.register_counter( - "block_db_txn_duration_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_db_txn_duration:total", - ), -) +block_db_txn_duration = Counter( + "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) # seconds spent waiting for a db connection, in this block -block_db_sched_duration = metrics.register_counter( - "block_db_sched_duration_seconds", labels=["block_name"], -) +block_db_sched_duration = Counter( + "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) def measure_func(name): @@ -102,7 +62,7 @@ class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", "ru_stime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", "created_context", ] @@ -114,7 +74,7 @@ class Measure(object): self.created_context = False def __enter__(self): - self.start = self.clock.time_msec() + self.start = self.clock.time() self.start_context = LoggingContext.current_context() if not self.start_context: self.start_context = LoggingContext("Measure") @@ -123,17 +83,17 @@ class Measure(object): self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration_ms = self.start_context.db_txn_duration_ms - self.db_sched_duration_ms = self.start_context.db_sched_duration_ms + self.db_txn_duration_sec = self.start_context.db_txn_duration_sec + self.db_sched_duration_sec = self.start_context.db_sched_duration_sec def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: return - duration = self.clock.time_msec() - self.start + duration = self.clock.time() - self.start - block_counter.inc(self.name) - block_timer.inc_by(duration, self.name) + block_counter.labels(self.name).inc() + block_timer.labels(self.name).inc(duration) context = LoggingContext.current_context() @@ -150,19 +110,13 @@ class Measure(object): ru_utime, ru_stime = context.get_resource_usage() - block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) - block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) - block_db_txn_count.inc_by( - context.db_txn_count - self.db_txn_count, self.name - ) - block_db_txn_duration.inc_by( - (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000., - self.name - ) - block_db_sched_duration.inc_by( - (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000., - self.name - ) + block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime) + block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime) + block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count) + block_db_txn_duration.labels(self.name).inc( + context.db_txn_duration_sec - self.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc( + context.db_sched_duration_sec - self.db_sched_duration_sec) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) |